From 9fc8b904c6a681a4a1bd67658a581fe0440b98ca Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 11 Jan 2022 15:55:24 +0000 Subject: [PATCH 1/8] DRY up status aggregation tests (move DAO tests up) The previous DAO tests were also confusing because they were testing two functions at the same time, so moving the tests up to the task level seems very reasonable, and will make it easier to change how this code works in the next commits. --- tests/app/celery/test_reporting_tasks.py | 106 ++++++++++++++---- .../dao/test_fact_notification_status_dao.py | 101 +---------------- 2 files changed, 85 insertions(+), 122 deletions(-) diff --git a/tests/app/celery/test_reporting_tasks.py b/tests/app/celery/test_reporting_tasks.py index f8f873955..dacab4db0 100644 --- a/tests/app/celery/test_reporting_tasks.py +++ b/tests/app/celery/test_reporting_tasks.py @@ -1,6 +1,7 @@ import itertools -from datetime import date, datetime, timedelta +from datetime import date, datetime, time, timedelta from decimal import Decimal +from uuid import UUID import pytest from freezegun import freeze_time @@ -15,6 +16,9 @@ from app.config import QueueNames from app.dao.fact_billing_dao import get_rate from app.models import ( EMAIL_TYPE, + KEY_TYPE_NORMAL, + KEY_TYPE_TEAM, + KEY_TYPE_TEST, LETTER_TYPE, SMS_TYPE, FactBilling, @@ -24,8 +28,10 @@ from app.models import ( from tests.app.db import ( create_letter_rate, create_notification, + create_notification_history, create_rate, create_service, + create_service_data_retention, create_template, ) @@ -495,7 +501,6 @@ def test_create_nightly_billing_for_day_update_when_record_exists( assert records[0].updated_at -@freeze_time('2019-01-05') def test_create_nightly_notification_status_for_day(notify_db_session): first_service = create_service(service_name='First Service') first_template = create_template(service=first_service) @@ -504,35 +509,92 @@ def test_create_nightly_notification_status_for_day(notify_db_session): third_service = create_service(service_name='third Service') third_template = create_template(service=third_service, template_type='letter') - create_notification(template=first_template, status='delivered') - create_notification(template=first_template, status='delivered', created_at=datetime(2019, 1, 1, 12, 0)) + create_service_data_retention(second_service, 'email', days_of_retention=3) - create_notification(template=second_template, status='temporary-failure') - create_notification(template=second_template, status='temporary-failure', created_at=datetime(2019, 1, 1, 12, 0)) + process_day = date.today() - timedelta(days=5) + with freeze_time(datetime.combine(process_day, time.min)): + create_notification(template=first_template, status='delivered') - create_notification(template=third_template, status='created') - create_notification(template=third_template, status='created', created_at=datetime(2019, 1, 1, 12, 0)) + # 2nd service email has 3 day data retention - data has been moved to history and doesn't exist in notifications + create_notification_history(template=second_template, status='temporary-failure') + + # team API key notifications are included + create_notification(template=third_template, status='sending', key_type=KEY_TYPE_TEAM) + + # test notifications are ignored + create_notification(template=third_template, status='sending', key_type=KEY_TYPE_TEST) + + # these created notifications from a different day get ignored + with freeze_time(datetime.combine(date.today() - timedelta(days=4), time.min)): + create_notification(template=first_template) + create_notification_history(template=second_template) + create_notification(template=third_template) assert len(FactNotificationStatus.query.all()) == 0 - create_nightly_notification_status_for_day('2019-01-01', 'sms') - create_nightly_notification_status_for_day('2019-01-01', 'email') - create_nightly_notification_status_for_day('2019-01-01', 'letter') + create_nightly_notification_status_for_day(str(process_day), 'sms') + create_nightly_notification_status_for_day(str(process_day), 'email') + create_nightly_notification_status_for_day(str(process_day), 'letter') - new_data = FactNotificationStatus.query.order_by(FactNotificationStatus.created_at).all() + new_fact_data = FactNotificationStatus.query.order_by( + FactNotificationStatus.notification_type + ).all() - assert len(new_data) == 3 - assert new_data[0].bst_date == date(2019, 1, 1) - assert new_data[1].bst_date == date(2019, 1, 1) - assert new_data[2].bst_date == date(2019, 1, 1) + assert len(new_fact_data) == 3 + assert new_fact_data[0].bst_date == process_day + assert new_fact_data[0].template_id == second_template.id + assert new_fact_data[0].service_id == second_service.id + assert new_fact_data[0].job_id == UUID('00000000-0000-0000-0000-000000000000') + assert new_fact_data[0].notification_type == 'email' + assert new_fact_data[0].notification_status == 'temporary-failure' + assert new_fact_data[0].notification_count == 1 + assert new_fact_data[0].key_type == KEY_TYPE_NORMAL - assert new_data[0].notification_type == 'sms' - assert new_data[1].notification_type == 'email' - assert new_data[2].notification_type == 'letter' + assert new_fact_data[1].bst_date == process_day + assert new_fact_data[1].template_id == third_template.id + assert new_fact_data[1].service_id == third_service.id + assert new_fact_data[1].job_id == UUID('00000000-0000-0000-0000-000000000000') + assert new_fact_data[1].notification_type == 'letter' + assert new_fact_data[1].notification_status == 'sending' + assert new_fact_data[1].notification_count == 1 + assert new_fact_data[1].key_type == KEY_TYPE_TEAM - assert new_data[0].notification_status == 'delivered' - assert new_data[1].notification_status == 'temporary-failure' - assert new_data[2].notification_status == 'created' + assert new_fact_data[2].bst_date == process_day + assert new_fact_data[2].template_id == first_template.id + assert new_fact_data[2].service_id == first_service.id + assert new_fact_data[2].job_id == UUID('00000000-0000-0000-0000-000000000000') + assert new_fact_data[2].notification_type == 'sms' + assert new_fact_data[2].notification_status == 'delivered' + assert new_fact_data[2].notification_count == 1 + assert new_fact_data[2].key_type == KEY_TYPE_NORMAL + + +def test_create_nightly_notification_status_for_day_overwrites_old_data(notify_db_session): + first_service = create_service(service_name='First Service') + first_template = create_template(service=first_service) + create_notification(template=first_template, status='delivered') + + process_day = date.today() + create_nightly_notification_status_for_day(str(process_day), 'sms') + + new_fact_data = FactNotificationStatus.query.order_by( + FactNotificationStatus.bst_date, + FactNotificationStatus.notification_type + ).all() + + assert len(new_fact_data) == 1 + assert new_fact_data[0].notification_count == 1 + + create_notification(template=first_template, status='delivered') + create_nightly_notification_status_for_day(str(process_day), 'sms') + + updated_fact_data = FactNotificationStatus.query.order_by( + FactNotificationStatus.bst_date, + FactNotificationStatus.notification_type + ).all() + + assert len(updated_fact_data) == 1 + assert updated_fact_data[0].notification_count == 2 # the job runs at 12:30am London time. 04/01 is in BST. diff --git a/tests/app/dao/test_fact_notification_status_dao.py b/tests/app/dao/test_fact_notification_status_dao.py index 53571ff0b..3007f1398 100644 --- a/tests/app/dao/test_fact_notification_status_dao.py +++ b/tests/app/dao/test_fact_notification_status_dao.py @@ -1,4 +1,4 @@ -from datetime import date, datetime, time, timedelta +from datetime import date, datetime, timedelta from unittest import mock from uuid import UUID @@ -8,7 +8,6 @@ from freezegun import freeze_time from app.dao.fact_notification_status_dao import ( fetch_monthly_notification_statuses_per_service, fetch_monthly_template_usage_for_service, - fetch_notification_status_for_day, fetch_notification_status_for_service_by_month, fetch_notification_status_for_service_for_day, fetch_notification_status_for_service_for_today_and_7_previous_days, @@ -16,11 +15,9 @@ from app.dao.fact_notification_status_dao import ( fetch_notification_statuses_for_job, fetch_stats_for_all_services_by_date_range, get_total_notifications_for_date_range, - update_fact_notification_status, ) from app.models import ( EMAIL_TYPE, - KEY_TYPE_NORMAL, KEY_TYPE_TEAM, KEY_TYPE_TEST, LETTER_TYPE, @@ -34,112 +31,16 @@ from app.models import ( NOTIFICATION_TECHNICAL_FAILURE, NOTIFICATION_TEMPORARY_FAILURE, SMS_TYPE, - FactNotificationStatus, ) from tests.app.db import ( create_ft_notification_status, create_job, create_notification, - create_notification_history, create_service, - create_service_data_retention, create_template, ) -def test_update_fact_notification_status(notify_db_session): - first_service = create_service(service_name='First Service') - first_template = create_template(service=first_service) - second_service = create_service(service_name='second Service') - second_template = create_template(service=second_service, template_type='email') - third_service = create_service(service_name='third Service') - third_template = create_template(service=third_service, template_type='letter') - - create_service_data_retention(second_service, 'email', days_of_retention=3) - - process_day = date.today() - timedelta(days=5) - with freeze_time(datetime.combine(process_day, time.min)): - create_notification(template=first_template, status='delivered') - - # 2nd service email has 3 day data retention - data has been moved to history and doesn't exist in notifications - create_notification_history(template=second_template, status='temporary-failure') - - # team API key notifications are included - create_notification(template=third_template, status='sending', key_type=KEY_TYPE_TEAM) - - # test notifications are ignored - create_notification(template=third_template, status='sending', key_type=KEY_TYPE_TEST) - - # these created notifications from a different day get ignored - with freeze_time(datetime.combine(date.today() - timedelta(days=4), time.min)): - create_notification(template=first_template) - create_notification_history(template=second_template) - create_notification(template=third_template) - - for notification_type in ('letter', 'sms', 'email'): - data = fetch_notification_status_for_day(process_day=process_day, notification_type=notification_type) - update_fact_notification_status(data=data, process_day=process_day, notification_type=notification_type) - - new_fact_data = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date, - FactNotificationStatus.notification_type - ).all() - - assert len(new_fact_data) == 3 - assert new_fact_data[0].bst_date == process_day - assert new_fact_data[0].template_id == second_template.id - assert new_fact_data[0].service_id == second_service.id - assert new_fact_data[0].job_id == UUID('00000000-0000-0000-0000-000000000000') - assert new_fact_data[0].notification_type == 'email' - assert new_fact_data[0].notification_status == 'temporary-failure' - assert new_fact_data[0].notification_count == 1 - assert new_fact_data[0].key_type == KEY_TYPE_NORMAL - - assert new_fact_data[1].bst_date == process_day - assert new_fact_data[1].template_id == third_template.id - assert new_fact_data[1].service_id == third_service.id - assert new_fact_data[1].job_id == UUID('00000000-0000-0000-0000-000000000000') - assert new_fact_data[1].notification_type == 'letter' - assert new_fact_data[1].notification_status == 'sending' - assert new_fact_data[1].notification_count == 1 - assert new_fact_data[1].key_type == KEY_TYPE_TEAM - - assert new_fact_data[2].bst_date == process_day - assert new_fact_data[2].template_id == first_template.id - assert new_fact_data[2].service_id == first_service.id - assert new_fact_data[2].job_id == UUID('00000000-0000-0000-0000-000000000000') - assert new_fact_data[2].notification_type == 'sms' - assert new_fact_data[2].notification_status == 'delivered' - assert new_fact_data[2].notification_count == 1 - assert new_fact_data[2].key_type == KEY_TYPE_NORMAL - - -def test__update_fact_notification_status_updates_row(notify_db_session): - first_service = create_service(service_name='First Service') - first_template = create_template(service=first_service) - create_notification(template=first_template, status='delivered') - - process_day = date.today() - data = fetch_notification_status_for_day(process_day=process_day, notification_type='sms') - update_fact_notification_status(data=data, process_day=process_day, notification_type='sms') - - new_fact_data = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date, - FactNotificationStatus.notification_type - ).all() - assert len(new_fact_data) == 1 - assert new_fact_data[0].notification_count == 1 - - create_notification(template=first_template, status='delivered') - - data = fetch_notification_status_for_day(process_day=process_day, notification_type='sms') - update_fact_notification_status(data=data, process_day=process_day, notification_type='sms') - - updated_fact_data = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date, - FactNotificationStatus.notification_type - ).all() - assert len(updated_fact_data) == 1 - assert updated_fact_data[0].notification_count == 2 - - def test_fetch_notification_status_for_service_by_month(notify_db_session): service_1 = create_service(service_name='service_1') service_2 = create_service(service_name='service_2') From ddbf55648646ac3e45ea669bf63fa2f471daeb32 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 11 Jan 2022 16:33:56 +0000 Subject: [PATCH 2/8] Rewrite task to aggregate status by service This is a step towards parallelising the task by service and day. --- app/celery/reporting_tasks.py | 41 ++++++++++++++++--------- app/dao/fact_notification_status_dao.py | 40 ++++++++++-------------- 2 files changed, 42 insertions(+), 39 deletions(-) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index fae3f148e..8ac841c9e 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta from flask import current_app from notifications_utils.timezones import convert_utc_to_bst -from app import notify_celery +from app import db, notify_celery from app.config import QueueNames from app.cronitor import cronitor from app.dao.fact_billing_dao import ( @@ -11,10 +11,10 @@ from app.dao.fact_billing_dao import ( update_fact_billing, ) from app.dao.fact_notification_status_dao import ( - fetch_notification_status_for_day, + fetch_status_data_for_service_and_day, update_fact_notification_status, ) -from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE +from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE, Service @notify_celery.task(name="create-nightly-billing") @@ -124,17 +124,28 @@ def create_nightly_notification_status_for_day(process_day, notification_type): f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: started' ) - start = datetime.utcnow() - transit_data = fetch_notification_status_for_day(process_day=process_day, notification_type=notification_type) - end = datetime.utcnow() - current_app.logger.info( - f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: ' - f'data fetched in {(end - start).seconds} seconds' - ) + for (service_id,) in db.session.query(Service.id): + start = datetime.utcnow() + transit_data = fetch_status_data_for_service_and_day( + process_day=process_day, + notification_type=notification_type, + service_id=service_id, + ) - update_fact_notification_status(transit_data, process_day, notification_type) + end = datetime.utcnow() + current_app.logger.info( + f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: ' + f'data fetched in {(end - start).seconds} seconds' + ) - current_app.logger.info( - f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: ' - f'task complete - {len(transit_data)} rows updated' - ) + update_fact_notification_status( + transit_data=transit_data, + process_day=process_day, + notification_type=notification_type, + service_id=service_id + ) + + current_app.logger.info( + f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: ' + f'task complete - {len(transit_data)} rows updated' + ) diff --git a/app/dao/fact_notification_status_dao.py b/app/dao/fact_notification_status_dao.py index ad26ddd2d..3e315da5a 100644 --- a/app/dao/fact_notification_status_dao.py +++ b/app/dao/fact_notification_status_dao.py @@ -1,6 +1,5 @@ from datetime import datetime, time, timedelta -from flask import current_app from notifications_utils.timezones import convert_bst_to_utc from sqlalchemy import Date, case, func from sqlalchemy.dialects.postgresql import insert @@ -36,29 +35,21 @@ from app.utils import ( ) -def fetch_notification_status_for_day(process_day, notification_type): +def fetch_status_data_for_service_and_day(process_day, service_id, notification_type): start_date = convert_bst_to_utc(datetime.combine(process_day, time.min)) end_date = convert_bst_to_utc(datetime.combine(process_day + timedelta(days=1), time.min)) - current_app.logger.info("Fetch ft_notification_status for {} to {}".format(start_date, end_date)) + # query notifications or notification_history for the day, depending on their data retention + service = Service.query.get(service_id) + table = get_notification_table_to_use(service, notification_type, process_day, has_delete_task_run=False) - all_data_for_process_day = [] - services = Service.query.all() - # for each service query notifications or notification_history for the day, depending on their data retention - for service in services: - table = get_notification_table_to_use(service, notification_type, process_day, has_delete_task_run=False) - - data_for_service_and_type = query_for_fact_status_data( - table=table, - start_date=start_date, - end_date=end_date, - notification_type=notification_type, - service_id=service.id - ) - - all_data_for_process_day += data_for_service_and_type - - return all_data_for_process_day + return query_for_fact_status_data( + table=table, + start_date=start_date, + end_date=end_date, + notification_type=notification_type, + service_id=service.id + ) def query_for_fact_status_data(table, start_date, end_date, notification_type, service_id): @@ -86,18 +77,19 @@ def query_for_fact_status_data(table, start_date, end_date, notification_type, s @autocommit -def update_fact_notification_status(data, process_day, notification_type): +def update_fact_notification_status(transit_data, process_day, notification_type, service_id): table = FactNotificationStatus.__table__ FactNotificationStatus.query.filter( FactNotificationStatus.bst_date == process_day, - FactNotificationStatus.notification_type == notification_type + FactNotificationStatus.notification_type == notification_type, + FactNotificationStatus.service_id == service_id, ).delete() - for row in data: + for row in transit_data: stmt = insert(table).values( bst_date=process_day, template_id=row.template_id, - service_id=row.service_id, + service_id=service_id, job_id=row.job_id, notification_type=notification_type, key_type=row.key_type, From 4feed950c4c55de48a092710bb133aa902bd75eb Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 11 Jan 2022 16:42:41 +0000 Subject: [PATCH 3/8] DRY-up loops to kick off status aggregation tasks This will make it easier to parallelise by service in the following commits, since we only have one loop to change. --- app/celery/reporting_tasks.py | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index 8ac841c9e..28f91cc7f 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -91,31 +91,21 @@ def create_nightly_notification_status(): yesterday = convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=1) - # email and sms - for i in range(4): - process_day = yesterday - timedelta(days=i) - for notification_type in [SMS_TYPE, EMAIL_TYPE]: + for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]: + days = 10 if notification_type == LETTER_TYPE else 4 + + for i in range(days): + process_day = yesterday - timedelta(days=i) + create_nightly_notification_status_for_day.apply_async( kwargs={'process_day': process_day.isoformat(), 'notification_type': notification_type}, queue=QueueNames.REPORTING ) current_app.logger.info( - f"create-nightly-notification-status task: create-nightly-notification-status-for-day task created " + f"create-nightly-notification-status-for-day task created " f"for type {notification_type} for {process_day}" ) - # letters - for i in range(10): - process_day = yesterday - timedelta(days=i) - create_nightly_notification_status_for_day.apply_async( - kwargs={'process_day': process_day.isoformat(), 'notification_type': LETTER_TYPE}, - queue=QueueNames.REPORTING - ) - current_app.logger.info( - f"create-nightly-notification-status task: create-nightly-notification-status-for-day task created " - f"for type letter for {process_day}" - ) - @notify_celery.task(name="create-nightly-notification-status-for-day") def create_nightly_notification_status_for_day(process_day, notification_type): From d772ae6b46cad48bc3ee9aa0657b7b2fda3de387 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 11 Jan 2022 16:46:45 +0000 Subject: [PATCH 4/8] Standardise logs for status aggregation tasks This will make it easier to parallelise by service later on. --- app/celery/reporting_tasks.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index 28f91cc7f..b11f08261 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -103,7 +103,7 @@ def create_nightly_notification_status(): ) current_app.logger.info( f"create-nightly-notification-status-for-day task created " - f"for type {notification_type} for {process_day}" + f"for {notification_type} and {process_day}" ) @@ -111,7 +111,8 @@ def create_nightly_notification_status(): def create_nightly_notification_status_for_day(process_day, notification_type): process_day = datetime.strptime(process_day, "%Y-%m-%d").date() current_app.logger.info( - f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: started' + f'create-nightly-notification-status-for-day task started ' + f'for {notification_type} and {process_day}' ) for (service_id,) in db.session.query(Service.id): @@ -124,7 +125,8 @@ def create_nightly_notification_status_for_day(process_day, notification_type): end = datetime.utcnow() current_app.logger.info( - f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: ' + f'create-nightly-notification-status-for-day task fetch ' + f'for {process_day} and {notification_type}: ' f'data fetched in {(end - start).seconds} seconds' ) @@ -136,6 +138,7 @@ def create_nightly_notification_status_for_day(process_day, notification_type): ) current_app.logger.info( - f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: ' - f'task complete - {len(transit_data)} rows updated' + f'create-nightly-notification-status-for-day task finished ' + f'for {process_day} and {notification_type}: ' + f'{len(transit_data)} rows updated' ) From c3da139e9c2aaaf62b3282bae9beda26b22cb941 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 11 Jan 2022 16:58:38 +0000 Subject: [PATCH 5/8] Remove redundant migration tasks (esp. for status) These were added long ago [1][2] and aren't referenced in runbooks, so it should be safe to delete them. [1]: https://github.com/alphagov/notifications-api/commit/13f36620515894cd36731969b5526bd723806271 [2]: https://github.com/alphagov/notifications-api/commit/b9953dd005d45f0a7dcd1020090f54e642ee677b --- app/commands.py | 109 ------------------------------------------------ 1 file changed, 109 deletions(-) diff --git a/app/commands.py b/app/commands.py index 565b83bde..e44f96d04 100644 --- a/app/commands.py +++ b/app/commands.py @@ -23,9 +23,6 @@ from app.celery.letters_pdf_tasks import ( get_pdf_for_templated_letter, resanitise_pdf, ) -from app.celery.reporting_tasks import ( - create_nightly_notification_status_for_day, -) from app.celery.tasks import process_row, record_daily_sorted_counts from app.config import QueueNames from app.dao.annual_billing_dao import ( @@ -62,9 +59,7 @@ from app.dao.users_dao import ( get_user_by_email, ) from app.models import ( - EMAIL_TYPE, KEY_TYPE_TEST, - LETTER_TYPE, NOTIFICATION_CREATED, PROVIDERS, SMS_TYPE, @@ -281,87 +276,6 @@ def setup_commands(application): application.cli.add_command(command_group) -@notify_command(name='migrate-data-to-ft-billing') -@click.option('-s', '--start_date', required=True, help="start date inclusive", type=click_dt(format='%Y-%m-%d')) -@click.option('-e', '--end_date', required=True, help="end date inclusive", type=click_dt(format='%Y-%m-%d')) -@statsd(namespace="tasks") -def migrate_data_to_ft_billing(start_date, end_date): - - current_app.logger.info('Billing migration from date {} to {}'.format(start_date, end_date)) - - process_date = start_date - total_updated = 0 - - while process_date < end_date: - start_time = datetime.utcnow() - # migrate data into ft_billing, upserting the data if it the record already exists - sql = \ - """ - insert into ft_billing (bst_date, template_id, service_id, notification_type, provider, rate_multiplier, - international, billable_units, notifications_sent, rate, postage, created_at) - select bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international, - sum(billable_units) as billable_units, sum(notifications_sent) as notification_sent, - case when notification_type = 'sms' then sms_rate else letter_rate end as rate, postage, created_at - from ( - select - n.id, - (n.created_at at time zone 'UTC' at time zone 'Europe/London')::timestamp::date as bst_date, - coalesce(n.template_id, '00000000-0000-0000-0000-000000000000') as template_id, - coalesce(n.service_id, '00000000-0000-0000-0000-000000000000') as service_id, - n.notification_type, - coalesce(n.sent_by, ( - case - when notification_type = 'sms' then - coalesce(sent_by, 'unknown') - when notification_type = 'letter' then - coalesce(sent_by, 'dvla') - else - coalesce(sent_by, 'ses') - end )) as provider, - coalesce(n.rate_multiplier,1) as rate_multiplier, - s.crown, - coalesce((select rates.rate from rates - where n.notification_type = rates.notification_type and n.created_at > rates.valid_from - order by rates.valid_from desc limit 1), 0) as sms_rate, - coalesce((select l.rate from letter_rates l where n.billable_units = l.sheet_count - and s.crown = l.crown and n.postage = l.post_class and n.created_at >= l.start_date - and n.created_at < coalesce(l.end_date, now()) and n.notification_type='letter'), 0) - as letter_rate, - coalesce(n.international, false) as international, - n.billable_units, - 1 as notifications_sent, - coalesce(n.postage, 'none') as postage, - now() as created_at - from public.notification_history n - left join services s on s.id = n.service_id - where n.key_type!='test' - and n.notification_status in - ('sending', 'sent', 'delivered', 'temporary-failure', 'permanent-failure', 'failed') - and n.created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' - at time zone 'UTC' - and n.created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC' - ) as individual_record - group by bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international, - sms_rate, letter_rate, postage, created_at - order by bst_date - on conflict on constraint ft_billing_pkey do update set - billable_units = excluded.billable_units, - notifications_sent = excluded.notifications_sent, - rate = excluded.rate, - updated_at = now() - """ - - result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)}) - db.session.commit() - current_app.logger.info('ft_billing: --- Completed took {}ms. Migrated {} rows for {}'.format( - datetime.now() - start_time, result.rowcount, process_date)) - - process_date += timedelta(days=1) - - total_updated += result.rowcount - current_app.logger.info('Total inserted/updated records = {}'.format(total_updated)) - - @notify_command(name='rebuild-ft-billing-for-day') @click.option('-s', '--service_id', required=False, type=click.UUID) @click.option('-d', '--day', help="The date to recalculate, as YYYY-MM-DD", required=True, @@ -401,29 +315,6 @@ def rebuild_ft_billing_for_day(service_id, day): rebuild_ft_data(day, row.service_id) -@notify_command(name='migrate-data-to-ft-notification-status') -@click.option('-s', '--start_date', required=True, help="start date inclusive", type=click_dt(format='%Y-%m-%d')) -@click.option('-e', '--end_date', required=True, help="end date inclusive", type=click_dt(format='%Y-%m-%d')) -@click.option('-t', '--notification-type', required=False, help="notification type (or leave blank for all types)") -@statsd(namespace="tasks") -def migrate_data_to_ft_notification_status(start_date, end_date, notification_type=None): - notification_types = [SMS_TYPE, LETTER_TYPE, EMAIL_TYPE] if notification_type is None else [notification_type] - - start_date = start_date.date() - end_date = end_date.date() - for day_diff in range((end_date - start_date).days + 1): - process_day = start_date + timedelta(days=day_diff) - for notification_type in notification_types: - print('create_nightly_notification_status_for_day triggered for {} and {}'.format( - process_day, - notification_type - )) - create_nightly_notification_status_for_day.apply_async( - kwargs={'process_day': process_day.strftime('%Y-%m-%d'), 'notification_type': notification_type}, - queue=QueueNames.REPORTING - ) - - @notify_command(name='bulk-invite-user-to-service') @click.option('-f', '--file_name', required=True, help="Full path of the file containing a list of email address for people to invite to a service") From 9182ebf4e568d5bf132fc2ac849779cd1bad3c5f Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 11 Jan 2022 17:01:06 +0000 Subject: [PATCH 6/8] Parallelise status aggregation by service and day This follows a similar approach as [1]. Recently we've seen lots of errors from this task, which we think are a consequence of it doing too much work and tripping Celery's visibility timeout. While we can optimise the query [2], it's likely the errors will return as the number of live services grows. Parallelising the aggregation now will make it more futureproof. [1]: https://github.com/alphagov/notifications-api/pull/3397 [2]: https://github.com/alphagov/notifications-api/pull/3417 --- app/celery/reporting_tasks.py | 82 +++++++++++++----------- tests/app/celery/test_reporting_tasks.py | 36 +++++++---- 2 files changed, 65 insertions(+), 53 deletions(-) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index b11f08261..0036a076f 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -91,54 +91,58 @@ def create_nightly_notification_status(): yesterday = convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=1) - for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]: - days = 10 if notification_type == LETTER_TYPE else 4 + for (service_id,) in db.session.query(Service.id): + for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]: + days = 10 if notification_type == LETTER_TYPE else 4 - for i in range(days): - process_day = yesterday - timedelta(days=i) + for i in range(days): + process_day = yesterday - timedelta(days=i) - create_nightly_notification_status_for_day.apply_async( - kwargs={'process_day': process_day.isoformat(), 'notification_type': notification_type}, - queue=QueueNames.REPORTING - ) - current_app.logger.info( - f"create-nightly-notification-status-for-day task created " - f"for {notification_type} and {process_day}" - ) + create_nightly_notification_status_for_service_and_day.apply_async( + kwargs={ + 'process_day': process_day.isoformat(), + 'notification_type': notification_type, + 'service_id': service_id, + }, + queue=QueueNames.REPORTING + ) + current_app.logger.info( + f"create-nightly-notification-status-for-day task created " + f"for {service_id}, {notification_type} and {process_day}" + ) -@notify_celery.task(name="create-nightly-notification-status-for-day") -def create_nightly_notification_status_for_day(process_day, notification_type): +@notify_celery.task(name="create-nightly-notification-status-for-service-and-day") +def create_nightly_notification_status_for_service_and_day(process_day, service_id, notification_type): process_day = datetime.strptime(process_day, "%Y-%m-%d").date() current_app.logger.info( f'create-nightly-notification-status-for-day task started ' - f'for {notification_type} and {process_day}' + f'for {service_id}, {notification_type} and {process_day}' ) - for (service_id,) in db.session.query(Service.id): - start = datetime.utcnow() - transit_data = fetch_status_data_for_service_and_day( - process_day=process_day, - notification_type=notification_type, - service_id=service_id, - ) + start = datetime.utcnow() + transit_data = fetch_status_data_for_service_and_day( + process_day=process_day, + notification_type=notification_type, + service_id=service_id, + ) - end = datetime.utcnow() - current_app.logger.info( - f'create-nightly-notification-status-for-day task fetch ' - f'for {process_day} and {notification_type}: ' - f'data fetched in {(end - start).seconds} seconds' - ) + end = datetime.utcnow() + current_app.logger.info( + f'create-nightly-notification-status-for-day task fetch ' + f'for {service_id}, {process_day} and {notification_type}: ' + f'data fetched in {(end - start).seconds} seconds' + ) - update_fact_notification_status( - transit_data=transit_data, - process_day=process_day, - notification_type=notification_type, - service_id=service_id - ) + update_fact_notification_status( + transit_data=transit_data, + process_day=process_day, + notification_type=notification_type, + service_id=service_id + ) - current_app.logger.info( - f'create-nightly-notification-status-for-day task finished ' - f'for {process_day} and {notification_type}: ' - f'{len(transit_data)} rows updated' - ) + current_app.logger.info( + f'create-nightly-notification-status-for-day task finished ' + f'for {service_id}, {process_day} and {notification_type}: ' + f'{len(transit_data)} rows updated' + ) diff --git a/tests/app/celery/test_reporting_tasks.py b/tests/app/celery/test_reporting_tasks.py index dacab4db0..9e0345200 100644 --- a/tests/app/celery/test_reporting_tasks.py +++ b/tests/app/celery/test_reporting_tasks.py @@ -10,7 +10,7 @@ from app.celery.reporting_tasks import ( create_nightly_billing, create_nightly_billing_for_day, create_nightly_notification_status, - create_nightly_notification_status_for_day, + create_nightly_notification_status_for_service_and_day, ) from app.config import QueueNames from app.dao.fact_billing_dao import get_rate @@ -62,8 +62,8 @@ def test_create_nightly_billing_triggers_tasks_for_days(notify_api, mocker, day_ @freeze_time('2019-08-01') -def test_create_nightly_notification_status_triggers_tasks_for_days(notify_api, mocker): - mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_notification_status_for_day') +def test_create_nightly_notification_status_triggers_tasks(notify_api, sample_service, mocker): + mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_notification_status_for_service_and_day') create_nightly_notification_status() assert mock_celery.apply_async.call_count == ( @@ -77,13 +77,21 @@ def test_create_nightly_notification_status_triggers_tasks_for_days(notify_api, [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE] ): mock_celery.apply_async.assert_any_call( - kwargs={'process_day': process_date, 'notification_type': notification_type}, + kwargs={ + 'process_day': process_date, + 'notification_type': notification_type, + 'service_id': sample_service.id, + }, queue=QueueNames.REPORTING ) for process_date in ['2019-07-27', '2019-07-26', '2019-07-25', '2019-07-24', '2019-07-23', '2019-07-22']: mock_celery.apply_async.assert_any_call( - kwargs={'process_day': process_date, 'notification_type': LETTER_TYPE}, + kwargs={ + 'process_day': process_date, + 'notification_type': LETTER_TYPE, + 'service_id': sample_service.id, + }, queue=QueueNames.REPORTING ) @@ -501,7 +509,7 @@ def test_create_nightly_billing_for_day_update_when_record_exists( assert records[0].updated_at -def test_create_nightly_notification_status_for_day(notify_db_session): +def test_create_nightly_notification_status_for_service_and_day(notify_db_session): first_service = create_service(service_name='First Service') first_template = create_template(service=first_service) second_service = create_service(service_name='second Service') @@ -532,9 +540,9 @@ def test_create_nightly_notification_status_for_day(notify_db_session): assert len(FactNotificationStatus.query.all()) == 0 - create_nightly_notification_status_for_day(str(process_day), 'sms') - create_nightly_notification_status_for_day(str(process_day), 'email') - create_nightly_notification_status_for_day(str(process_day), 'letter') + create_nightly_notification_status_for_service_and_day(str(process_day), first_service.id, 'sms') + create_nightly_notification_status_for_service_and_day(str(process_day), second_service.id, 'email') + create_nightly_notification_status_for_service_and_day(str(process_day), third_service.id, 'letter') new_fact_data = FactNotificationStatus.query.order_by( FactNotificationStatus.notification_type @@ -569,13 +577,13 @@ def test_create_nightly_notification_status_for_day(notify_db_session): assert new_fact_data[2].key_type == KEY_TYPE_NORMAL -def test_create_nightly_notification_status_for_day_overwrites_old_data(notify_db_session): +def test_create_nightly_notification_status_for_service_and_day_overwrites_old_data(notify_db_session): first_service = create_service(service_name='First Service') first_template = create_template(service=first_service) create_notification(template=first_template, status='delivered') process_day = date.today() - create_nightly_notification_status_for_day(str(process_day), 'sms') + create_nightly_notification_status_for_service_and_day(str(process_day), first_service.id, 'sms') new_fact_data = FactNotificationStatus.query.order_by( FactNotificationStatus.bst_date, @@ -586,7 +594,7 @@ def test_create_nightly_notification_status_for_day_overwrites_old_data(notify_d assert new_fact_data[0].notification_count == 1 create_notification(template=first_template, status='delivered') - create_nightly_notification_status_for_day(str(process_day), 'sms') + create_nightly_notification_status_for_service_and_day(str(process_day), first_service.id, 'sms') updated_fact_data = FactNotificationStatus.query.order_by( FactNotificationStatus.bst_date, @@ -599,7 +607,7 @@ def test_create_nightly_notification_status_for_day_overwrites_old_data(notify_d # the job runs at 12:30am London time. 04/01 is in BST. @freeze_time('2019-04-01T23:30') -def test_create_nightly_notification_status_for_day_respects_bst(sample_template): +def test_create_nightly_notification_status_for_service_and_day_respects_bst(sample_template): create_notification(sample_template, status='delivered', created_at=datetime(2019, 4, 1, 23, 0)) # too new create_notification(sample_template, status='created', created_at=datetime(2019, 4, 1, 22, 59)) @@ -607,7 +615,7 @@ def test_create_nightly_notification_status_for_day_respects_bst(sample_template create_notification(sample_template, status='delivered', created_at=datetime(2019, 3, 31, 22, 59)) # too old - create_nightly_notification_status_for_day('2019-04-01', 'sms') + create_nightly_notification_status_for_service_and_day('2019-04-01', sample_template.service_id, 'sms') noti_status = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date).all() assert len(noti_status) == 1 From 086f0f50a64a8216ce8ed7b6617f6bbdb219ad9a Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 11 Jan 2022 17:19:25 +0000 Subject: [PATCH 7/8] Remove unnecessary extra method in status DAO This makes it easier to see what is being queried. --- app/dao/fact_notification_status_dao.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/app/dao/fact_notification_status_dao.py b/app/dao/fact_notification_status_dao.py index 3e315da5a..728dbe04c 100644 --- a/app/dao/fact_notification_status_dao.py +++ b/app/dao/fact_notification_status_dao.py @@ -43,17 +43,7 @@ def fetch_status_data_for_service_and_day(process_day, service_id, notification_ service = Service.query.get(service_id) table = get_notification_table_to_use(service, notification_type, process_day, has_delete_task_run=False) - return query_for_fact_status_data( - table=table, - start_date=start_date, - end_date=end_date, - notification_type=notification_type, - service_id=service.id - ) - - -def query_for_fact_status_data(table, start_date, end_date, notification_type, service_id): - query = db.session.query( + return db.session.query( table.template_id, table.service_id, func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'), @@ -72,8 +62,7 @@ def query_for_fact_status_data(table, start_date, end_date, notification_type, s 'job_id', table.key_type, table.status - ) - return query.all() + ).all() @autocommit From 9686595fa85c1fc5458f26c3afcc893a8aa93351 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 18 Jan 2022 16:56:53 +0000 Subject: [PATCH 8/8] Minor tweaks to address comments on the PR To address: - https://github.com/alphagov/notifications-api/pull/3425#discussion_r786867994 - https://github.com/alphagov/notifications-api/pull/3425#discussion_r786853329 - https://github.com/alphagov/notifications-api/pull/3425#discussion_r786848793 - https://github.com/alphagov/notifications-api/pull/3425#discussion_r786214794 --- app/celery/reporting_tasks.py | 12 +++++------ app/dao/fact_notification_status_dao.py | 27 ++++++++++++------------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index 0036a076f..9d404edaa 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -117,11 +117,11 @@ def create_nightly_notification_status_for_service_and_day(process_day, service_ process_day = datetime.strptime(process_day, "%Y-%m-%d").date() current_app.logger.info( f'create-nightly-notification-status-for-day task started ' - f'for {service_id}, {notification_type} and {process_day}' + f'for {service_id}, {notification_type} for {process_day}' ) start = datetime.utcnow() - transit_data = fetch_status_data_for_service_and_day( + new_status_rows = fetch_status_data_for_service_and_day( process_day=process_day, notification_type=notification_type, service_id=service_id, @@ -130,12 +130,12 @@ def create_nightly_notification_status_for_service_and_day(process_day, service_ end = datetime.utcnow() current_app.logger.info( f'create-nightly-notification-status-for-day task fetch ' - f'for {service_id}, {process_day} and {notification_type}: ' + f'for {service_id}, {notification_type} for {process_day}: ' f'data fetched in {(end - start).seconds} seconds' ) update_fact_notification_status( - transit_data=transit_data, + new_status_rows=new_status_rows, process_day=process_day, notification_type=notification_type, service_id=service_id @@ -143,6 +143,6 @@ def create_nightly_notification_status_for_service_and_day(process_day, service_ current_app.logger.info( f'create-nightly-notification-status-for-day task finished ' - f'for {service_id}, {process_day} and {notification_type}: ' - f'{len(transit_data)} rows updated' + f'for {service_id}, {notification_type} for {process_day}: ' + f'{len(new_status_rows)} rows updated' ) diff --git a/app/dao/fact_notification_status_dao.py b/app/dao/fact_notification_status_dao.py index 728dbe04c..9c4f10da7 100644 --- a/app/dao/fact_notification_status_dao.py +++ b/app/dao/fact_notification_status_dao.py @@ -45,7 +45,6 @@ def fetch_status_data_for_service_and_day(process_day, service_id, notification_ return db.session.query( table.template_id, - table.service_id, func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'), table.key_type, table.status, @@ -58,7 +57,6 @@ def fetch_status_data_for_service_and_day(process_day, service_id, notification_ table.key_type.in_((KEY_TYPE_NORMAL, KEY_TYPE_TEAM)), ).group_by( table.template_id, - table.service_id, 'job_id', table.key_type, table.status @@ -66,7 +64,7 @@ def fetch_status_data_for_service_and_day(process_day, service_id, notification_ @autocommit -def update_fact_notification_status(transit_data, process_day, notification_type, service_id): +def update_fact_notification_status(new_status_rows, process_day, notification_type, service_id): table = FactNotificationStatus.__table__ FactNotificationStatus.query.filter( FactNotificationStatus.bst_date == process_day, @@ -74,18 +72,19 @@ def update_fact_notification_status(transit_data, process_day, notification_type FactNotificationStatus.service_id == service_id, ).delete() - for row in transit_data: - stmt = insert(table).values( - bst_date=process_day, - template_id=row.template_id, - service_id=service_id, - job_id=row.job_id, - notification_type=notification_type, - key_type=row.key_type, - notification_status=row.status, - notification_count=row.notification_count, + for row in new_status_rows: + db.session.connection().execute( + insert(table).values( + bst_date=process_day, + template_id=row.template_id, + service_id=service_id, + job_id=row.job_id, + notification_type=notification_type, + key_type=row.key_type, + notification_status=row.status, + notification_count=row.notification_count, + ) ) - db.session.connection().execute(stmt) def fetch_notification_status_for_service_by_month(start_date, end_date, service_id):