From b4bf332ea055256273803fc68d678ed1e0494c8b Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Tue, 9 May 2017 15:24:05 +0100 Subject: [PATCH] Added in tasks for created initial job stats and for updating following an outcome. --- app/celery/statistics_tasks.py | 34 ++++++++++-- tests/app/celery/test_statistics_tasks.py | 63 +++++++++++++++++++++++ 2 files changed, 94 insertions(+), 3 deletions(-) create mode 100644 tests/app/celery/test_statistics_tasks.py diff --git a/app/celery/statistics_tasks.py b/app/celery/statistics_tasks.py index bd92e6e5d..f8b893a6e 100644 --- a/app/celery/statistics_tasks.py +++ b/app/celery/statistics_tasks.py @@ -1,8 +1,36 @@ +from sqlalchemy.exc import SQLAlchemyError + from app import notify_celery +from flask import current_app from app.statsd_decorators import statsd +from app.dao.statistics_dao import create_or_update_job_sending_statistics, update_job_stats_outcome_count -@notify_celery.task(bind=True, name='record_initial_job_statistics') +@notify_celery.task(bind=True, name='record_initial_job_statistics', max_retries=20, default_retry_delay=300) @statsd(namespace="tasks") -def record_initial_job_statistics(notification): - pass +def record_initial_job_statistics(self, notification): + try: + create_or_update_job_sending_statistics(notification) + except SQLAlchemyError as e: + current_app.logger.exception(e) + self.retry(queue="retry") + except self.MaxRetriesExceededError: + current_app.logger.error( + "RETRY FAILED: task record_initial_job_statistics failed for notification {}".format(notification.id) + ) + + +@notify_celery.task(bind=True, name='record_outcome_job_statistics', max_retries=20, default_retry_delay=300) +@statsd(namespace="tasks") +def record_outcome_job_statistics(self, notification): + try: + updated_count = update_job_stats_outcome_count(notification) + if updated_count == 0: + self.retry(queue="retry") + except SQLAlchemyError as e: + current_app.logger.exception(e) + self.retry(queue="retry") + except self.MaxRetriesExceededError: + current_app.logger.error( + "RETRY FAILED: task update_job_stats_outcome_count failed for notification {}".format(notification.id) + ) diff --git a/tests/app/celery/test_statistics_tasks.py b/tests/app/celery/test_statistics_tasks.py new file mode 100644 index 000000000..edc451c17 --- /dev/null +++ b/tests/app/celery/test_statistics_tasks.py @@ -0,0 +1,63 @@ +from app.celery.statistics_tasks import record_initial_job_statistics, record_outcome_job_statistics +from sqlalchemy.exc import SQLAlchemyError + +import app + + +def test_should_call_create_job_stats_dao_methods(notify_db, notify_db_session, sample_notification, mocker): + dao_mock = mocker.patch("app.celery.statistics_tasks.create_or_update_job_sending_statistics") + record_initial_job_statistics(sample_notification) + + dao_mock.assert_called_once_with(sample_notification) + + +def test_should_retry_if_persisting_the_job_stats_has_a_sql_alchemy_exception( + notify_db, + notify_db_session, + sample_notification, + mocker): + dao_mock = mocker.patch( + "app.celery.statistics_tasks.create_or_update_job_sending_statistics", + side_effect=SQLAlchemyError() + ) + retry_mock = mocker.patch('app.celery.statistics_tasks.record_initial_job_statistics.retry') + + record_initial_job_statistics(sample_notification) + dao_mock.assert_called_once_with(sample_notification) + retry_mock.assert_called_with(queue="retry") + + +def test_should_call_update_job_stats_dao_outcome_methods(notify_db, notify_db_session, sample_notification, mocker): + dao_mock = mocker.patch("app.celery.statistics_tasks.update_job_stats_outcome_count") + record_outcome_job_statistics(sample_notification) + + dao_mock.assert_called_once_with(sample_notification) + + +def test_should_retry_if_persisting_the_job_outcome_stats_has_a_sql_alchemy_exception( + notify_db, + notify_db_session, + sample_notification, + mocker): + dao_mock = mocker.patch( + "app.celery.statistics_tasks.update_job_stats_outcome_count", + side_effect=SQLAlchemyError() + ) + retry_mock = mocker.patch('app.celery.statistics_tasks.record_outcome_job_statistics.retry') + + record_outcome_job_statistics(sample_notification) + dao_mock.assert_called_once_with(sample_notification) + retry_mock.assert_called_with(queue="retry") + + +def test_should_retry_if_persisting_the_job_outcome_stats_updates_zero_rows( + notify_db, + notify_db_session, + sample_notification, + mocker): + dao_mock = mocker.patch("app.celery.statistics_tasks.update_job_stats_outcome_count", return_value=0) + retry_mock = mocker.patch('app.celery.statistics_tasks.record_outcome_job_statistics.retry') + + record_outcome_job_statistics(sample_notification) + dao_mock.assert_called_once_with(sample_notification) + retry_mock.assert_called_with(queue="retry")