diff --git a/app/celery/statistics_tasks.py b/app/celery/statistics_tasks.py index f8b893a6e..3efcd389d 100644 --- a/app/celery/statistics_tasks.py +++ b/app/celery/statistics_tasks.py @@ -3,14 +3,32 @@ from sqlalchemy.exc import SQLAlchemyError from app import notify_celery from flask import current_app from app.statsd_decorators import statsd -from app.dao.statistics_dao import create_or_update_job_sending_statistics, update_job_stats_outcome_count +from app.dao.statistics_dao import ( + create_or_update_job_sending_statistics, + update_job_stats_outcome_count +) +from app.dao.notifications_dao import get_notification_by_id -@notify_celery.task(bind=True, name='record_initial_job_statistics', max_retries=20, default_retry_delay=300) +def create_initial_notification_statistic_tasks(notification): + if notification.job_id: + record_initial_job_statistics.apply_async((str(notification.id),), queue="notify") + + +def create_outcome_notification_statistic_tasks(notification): + if notification.job_id: + record_outcome_job_statistics.apply_async((str(notification.id),), queue="notify") + + +@notify_celery.task(bind=True, name='record_initial_job_statistics', max_retries=20, default_retry_delay=10) @statsd(namespace="tasks") -def record_initial_job_statistics(self, notification): +def record_initial_job_statistics(self, notification_id): try: - create_or_update_job_sending_statistics(notification) + notification = get_notification_by_id(notification_id) + if notification: + create_or_update_job_sending_statistics(notification) + else: + raise SQLAlchemyError("Failed to find notification with id {}".format(notification_id)) except SQLAlchemyError as e: current_app.logger.exception(e) self.retry(queue="retry") @@ -20,13 +38,17 @@ def record_initial_job_statistics(self, notification): ) -@notify_celery.task(bind=True, name='record_outcome_job_statistics', max_retries=20, default_retry_delay=300) +@notify_celery.task(bind=True, name='record_outcome_job_statistics', max_retries=20, default_retry_delay=10) @statsd(namespace="tasks") -def record_outcome_job_statistics(self, notification): +def record_outcome_job_statistics(self, notification_id): try: - updated_count = update_job_stats_outcome_count(notification) - if updated_count == 0: - self.retry(queue="retry") + notification = get_notification_by_id(notification_id) + if notification: + updated_count = update_job_stats_outcome_count(notification) + if updated_count == 0: + self.retry(queue="retry") + else: + raise SQLAlchemyError("Failed to find notification with id {}".format(notification_id)) except SQLAlchemyError as e: current_app.logger.exception(e) self.retry(queue="retry") diff --git a/tests/app/celery/test_statistics_tasks.py b/tests/app/celery/test_statistics_tasks.py index edc451c17..2dbc4451a 100644 --- a/tests/app/celery/test_statistics_tasks.py +++ b/tests/app/celery/test_statistics_tasks.py @@ -1,12 +1,51 @@ -from app.celery.statistics_tasks import record_initial_job_statistics, record_outcome_job_statistics +from app.celery.statistics_tasks import ( + record_initial_job_statistics, + record_outcome_job_statistics, + create_initial_notification_statistic_tasks, + create_outcome_notification_statistic_tasks) from sqlalchemy.exc import SQLAlchemyError +from app import create_uuid +from tests.app.conftest import sample_notification + + +def test_should_create_initial_job_task_if_notification_is_related_to_a_job( + notify_db, notify_db_session, sample_job, mocker +): + mock = mocker.patch("app.celery.statistics_tasks.record_initial_job_statistics.apply_async") + notification = sample_notification(notify_db, notify_db_session, job=sample_job) + create_initial_notification_statistic_tasks(notification) + mock.assert_called_once_with((str(notification.id), ), queue="notify") + + +def test_should_not_create_initial_job_task_if_notification_is_related_to_a_job( + notify_db, notify_db_session, sample_notification, mocker +): + mock = mocker.patch("app.celery.statistics_tasks.record_initial_job_statistics.apply_async") + create_initial_notification_statistic_tasks(sample_notification) + mock.assert_not_called() + + +def test_should_create_outcome_job_task_if_notification_is_related_to_a_job( + notify_db, notify_db_session, sample_job, mocker +): + mock = mocker.patch("app.celery.statistics_tasks.record_outcome_job_statistics.apply_async") + notification = sample_notification(notify_db, notify_db_session, job=sample_job) + create_outcome_notification_statistic_tasks(notification) + mock.assert_called_once_with((str(notification.id), ), queue="notify") + + +def test_should_not_create_outcome_job_task_if_notification_is_related_to_a_job( + notify_db, notify_db_session, sample_notification, mocker +): + mock = mocker.patch("app.celery.statistics_tasks.record_outcome_job_statistics.apply_async") + create_outcome_notification_statistic_tasks(sample_notification) + mock.assert_not_called() -import app def test_should_call_create_job_stats_dao_methods(notify_db, notify_db_session, sample_notification, mocker): dao_mock = mocker.patch("app.celery.statistics_tasks.create_or_update_job_sending_statistics") - record_initial_job_statistics(sample_notification) + record_initial_job_statistics(str(sample_notification.id)) dao_mock.assert_called_once_with(sample_notification) @@ -22,14 +61,14 @@ def test_should_retry_if_persisting_the_job_stats_has_a_sql_alchemy_exception( ) retry_mock = mocker.patch('app.celery.statistics_tasks.record_initial_job_statistics.retry') - record_initial_job_statistics(sample_notification) + record_initial_job_statistics(str(sample_notification.id)) dao_mock.assert_called_once_with(sample_notification) retry_mock.assert_called_with(queue="retry") def test_should_call_update_job_stats_dao_outcome_methods(notify_db, notify_db_session, sample_notification, mocker): dao_mock = mocker.patch("app.celery.statistics_tasks.update_job_stats_outcome_count") - record_outcome_job_statistics(sample_notification) + record_outcome_job_statistics(str(sample_notification.id)) dao_mock.assert_called_once_with(sample_notification) @@ -45,7 +84,7 @@ def test_should_retry_if_persisting_the_job_outcome_stats_has_a_sql_alchemy_exce ) retry_mock = mocker.patch('app.celery.statistics_tasks.record_outcome_job_statistics.retry') - record_outcome_job_statistics(sample_notification) + record_outcome_job_statistics(str(sample_notification.id)) dao_mock.assert_called_once_with(sample_notification) retry_mock.assert_called_with(queue="retry") @@ -58,6 +97,31 @@ def test_should_retry_if_persisting_the_job_outcome_stats_updates_zero_rows( dao_mock = mocker.patch("app.celery.statistics_tasks.update_job_stats_outcome_count", return_value=0) retry_mock = mocker.patch('app.celery.statistics_tasks.record_outcome_job_statistics.retry') - record_outcome_job_statistics(sample_notification) + record_outcome_job_statistics(str(sample_notification.id)) dao_mock.assert_called_once_with(sample_notification) retry_mock.assert_called_with(queue="retry") + + +def test_should_retry_if_persisting_the_job_stats_creation_cant_find_notification_by_id( + notify_db, + notify_db_session, + mocker): + dao_mock = mocker.patch("app.celery.statistics_tasks.create_or_update_job_sending_statistics") + retry_mock = mocker.patch('app.celery.statistics_tasks.record_initial_job_statistics.retry') + + record_initial_job_statistics(str(create_uuid())) + dao_mock.assert_not_called() + retry_mock.assert_called_with(queue="retry") + + +def test_should_retry_if_persisting_the_job_stats_outcome_cant_find_notification_by_id( + notify_db, + notify_db_session, + mocker): + + dao_mock = mocker.patch("app.celery.statistics_tasks.update_job_stats_outcome_count") + retry_mock = mocker.patch('app.celery.statistics_tasks.record_outcome_job_statistics.retry') + + record_outcome_job_statistics(str(create_uuid())) + dao_mock.assert_not_called() + retry_mock.assert_called_with(queue="retry")