From 1213463b8e4b915f61992025d5da1362c071a899 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 25 Jan 2022 11:29:57 +0000 Subject: [PATCH] Only aggregate status when necessary for a service This takes a similar approach to the nightly deletion task so that we only create sub-tasks when there are actually notifications to aggregate for a given type and day [1]. We're making this change to stop the duplication errors we're getting at the moment and ensure the task can scale to more messages and more services. There are two parts to this: - Each subtask should now run within the 5 minute visibility timeout. However, they may still be duplicated if the parent task overruns [2]. - The parent task creates a mininal number of subtasks, and the query to determine this is very fast for a normal process day (milliseconds). Since all tasks will run quickly, there should be no more duplication. In order to test this more nuanced task, I rewrote the tests: - One test checks the subtask is called correctly. - One test checks we create all the right subtasks. [1]: https://github.com/alphagov/notifications-api/pull/3381 [2]: https://docs.google.com/document/d/1MaP6Nyy3nJKkuh_4lP1wuDm19X8LZITOLRd9n3Ax-xg/edit#heading=h.q3intzwqhfzl --- app/celery/reporting_tasks.py | 10 ++- app/dao/notifications_dao.py | 14 +++++ tests/app/celery/test_reporting_tasks.py | 77 +++++++++++++++--------- 3 files changed, 69 insertions(+), 32 deletions(-) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index 2f74bca10..118bf967d 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta from flask import current_app from notifications_utils.timezones import convert_utc_to_bst -from app import db, notify_celery +from app import notify_celery from app.config import QueueNames from app.cronitor import cronitor from app.dao.fact_billing_dao import ( @@ -14,7 +14,8 @@ from app.dao.fact_notification_status_dao import ( fetch_status_data_for_service_and_day, update_fact_notification_status, ) -from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE, Service +from app.dao.notifications_dao import get_service_ids_with_notifications_on_date +from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE @notify_celery.task(name="create-nightly-billing") @@ -97,8 +98,11 @@ def create_nightly_notification_status(): for i in range(days): process_day = yesterday - timedelta(days=i) - for (service_id,) in db.session.query(Service.id): + relevant_service_ids = get_service_ids_with_notifications_on_date( + notification_type, process_day + ) + for service_id in relevant_service_ids: create_nightly_notification_status_for_service_and_day.apply_async( kwargs={ 'process_day': process_day.isoformat(), diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 393bba051..4292f1c7a 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -794,3 +794,17 @@ def get_service_ids_that_have_notifications_from_before_timestamp(notification_t Notification.created_at < timestamp ).distinct() } + + +def get_service_ids_with_notifications_on_date(notification_type, date): + return { + row.service_id + for row in db.session.query( + Notification.service_id + ).filter( + Notification.notification_type == notification_type, + # using >= + < is much more efficient than date(created_at) + Notification.created_at >= date, + Notification.created_at < date + timedelta(days=1) + ).distinct() + } diff --git a/tests/app/celery/test_reporting_tasks.py b/tests/app/celery/test_reporting_tasks.py index 9e0345200..1e83161f7 100644 --- a/tests/app/celery/test_reporting_tasks.py +++ b/tests/app/celery/test_reporting_tasks.py @@ -1,4 +1,3 @@ -import itertools from datetime import date, datetime, time, timedelta from decimal import Decimal from uuid import UUID @@ -20,6 +19,7 @@ from app.models import ( KEY_TYPE_TEAM, KEY_TYPE_TEST, LETTER_TYPE, + NOTIFICATION_TYPES, SMS_TYPE, FactBilling, FactNotificationStatus, @@ -61,39 +61,58 @@ def test_create_nightly_billing_triggers_tasks_for_days(notify_api, mocker, day_ assert mock_celery.apply_async.call_args_list[i][1]['kwargs'] == {'process_day': expected_kwargs[i]} -@freeze_time('2019-08-01') -def test_create_nightly_notification_status_triggers_tasks(notify_api, sample_service, mocker): - mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_notification_status_for_service_and_day') +@freeze_time('2019-08-01T00:30') +def test_create_nightly_notification_status_triggers_tasks( + notify_api, + sample_service, + sample_template, + mocker, +): + mock_celery = mocker.patch( + 'app.celery.reporting_tasks.create_nightly_notification_status_for_service_and_day' + ).apply_async + + create_notification(template=sample_template, created_at='2019-07-31') create_nightly_notification_status() - assert mock_celery.apply_async.call_count == ( - (4 * 3) # four days, three notification types - + - 6 # six more days of just letters + mock_celery.assert_called_with( + kwargs={ + 'service_id': sample_service.id, + 'process_day': '2019-07-31', + 'notification_type': SMS_TYPE + }, + queue=QueueNames.REPORTING ) - for process_date, notification_type in itertools.product( - ['2019-07-31', '2019-07-30', '2019-07-29', '2019-07-28'], - [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE] - ): - mock_celery.apply_async.assert_any_call( - kwargs={ - 'process_day': process_date, - 'notification_type': notification_type, - 'service_id': sample_service.id, - }, - queue=QueueNames.REPORTING - ) - for process_date in ['2019-07-27', '2019-07-26', '2019-07-25', '2019-07-24', '2019-07-23', '2019-07-22']: - mock_celery.apply_async.assert_any_call( - kwargs={ - 'process_day': process_date, - 'notification_type': LETTER_TYPE, - 'service_id': sample_service.id, - }, - queue=QueueNames.REPORTING - ) +@freeze_time('2019-08-01T00:30') +@pytest.mark.parametrize('notification_date, expected_types_aggregated', [ + ('2019-08-01', set()), + ('2019-07-31', {EMAIL_TYPE, SMS_TYPE, LETTER_TYPE}), + ('2019-07-28', {EMAIL_TYPE, SMS_TYPE, LETTER_TYPE}), + ('2019-07-27', {LETTER_TYPE}), + ('2019-07-22', {LETTER_TYPE}), + ('2019-07-21', set()), +]) +def test_create_nightly_notification_status_triggers_relevant_tasks( + notify_api, + sample_service, + mocker, + notification_date, + expected_types_aggregated, +): + mock_celery = mocker.patch( + 'app.celery.reporting_tasks.create_nightly_notification_status_for_service_and_day' + ).apply_async + + for notification_type in NOTIFICATION_TYPES: + template = create_template(sample_service, template_type=notification_type) + create_notification(template=template, created_at=notification_date) + + create_nightly_notification_status() + + types = {call.kwargs['kwargs']['notification_type'] for call in mock_celery.mock_calls} + assert types == expected_types_aggregated @pytest.mark.parametrize('second_rate, records_num, billable_units, multiplier',