Add a wrapper method around calling the tasks to manage when to call stats tasks

This commit is contained in:
Martyn Inglis
2017-05-09 18:17:26 +01:00
parent 70bc468da0
commit f2a47044a4
2 changed files with 102 additions and 16 deletions

View File

@@ -3,14 +3,32 @@ from sqlalchemy.exc import SQLAlchemyError
from app import notify_celery
from flask import current_app
from app.statsd_decorators import statsd
from app.dao.statistics_dao import create_or_update_job_sending_statistics, update_job_stats_outcome_count
from app.dao.statistics_dao import (
create_or_update_job_sending_statistics,
update_job_stats_outcome_count
)
from app.dao.notifications_dao import get_notification_by_id
@notify_celery.task(bind=True, name='record_initial_job_statistics', max_retries=20, default_retry_delay=300)
def create_initial_notification_statistic_tasks(notification):
if notification.job_id:
record_initial_job_statistics.apply_async((str(notification.id),), queue="notify")
def create_outcome_notification_statistic_tasks(notification):
if notification.job_id:
record_outcome_job_statistics.apply_async((str(notification.id),), queue="notify")
@notify_celery.task(bind=True, name='record_initial_job_statistics', max_retries=20, default_retry_delay=10)
@statsd(namespace="tasks")
def record_initial_job_statistics(self, notification):
def record_initial_job_statistics(self, notification_id):
try:
create_or_update_job_sending_statistics(notification)
notification = get_notification_by_id(notification_id)
if notification:
create_or_update_job_sending_statistics(notification)
else:
raise SQLAlchemyError("Failed to find notification with id {}".format(notification_id))
except SQLAlchemyError as e:
current_app.logger.exception(e)
self.retry(queue="retry")
@@ -20,13 +38,17 @@ def record_initial_job_statistics(self, notification):
)
@notify_celery.task(bind=True, name='record_outcome_job_statistics', max_retries=20, default_retry_delay=300)
@notify_celery.task(bind=True, name='record_outcome_job_statistics', max_retries=20, default_retry_delay=10)
@statsd(namespace="tasks")
def record_outcome_job_statistics(self, notification):
def record_outcome_job_statistics(self, notification_id):
try:
updated_count = update_job_stats_outcome_count(notification)
if updated_count == 0:
self.retry(queue="retry")
notification = get_notification_by_id(notification_id)
if notification:
updated_count = update_job_stats_outcome_count(notification)
if updated_count == 0:
self.retry(queue="retry")
else:
raise SQLAlchemyError("Failed to find notification with id {}".format(notification_id))
except SQLAlchemyError as e:
current_app.logger.exception(e)
self.retry(queue="retry")

View File

@@ -1,12 +1,51 @@
from app.celery.statistics_tasks import record_initial_job_statistics, record_outcome_job_statistics
from app.celery.statistics_tasks import (
record_initial_job_statistics,
record_outcome_job_statistics,
create_initial_notification_statistic_tasks,
create_outcome_notification_statistic_tasks)
from sqlalchemy.exc import SQLAlchemyError
from app import create_uuid
from tests.app.conftest import sample_notification
def test_should_create_initial_job_task_if_notification_is_related_to_a_job(
notify_db, notify_db_session, sample_job, mocker
):
mock = mocker.patch("app.celery.statistics_tasks.record_initial_job_statistics.apply_async")
notification = sample_notification(notify_db, notify_db_session, job=sample_job)
create_initial_notification_statistic_tasks(notification)
mock.assert_called_once_with((str(notification.id), ), queue="notify")
def test_should_not_create_initial_job_task_if_notification_is_related_to_a_job(
notify_db, notify_db_session, sample_notification, mocker
):
mock = mocker.patch("app.celery.statistics_tasks.record_initial_job_statistics.apply_async")
create_initial_notification_statistic_tasks(sample_notification)
mock.assert_not_called()
def test_should_create_outcome_job_task_if_notification_is_related_to_a_job(
notify_db, notify_db_session, sample_job, mocker
):
mock = mocker.patch("app.celery.statistics_tasks.record_outcome_job_statistics.apply_async")
notification = sample_notification(notify_db, notify_db_session, job=sample_job)
create_outcome_notification_statistic_tasks(notification)
mock.assert_called_once_with((str(notification.id), ), queue="notify")
def test_should_not_create_outcome_job_task_if_notification_is_related_to_a_job(
notify_db, notify_db_session, sample_notification, mocker
):
mock = mocker.patch("app.celery.statistics_tasks.record_outcome_job_statistics.apply_async")
create_outcome_notification_statistic_tasks(sample_notification)
mock.assert_not_called()
import app
def test_should_call_create_job_stats_dao_methods(notify_db, notify_db_session, sample_notification, mocker):
dao_mock = mocker.patch("app.celery.statistics_tasks.create_or_update_job_sending_statistics")
record_initial_job_statistics(sample_notification)
record_initial_job_statistics(str(sample_notification.id))
dao_mock.assert_called_once_with(sample_notification)
@@ -22,14 +61,14 @@ def test_should_retry_if_persisting_the_job_stats_has_a_sql_alchemy_exception(
)
retry_mock = mocker.patch('app.celery.statistics_tasks.record_initial_job_statistics.retry')
record_initial_job_statistics(sample_notification)
record_initial_job_statistics(str(sample_notification.id))
dao_mock.assert_called_once_with(sample_notification)
retry_mock.assert_called_with(queue="retry")
def test_should_call_update_job_stats_dao_outcome_methods(notify_db, notify_db_session, sample_notification, mocker):
dao_mock = mocker.patch("app.celery.statistics_tasks.update_job_stats_outcome_count")
record_outcome_job_statistics(sample_notification)
record_outcome_job_statistics(str(sample_notification.id))
dao_mock.assert_called_once_with(sample_notification)
@@ -45,7 +84,7 @@ def test_should_retry_if_persisting_the_job_outcome_stats_has_a_sql_alchemy_exce
)
retry_mock = mocker.patch('app.celery.statistics_tasks.record_outcome_job_statistics.retry')
record_outcome_job_statistics(sample_notification)
record_outcome_job_statistics(str(sample_notification.id))
dao_mock.assert_called_once_with(sample_notification)
retry_mock.assert_called_with(queue="retry")
@@ -58,6 +97,31 @@ def test_should_retry_if_persisting_the_job_outcome_stats_updates_zero_rows(
dao_mock = mocker.patch("app.celery.statistics_tasks.update_job_stats_outcome_count", return_value=0)
retry_mock = mocker.patch('app.celery.statistics_tasks.record_outcome_job_statistics.retry')
record_outcome_job_statistics(sample_notification)
record_outcome_job_statistics(str(sample_notification.id))
dao_mock.assert_called_once_with(sample_notification)
retry_mock.assert_called_with(queue="retry")
def test_should_retry_if_persisting_the_job_stats_creation_cant_find_notification_by_id(
notify_db,
notify_db_session,
mocker):
dao_mock = mocker.patch("app.celery.statistics_tasks.create_or_update_job_sending_statistics")
retry_mock = mocker.patch('app.celery.statistics_tasks.record_initial_job_statistics.retry')
record_initial_job_statistics(str(create_uuid()))
dao_mock.assert_not_called()
retry_mock.assert_called_with(queue="retry")
def test_should_retry_if_persisting_the_job_stats_outcome_cant_find_notification_by_id(
notify_db,
notify_db_session,
mocker):
dao_mock = mocker.patch("app.celery.statistics_tasks.update_job_stats_outcome_count")
retry_mock = mocker.patch('app.celery.statistics_tasks.record_outcome_job_statistics.retry')
record_outcome_job_statistics(str(create_uuid()))
dao_mock.assert_not_called()
retry_mock.assert_called_with(queue="retry")