mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-02 17:31:14 -05:00
more tests
This commit is contained in:
@@ -320,8 +320,8 @@ def batch_insert_notifications(self):
|
|||||||
batch.append(notification)
|
batch.append(notification)
|
||||||
try:
|
try:
|
||||||
dao_batch_insert_notifications(batch)
|
dao_batch_insert_notifications(batch)
|
||||||
except Exception as e:
|
except Exception:
|
||||||
current_app.logger.exception(f"Notification batch insert failed {e}")
|
current_app.logger.exception("Notification batch insert failed")
|
||||||
for n in batch:
|
for n in batch:
|
||||||
# Use 'created_at' as a TTL so we don't retry infinitely
|
# Use 'created_at' as a TTL so we don't retry infinitely
|
||||||
if n.created_at < utc_now() - timedelta(minutes=1):
|
if n.created_at < utc_now() - timedelta(minutes=1):
|
||||||
|
|||||||
@@ -547,3 +547,56 @@ def test_batch_insert_with_valid_notifications(mocker):
|
|||||||
|
|
||||||
rs.llen.assert_called_once_with("message_queue")
|
rs.llen.assert_called_once_with("message_queue")
|
||||||
rs.lpop.assert_called_with("message_queue")
|
rs.lpop.assert_called_with("message_queue")
|
||||||
|
|
||||||
|
|
||||||
|
def test_batch_insert_with_expired_notifications(mocker):
|
||||||
|
expired_time = utc_now() - timedelta(minutes=2)
|
||||||
|
mocker.patch(
|
||||||
|
"app.celery.scheduled_tasks.dao_batch_insert_notifications",
|
||||||
|
side_effect=Exception("DB Error"),
|
||||||
|
)
|
||||||
|
rs = MagicMock()
|
||||||
|
mocker.patch("app.celery.scheduled_tasks.redis_store", rs)
|
||||||
|
notifications = [
|
||||||
|
{
|
||||||
|
"id": 1,
|
||||||
|
"notification_status": "pending",
|
||||||
|
"created_at": utc_now().isoformat(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": 2,
|
||||||
|
"notification_status": "pending",
|
||||||
|
"created_at": expired_time.isoformat(),
|
||||||
|
},
|
||||||
|
]
|
||||||
|
serialized_notifications = [json.dumps(n).encode("utf-8") for n in notifications]
|
||||||
|
|
||||||
|
pipeline_mock = MagicMock()
|
||||||
|
|
||||||
|
rs.pipeline.return_value.__enter__.return_value = pipeline_mock
|
||||||
|
rs.llen.return_value = len(notifications)
|
||||||
|
rs.lpop.side_effect = serialized_notifications
|
||||||
|
|
||||||
|
batch_insert_notifications()
|
||||||
|
|
||||||
|
rs.llen.assert_called_once_with("message_queue")
|
||||||
|
rs.rpush.assert_called_once()
|
||||||
|
requeued_notification = json.loads(rs.rpush.call_args[0][1])
|
||||||
|
assert requeued_notification["id"] == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_batch_insert_with_malformed_notifications(mocker):
|
||||||
|
rs = MagicMock()
|
||||||
|
mocker.patch("app.celery.scheduled_tasks.redis_store", rs)
|
||||||
|
malformed_data = b"not_a_valid_json"
|
||||||
|
pipeline_mock = MagicMock()
|
||||||
|
|
||||||
|
rs.pipeline.return_value.__enter__.return_value = pipeline_mock
|
||||||
|
rs.llen.return_value = 1
|
||||||
|
rs.lpop.side_effect = [malformed_data]
|
||||||
|
|
||||||
|
with pytest.raises(json.JSONDecodeError):
|
||||||
|
batch_insert_notifications()
|
||||||
|
|
||||||
|
rs.llen.assert_called_once_with("message_queue")
|
||||||
|
rs.rpush.assert_not_called()
|
||||||
|
|||||||
Reference in New Issue
Block a user