diff --git a/app/__init__.py b/app/__init__.py index 23c2399e1..add218e5d 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -18,6 +18,7 @@ from sqlalchemy import event from werkzeug.exceptions import HTTPException as WerkzeugHTTPException from werkzeug.local import LocalProxy +from app import config from app.clients import NotificationProviderClients from app.clients.cloudwatch.aws_cloudwatch import AwsCloudwatchClient from app.clients.document_download import DocumentDownloadClient @@ -58,15 +59,28 @@ class SQLAlchemy(_SQLAlchemy): def apply_driver_hacks(self, app, info, options): sa_url, options = super().apply_driver_hacks(app, info, options) + if "connect_args" not in options: options["connect_args"] = {} options["connect_args"]["options"] = "-c statement_timeout={}".format( int(app.config["SQLALCHEMY_STATEMENT_TIMEOUT"]) * 1000 ) + return (sa_url, options) -db = SQLAlchemy() +# Set db engine settings here for now. +# They were not being set previously (despite environment variables with appropriate +# sounding names) and were defaulting to low values +db = SQLAlchemy( + engine_options={ + "pool_size": config.Config.SQLALCHEMY_POOL_SIZE, + "max_overflow": 10, + "pool_timeout": config.Config.SQLALCHEMY_POOL_TIMEOUT, + "pool_recycle": config.Config.SQLALCHEMY_POOL_RECYCLE, + "pool_pre_ping": True, + } +) migrate = Migrate() ma = Marshmallow() notify_celery = NotifyCelery() diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 2dcd570cc..72806aa58 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -24,6 +24,7 @@ from app.dao.jobs_dao import ( find_missing_row_for_job, ) from app.dao.notifications_dao import ( + dao_close_out_delivery_receipts, dao_update_delivery_receipts, notifications_not_yet_sent, ) @@ -242,6 +243,8 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers(): bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts" ) def 
process_delivery_receipts(self): + # If we need to check db settings do it here for convenience + # current_app.logger.info(f"POOL SIZE {app.db.engine.pool.size()}") """ Every eight minutes or so (see config.py) we run this task, which searches the last ten minutes of logs for delivery receipts and batch updates the db with the results. The overlap @@ -278,3 +281,10 @@ def process_delivery_receipts(self): current_app.logger.error( "Failed process delivery receipts after max retries" ) + + +@notify_celery.task( + bind=True, max_retries=2, default_retry_delay=3600, name="cleanup-delivery-receipts" +) +def cleanup_delivery_receipts(self): + dao_close_out_delivery_receipts() diff --git a/app/clients/__init__.py b/app/clients/__init__.py index 3392928e4..f185e45e2 100644 --- a/app/clients/__init__.py +++ b/app/clients/__init__.py @@ -13,8 +13,7 @@ AWS_CLIENT_CONFIG = Config( "addressing_style": "virtual", }, use_fips_endpoint=True, - # This is the default but just for doc sake - max_pool_connections=10, + max_pool_connections=50, # This should be equal or greater than our celery concurrency ) diff --git a/app/config.py b/app/config.py index 9ec37a71c..0a67b7a95 100644 --- a/app/config.py +++ b/app/config.py @@ -90,7 +90,7 @@ class Config(object): SQLALCHEMY_DATABASE_URI = cloud_config.database_url SQLALCHEMY_RECORD_QUERIES = False SQLALCHEMY_TRACK_MODIFICATIONS = False - SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 5)) + SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 40)) SQLALCHEMY_POOL_TIMEOUT = 30 SQLALCHEMY_POOL_RECYCLE = 300 SQLALCHEMY_STATEMENT_TIMEOUT = 1200 @@ -215,6 +215,11 @@ class Config(object): "schedule": timedelta(minutes=2), "options": {"queue": QueueNames.PERIODIC}, }, + "cleanup-delivery-receipts": { + "task": "cleanup-delivery-receipts", + "schedule": timedelta(minutes=82), + "options": {"queue": QueueNames.PERIODIC}, + }, "expire-or-delete-invitations": { "task": "expire-or-delete-invitations", "schedule": 
timedelta(minutes=66), diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index ddec26956..c969c4b53 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -45,7 +45,7 @@ def dao_get_job_by_service_id_and_job_id(service_id, job_id): def dao_get_unfinished_jobs(): stmt = select(Job).filter(Job.processing_finished.is_(None)) - return db.session.execute(stmt).all() + return db.session.execute(stmt).scalars().all() def dao_get_jobs_by_service_id( diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 139f7ae8a..c8f2797a0 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -780,3 +780,22 @@ def dao_update_delivery_receipts(receipts, delivered): f"#loadtestperformance batch update query time: \ updated {len(receipts)} notification in {elapsed_time} ms" ) + + +def dao_close_out_delivery_receipts(): + THREE_DAYS_AGO = utc_now() - timedelta(days=3) + stmt = ( + update(Notification) + .where( + Notification.status == NotificationStatus.PENDING, + Notification.sent_at < THREE_DAYS_AGO, + ) + .values(status=NotificationStatus.FAILED, provider_response="Technical Failure") + ) + result = db.session.execute(stmt) + + db.session.commit() + if result: + current_app.logger.info( + f"Marked {result.rowcount} notifications as technical failures" + ) diff --git a/manifest.yml b/manifest.yml index 39e842730..0763a1911 100644 --- a/manifest.yml +++ b/manifest.yml @@ -26,7 +26,7 @@ applications: - type: worker instances: ((worker_instances)) memory: ((worker_memory)) - command: newrelic-admin run-program celery -A run_celery.notify_celery worker --loglevel=INFO --pool=threads --concurrency=10 + command: newrelic-admin run-program celery -A run_celery.notify_celery worker --loglevel=INFO --pool=threads --concurrency=10 --prefetch-multiplier=2 - type: scheduler instances: 1 memory: ((scheduler_memory)) diff --git a/tests/app/dao/notification_dao/test_notification_dao.py 
b/tests/app/dao/notification_dao/test_notification_dao.py index 6e09f182a..f6905a749 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -11,6 +11,7 @@ from sqlalchemy.orm.exc import NoResultFound from app import db from app.dao.notifications_dao import ( + dao_close_out_delivery_receipts, dao_create_notification, dao_delete_notifications_by_id, dao_get_last_notification_added_for_job_id, @@ -2026,6 +2027,23 @@ def test_update_delivery_receipts(mocker): assert "provider_response" in kwargs +def test_close_out_delivery_receipts(mocker): + mock_session = mocker.patch("app.dao.notifications_dao.db.session") + mock_update = MagicMock() + mock_where = MagicMock() + mock_values = MagicMock() + mock_update.where.return_value = mock_where + mock_where.values.return_value = mock_values + + mock_session.execute.return_value = None + with patch("app.dao.notifications_dao.update", return_value=mock_update): + dao_close_out_delivery_receipts() + mock_update.where.assert_called_once() + mock_where.values.assert_called_once() + mock_session.execute.assert_called_once_with(mock_values) + mock_session.commit.assert_called_once() + + @pytest.mark.parametrize( "created_at_utc,date_to_check,expected_count", [