diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index cb0e0886e..e173c923a 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -1,10 +1,11 @@ +import json from datetime import timedelta from flask import current_app from sqlalchemy import between from sqlalchemy.exc import SQLAlchemyError -from app import notify_celery, zendesk_client +from app import notify_celery, redis_store, zendesk_client from app.celery.tasks import ( get_recipient_csv_and_template_and_sender_id, process_incomplete_jobs, @@ -24,6 +25,7 @@ from app.dao.jobs_dao import ( find_missing_row_for_job, ) from app.dao.notifications_dao import ( + dao_batch_insert_notifications, dao_close_out_delivery_receipts, dao_update_delivery_receipts, notifications_not_yet_sent, @@ -286,3 +288,17 @@ def process_delivery_receipts(self): ) def cleanup_delivery_receipts(self): dao_close_out_delivery_receipts() + + +@notify_celery.task(bind=True, name="batch-insert-notifications") +def batch_insert_notifications(self): + batch = [] + with redis_store.pipeline: + notification = redis_store.lpop("notification_queue") + batch.append(json.loads(notification)) + try: + dao_batch_insert_notifications(batch) + except Exception as e: + for msg in batch: + redis_store.rpush("notification_queue", json.dumps(msg)) + current_app.logger.exception(f"Notification batch insert failed {e}") diff --git a/app/config.py b/app/config.py index 580495731..bd19ffa59 100644 --- a/app/config.py +++ b/app/config.py @@ -208,6 +208,11 @@ class Config(object): "schedule": timedelta(minutes=82), "options": {"queue": QueueNames.PERIODIC}, }, + "batch-insert-notifications": { + "task": "batch-insert-notifications", + "schedule": 10.0, + "options": {"queue": QueueNames.PERIODIC}, + }, "expire-or-delete-invitations": { "task": "expire-or-delete-invitations", "schedule": timedelta(minutes=66), diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index c8f2797a0..cd3c0e1aa 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -2,6 +2,7 @@ import json from datetime import timedelta from time import time +import sqlalchemy from flask import current_app from sqlalchemy import ( TIMESTAMP, @@ -799,3 +800,12 @@ def dao_close_out_delivery_receipts(): current_app.logger.info( f"Marked {result.rowcount} notifications as technical failures" ) + + +def dao_batch_insert_notifications(batch): + try: + db.session.bulk_save_objects(Notification(**msg) for msg in batch) + db.session.commit() + return len(batch) + except sqlalchemy.exc.SQLAlchemyError as e: + current_app.logger.exception(f"Error during batch insert {e}") diff --git a/app/notifications/process_notifications.py b/app/notifications/process_notifications.py index 5f1c6676d..347d2fc0b 100644 --- a/app/notifications/process_notifications.py +++ b/app/notifications/process_notifications.py @@ -6,7 +6,6 @@ from app import redis_store from app.celery import provider_tasks from app.config import QueueNames from app.dao.notifications_dao import ( - dao_create_notification, dao_delete_notifications_by_id, dao_notification_exists, get_notification_by_id, @@ -139,8 +138,9 @@ def persist_notification( # if simulated create a Notification model to return but do not persist the Notification to the dB if not simulated: - current_app.logger.info("Firing dao_create_notification") - dao_create_notification(notification) + # current_app.logger.info("Firing dao_create_notification") + # dao_create_notification(notification) + redis_store.rpush("message_queue", notification) if key_type != KeyType.TEST and current_app.config["REDIS_ENABLED"]: current_app.logger.info( "Redis enabled, querying cache key for service id: {}".format( @@ -172,7 +172,7 @@ def send_notification_to_queue_detached( deliver_task = provider_tasks.deliver_email try: - deliver_task.apply_async([str(notification_id)], queue=queue) + deliver_task.apply_async([str(notification_id)], queue=queue, countdown=30) except Exception: dao_delete_notifications_by_id(notification_id) raise