Merge pull request #1533 from GSA/notify-api-1531

Investigate bulk_save_objects for notification inserts
This commit is contained in:
Kenneth Kehl
2025-01-15 09:47:00 -08:00
committed by GitHub
17 changed files with 278 additions and 46 deletions

View File

@@ -1,10 +1,11 @@
from datetime import timedelta
import json
from datetime import datetime, timedelta
from flask import current_app
from sqlalchemy import between
from sqlalchemy.exc import SQLAlchemyError
from app import notify_celery, zendesk_client
from app import notify_celery, redis_store, zendesk_client
from app.celery.tasks import (
get_recipient_csv_and_template_and_sender_id,
process_incomplete_jobs,
@@ -24,6 +25,7 @@ from app.dao.jobs_dao import (
find_missing_row_for_job,
)
from app.dao.notifications_dao import (
dao_batch_insert_notifications,
dao_close_out_delivery_receipts,
dao_update_delivery_receipts,
notifications_not_yet_sent,
@@ -34,7 +36,7 @@ from app.dao.services_dao import (
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.enums import JobStatus, NotificationType
from app.models import Job
from app.models import Job, Notification
from app.notifications.process_notifications import send_notification_to_queue
from app.utils import utc_now
from notifications_utils import aware_utcnow
@@ -288,3 +290,51 @@ def process_delivery_receipts(self):
)
def cleanup_delivery_receipts(self):
dao_close_out_delivery_receipts()
@notify_celery.task(bind=True, name="batch-insert-notifications")
def batch_insert_notifications(self):
batch = []
# TODO We probably need some way to clear the list if
# things go haywire. A command?
# with redis_store.pipeline():
# while redis_store.llen("message_queue") > 0:
# redis_store.lpop("message_queue")
# current_app.logger.info("EMPTY!")
# return
current_len = redis_store.llen("message_queue")
with redis_store.pipeline():
# since this list is being fed by other processes, just grab what is available when
# this call is made and process that.
count = 0
while count < current_len:
count = count + 1
notification_bytes = redis_store.lpop("message_queue")
notification_dict = json.loads(notification_bytes.decode("utf-8"))
notification_dict["status"] = notification_dict.pop("notification_status")
if not notification_dict.get("created_at"):
notification_dict["created_at"] = utc_now()
elif isinstance(notification_dict["created_at"], list):
notification_dict["created_at"] = notification_dict["created_at"][0]
notification = Notification(**notification_dict)
if notification is not None:
batch.append(notification)
try:
dao_batch_insert_notifications(batch)
except Exception:
current_app.logger.exception("Notification batch insert failed")
for n in batch:
# Use 'created_at' as a TTL so we don't retry infinitely
notification_time = n.created_at
if isinstance(notification_time, str):
notification_time = datetime.fromisoformat(n.created_at)
if notification_time < utc_now() - timedelta(seconds=50):
current_app.logger.warning(
f"Abandoning stale data, could not write to db: {n.serialize_for_redis(n)}"
)
continue
else:
redis_store.rpush("message_queue", json.dumps(n.serialize_for_redis(n)))

View File

@@ -256,7 +256,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
)
)
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)], queue=QueueNames.SEND_SMS
[str(saved_notification.id)], queue=QueueNames.SEND_SMS, countdown=60
)
current_app.logger.debug(