diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 12c721114..a60551b75 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -320,8 +320,8 @@ def batch_insert_notifications(self): batch.append(notification) try: dao_batch_insert_notifications(batch) - except Exception as e: - current_app.logger.exception(f"Notification batch insert failed {e}") + except Exception: + current_app.logger.exception("Notification batch insert failed") for n in batch: # Use 'created_at' as a TTL so we don't retry infinitely if n.created_at < utc_now() - timedelta(minutes=1): diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 8b5fc6be9..fec64480a 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -547,3 +547,56 @@ def test_batch_insert_with_valid_notifications(mocker): rs.llen.assert_called_once_with("message_queue") rs.lpop.assert_called_with("message_queue") + + +def test_batch_insert_with_expired_notifications(mocker): + expired_time = utc_now() - timedelta(minutes=2) + mocker.patch( + "app.celery.scheduled_tasks.dao_batch_insert_notifications", + side_effect=Exception("DB Error"), + ) + rs = MagicMock() + mocker.patch("app.celery.scheduled_tasks.redis_store", rs) + notifications = [ + { + "id": 1, + "notification_status": "pending", + "created_at": utc_now().isoformat(), + }, + { + "id": 2, + "notification_status": "pending", + "created_at": expired_time.isoformat(), + }, + ] + serialized_notifications = [json.dumps(n).encode("utf-8") for n in notifications] + + pipeline_mock = MagicMock() + + rs.pipeline.return_value.__enter__.return_value = pipeline_mock + rs.llen.return_value = len(notifications) + rs.lpop.side_effect = serialized_notifications + + batch_insert_notifications() + + rs.llen.assert_called_once_with("message_queue") + rs.rpush.assert_called_once() + requeued_notification = json.loads(rs.rpush.call_args[0][1]) + assert requeued_notification["id"] == 1 + + +def test_batch_insert_with_malformed_notifications(mocker): + rs = MagicMock() + mocker.patch("app.celery.scheduled_tasks.redis_store", rs) + malformed_data = b"not_a_valid_json" + pipeline_mock = MagicMock() + + rs.pipeline.return_value.__enter__.return_value = pipeline_mock + rs.llen.return_value = 1 + rs.lpop.side_effect = [malformed_data] + + with pytest.raises(json.JSONDecodeError): + batch_insert_notifications() + + rs.llen.assert_called_once_with("message_queue") + rs.rpush.assert_not_called()