merge from main

This commit is contained in:
Kenneth Kehl
2025-01-15 07:40:53 -08:00
8 changed files with 71 additions and 6 deletions

View File

@@ -18,6 +18,7 @@ from sqlalchemy import event
from werkzeug.exceptions import HTTPException as WerkzeugHTTPException
from werkzeug.local import LocalProxy
from app import config
from app.clients import NotificationProviderClients
from app.clients.cloudwatch.aws_cloudwatch import AwsCloudwatchClient
from app.clients.document_download import DocumentDownloadClient
@@ -58,15 +59,28 @@ class SQLAlchemy(_SQLAlchemy):
def apply_driver_hacks(self, app, info, options):
sa_url, options = super().apply_driver_hacks(app, info, options)
if "connect_args" not in options:
options["connect_args"] = {}
options["connect_args"]["options"] = "-c statement_timeout={}".format(
int(app.config["SQLALCHEMY_STATEMENT_TIMEOUT"]) * 1000
)
return (sa_url, options)
db = SQLAlchemy()
# Set db engine settings here for now.
# They were not being set previous (despite environmental variables with appropriate
# sounding names) and were defaulting to low values
db = SQLAlchemy(
engine_options={
"pool_size": config.Config.SQLALCHEMY_POOL_SIZE,
"max_overflow": 10,
"pool_timeout": config.Config.SQLALCHEMY_POOL_TIMEOUT,
"pool_recycle": config.Config.SQLALCHEMY_POOL_RECYCLE,
"pool_pre_ping": True,
}
)
migrate = Migrate()
ma = Marshmallow()
notify_celery = NotifyCelery()

View File

@@ -24,6 +24,7 @@ from app.dao.jobs_dao import (
find_missing_row_for_job,
)
from app.dao.notifications_dao import (
dao_close_out_delivery_receipts,
dao_update_delivery_receipts,
notifications_not_yet_sent,
)
@@ -242,6 +243,8 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts"
)
def process_delivery_receipts(self):
# If we need to check db settings do it here for convenience
# current_app.logger.info(f"POOL SIZE {app.db.engine.pool.size()}")
"""
Every eight minutes or so (see config.py) we run this task, which searches the last ten
minutes of logs for delivery receipts and batch updates the db with the results. The overlap
@@ -278,3 +281,10 @@ def process_delivery_receipts(self):
current_app.logger.error(
"Failed process delivery receipts after max retries"
)
@notify_celery.task(
bind=True, max_retries=2, default_retry_delay=3600, name="cleanup-delivery-receipts"
)
def cleanup_delivery_receipts(self):
dao_close_out_delivery_receipts()

View File

@@ -13,8 +13,7 @@ AWS_CLIENT_CONFIG = Config(
"addressing_style": "virtual",
},
use_fips_endpoint=True,
# This is the default but just for doc sake
max_pool_connections=10,
max_pool_connections=50, # This should be equal or greater than our celery concurrency
)

View File

@@ -90,7 +90,7 @@ class Config(object):
SQLALCHEMY_DATABASE_URI = cloud_config.database_url
SQLALCHEMY_RECORD_QUERIES = False
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 5))
SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 40))
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = 300
SQLALCHEMY_STATEMENT_TIMEOUT = 1200
@@ -215,6 +215,11 @@ class Config(object):
"schedule": timedelta(minutes=2),
"options": {"queue": QueueNames.PERIODIC},
},
"cleanup-delivery-receipts": {
"task": "cleanup-delivery-receipts",
"schedule": timedelta(minutes=82),
"options": {"queue": QueueNames.PERIODIC},
},
"expire-or-delete-invitations": {
"task": "expire-or-delete-invitations",
"schedule": timedelta(minutes=66),

View File

@@ -45,7 +45,7 @@ def dao_get_job_by_service_id_and_job_id(service_id, job_id):
def dao_get_unfinished_jobs():
stmt = select(Job).filter(Job.processing_finished.is_(None))
return db.session.execute(stmt).all()
return db.session.execute(stmt).scalars().all()
def dao_get_jobs_by_service_id(

View File

@@ -780,3 +780,22 @@ def dao_update_delivery_receipts(receipts, delivered):
f"#loadtestperformance batch update query time: \
updated {len(receipts)} notification in {elapsed_time} ms"
)
def dao_close_out_delivery_receipts():
THREE_DAYS_AGO = utc_now() - timedelta(minutes=3)
stmt = (
update(Notification)
.where(
Notification.status == NotificationStatus.PENDING,
Notification.sent_at < THREE_DAYS_AGO,
)
.values(status=NotificationStatus.FAILED, provider_response="Technical Failure")
)
result = db.session.execute(stmt)
db.session.commit()
if result:
current_app.logger.info(
f"Marked {result.rowcount} notifications as technical failures"
)