add a test

This commit is contained in:
Kenneth Kehl
2025-01-13 10:00:18 -08:00
parent 28470468e2
commit a92eb91470
3 changed files with 48 additions and 25 deletions

View File

@@ -292,39 +292,42 @@ def cleanup_delivery_receipts(self):
@notify_celery.task(bind=True, name="batch-insert-notifications") @notify_celery.task(bind=True, name="batch-insert-notifications")
def batch_insert_notifications(self): def batch_insert_notifications(self):
current_app.logger.info("ENTER SCHEDULED TASK")
batch = [] batch = []
# TODO We probably need some way to clear the list if
# things go haywire. A command?
# with redis_store.pipeline(): # with redis_store.pipeline():
# while redis_store.llen("message_queue") > 0: # while redis_store.llen("message_queue") > 0:
# redis_store.lpop("message_queue") # redis_store.lpop("message_queue")
# current_app.logger.info("EMPTY!") # current_app.logger.info("EMPTY!")
# return # return
current_len = redis_store.llen("message_queue")
with redis_store.pipeline(): with redis_store.pipeline():
current_app.logger.info("PIPELINE") # since this list is being fed by other processes, just grab what is available when
# since this list is always growing, just grab what is available when
# this call is made and process that. # this call is made and process that.
current_len = redis_store.llen("message_queue")
count = 0 count = 0
while count < current_len: while count < current_len:
count = count + 1 count = count + 1
notification_bytes = redis_store.lpop("message_queue") notification_bytes = redis_store.lpop("message_queue")
notification_dict = json.loads(notification_bytes.decode("utf-8")) notification_dict = json.loads(notification_bytes.decode("utf-8"))
notification_dict["status"] = notification_dict.pop("notification_status") notification_dict["status"] = notification_dict.pop("notification_status")
notification_dict["created_at"] = utc_now() if not notification_dict.get("created_at"):
notification_dict["created_at"] = utc_now()
notification = Notification(**notification_dict) notification = Notification(**notification_dict)
current_app.logger.info(
f"WHAT IS THIS NOTIFICATION {type(notification)} {notification}"
)
if notification is not None: if notification is not None:
current_app.logger.info(
f"SCHEDULED adding notification {notification.id} to batch"
)
batch.append(notification) batch.append(notification)
try: try:
current_app.logger.info("GOING TO DO BATCH INSERT")
dao_batch_insert_notifications(batch) dao_batch_insert_notifications(batch)
except Exception as e: except Exception as e:
current_app.logger.exception(f"Notification batch insert failed {e}") current_app.logger.exception(f"Notification batch insert failed {e}")
for n in batch:
for msg in batch: # Use 'created_at' as a TTL so we don't retry infinitely
redis_store.rpush("notification_queue", json.dumps(msg)) if n.created_at < utc_now() - timedelta(minutes=1):
current_app.logger.warning(
f"Abandoning stale data, could not write to db: {n.serialize_for_redis(n)}"
)
continue
else:
redis_store.rpush("message_queue", json.dumps(n.serialize_for_redis(n)))

View File

@@ -2,7 +2,6 @@ import json
from datetime import timedelta from datetime import timedelta
from time import time from time import time
import sqlalchemy
from flask import current_app from flask import current_app
from sqlalchemy import ( from sqlalchemy import (
TIMESTAMP, TIMESTAMP,
@@ -803,11 +802,8 @@ def dao_close_out_delivery_receipts():
def dao_batch_insert_notifications(batch): def dao_batch_insert_notifications(batch):
current_app.logger.info("DOING BATCH INSERT IN DAO")
try: db.session.bulk_save_objects(batch)
db.session.bulk_save_objects(batch) db.session.commit()
db.session.commit() current_app.logger.info(f"Batch inserted notifications: {len(batch)}")
current_app.logger.info(f"SUCCESSFULLY INSERTED: {len(batch)}") return len(batch)
return len(batch)
except sqlalchemy.exc.SQLAlchemyError as e:
current_app.logger.exception(f"Error during batch insert {e}")

View File

@@ -1,12 +1,14 @@
import json
from collections import namedtuple from collections import namedtuple
from datetime import timedelta from datetime import timedelta
from unittest import mock from unittest import mock
from unittest.mock import ANY, call from unittest.mock import ANY, MagicMock, call
import pytest import pytest
from app.celery import scheduled_tasks from app.celery import scheduled_tasks
from app.celery.scheduled_tasks import ( from app.celery.scheduled_tasks import (
batch_insert_notifications,
check_for_missing_rows_in_completed_jobs, check_for_missing_rows_in_completed_jobs,
check_for_services_with_high_failure_rates_or_sending_to_tv_numbers, check_for_services_with_high_failure_rates_or_sending_to_tv_numbers,
check_job_status, check_job_status,
@@ -523,3 +525,25 @@ def test_check_for_services_with_high_failure_rates_or_sending_to_tv_numbers(
technical_ticket=True, technical_ticket=True,
) )
mock_send_ticket_to_zendesk.assert_called_once() mock_send_ticket_to_zendesk.assert_called_once()
def test_batch_insert_with_valid_notifications(mocker):
mocker.patch("app.celery.scheduled_tasks.dao_batch_insert_notifications")
rs = MagicMock()
mocker.patch("app.celery.scheduled_tasks.redis_store", rs)
notifications = [
{"id": 1, "notification_status": "pending"},
{"id": 2, "notification_status": "pending"},
]
serialized_notifications = [json.dumps(n).encode("utf-8") for n in notifications]
pipeline_mock = MagicMock()
rs.pipeline.return_value.__enter__.return_value = pipeline_mock
rs.llen.return_value = len(notifications)
rs.lpop.side_effect = serialized_notifications
batch_insert_notifications()
rs.llen.assert_called_once_with("message_queue")
rs.lpop.assert_called_with("message_queue")