From a69d1635a1fa72bb6dff3f13283f32aaf584ceaf Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 10 Feb 2022 15:00:57 +0000 Subject: [PATCH] Update FactStatus table in bulk for each service Previously we were looping over data from the Notifications/History table and then shovelling it into the status table, one row at a time - plus an extra delete to clean up any existing data. This replaces that with a batch insertion, similar to how we archive notifications [1], but using a simple subquery (via "from_select" [2]) instead of a temporary table. To make the select compatible with the insert, I've used "literal" to inject the constant pieces of data, so each row has everything it needs to go into the status table. [1]: https://github.com/alphagov/notifications-api/blob/9ce6d2fe924626c578cdb77467d8df0e842e9821/app/dao/notifications_dao.py#L295 [2]: https://docs.sqlalchemy.org/en/14/core/dml.html#sqlalchemy.sql.expression.Insert.from_select --- app/celery/reporting_tasks.py | 22 +---- app/dao/fact_notification_status_dao.py | 87 ++++++++++--------- .../dao/test_fact_notification_status_dao.py | 13 ++- 3 files changed, 58 insertions(+), 64 deletions(-) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index b847f4e62..63b41e674 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -10,10 +10,7 @@ from app.dao.fact_billing_dao import ( fetch_billing_data_for_day, update_fact_billing, ) -from app.dao.fact_notification_status_dao import ( - fetch_status_data_for_service_and_day, - update_fact_notification_status, -) +from app.dao.fact_notification_status_dao import update_fact_notification_status from app.dao.notifications_dao import get_service_ids_with_notifications_on_date from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE @@ -117,23 +114,8 @@ def create_nightly_notification_status(): def create_nightly_notification_status_for_service_and_day(process_day, service_id, notification_type): process_day = datetime.strptime(process_day, "%Y-%m-%d").date() - start = datetime.utcnow() - new_status_rows = fetch_status_data_for_service_and_day( - process_day=process_day, - notification_type=notification_type, - service_id=service_id, - ) - - end = datetime.utcnow() - current_app.logger.info( - f'create-nightly-notification-status-for-service-and-day task fetch ' - f'for {service_id}, {notification_type} for {process_day}: ' - f'data fetched in {(end - start).seconds} seconds' - ) - start = datetime.utcnow() update_fact_notification_status( - new_status_rows=new_status_rows, process_day=process_day, notification_type=notification_type, service_id=service_id @@ -143,5 +125,5 @@ def create_nightly_notification_status_for_service_and_day(process_day, service_ current_app.logger.info( f'create-nightly-notification-status-for-service-and-day task update ' f'for {service_id}, {notification_type} for {process_day}: ' - f'data updated in {(end - start).seconds} seconds' + f'updated in {(end - start).seconds} seconds' ) diff --git a/app/dao/fact_notification_status_dao.py b/app/dao/fact_notification_status_dao.py index 52adfd64c..61b997688 100644 --- a/app/dao/fact_notification_status_dao.py +++ b/app/dao/fact_notification_status_dao.py @@ -34,59 +34,66 @@ from app.utils import ( ) -def fetch_status_data_for_service_and_day(process_day, service_id, notification_type): +@autocommit +def update_fact_notification_status(process_day, notification_type, service_id): start_date = get_london_midnight_in_utc(process_day) end_date = get_london_midnight_in_utc(process_day + timedelta(days=1)) # query notifications or notification_history for the day, depending on their data retention service = Service.query.get(service_id) - table = get_notification_table_to_use(service, notification_type, process_day, has_delete_task_run=False) + source_table = get_notification_table_to_use( + service, + notification_type, + process_day, + has_delete_task_run=False + ) - return db.session.query( - table.template_id, - func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'), - table.key_type, - table.status, + query = db.session.query( + literal(process_day).label("process_day"), + source_table.template_id, + literal(service_id).label("service_id"), + func.coalesce(source_table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'), + literal(notification_type).label("notification_type"), + source_table.key_type, + source_table.status, func.count().label('notification_count') ).filter( - table.created_at >= start_date, - table.created_at < end_date, - table.notification_type == notification_type, - table.service_id == service_id, - table.key_type.in_((KEY_TYPE_NORMAL, KEY_TYPE_TEAM)), + source_table.created_at >= start_date, + source_table.created_at < end_date, + source_table.notification_type == notification_type, + source_table.service_id == service_id, + source_table.key_type.in_((KEY_TYPE_NORMAL, KEY_TYPE_TEAM)), ).group_by( - table.template_id, + source_table.template_id, + source_table.template_id, 'job_id', - table.key_type, - table.status - ).all() + source_table.key_type, + source_table.status + ) + stmt = insert(FactNotificationStatus.__table__).from_select( + [ + FactNotificationStatus.bst_date, + FactNotificationStatus.template_id, + FactNotificationStatus.service_id, + FactNotificationStatus.job_id, + FactNotificationStatus.notification_type, + FactNotificationStatus.key_type, + FactNotificationStatus.notification_status, + FactNotificationStatus.notification_count + ], + query + ) -@autocommit -def update_fact_notification_status(new_status_rows, process_day, notification_type, service_id): - table = FactNotificationStatus.__table__ + stmt = stmt.on_conflict_do_update( + constraint="ft_notification_status_pkey", + set_={ + FactNotificationStatus.notification_count: stmt.excluded.notification_count, + FactNotificationStatus.updated_at: datetime.utcnow() + } + ) - for row in new_status_rows: - stmt = insert(table).values( - bst_date=process_day, - template_id=row.template_id, - service_id=service_id, - job_id=row.job_id, - notification_type=notification_type, - key_type=row.key_type, - notification_status=row.status, - notification_count=row.notification_count, - ) - - stmt = stmt.on_conflict_do_update( - constraint="ft_notification_status_pkey", - set_={ - FactNotificationStatus.notification_count: stmt.excluded.notification_count, - FactNotificationStatus.updated_at: datetime.utcnow() - } - ) - - db.session.connection().execute(stmt) + db.session.connection().execute(stmt) def fetch_notification_status_for_service_by_month(start_date, end_date, service_id): diff --git a/tests/app/dao/test_fact_notification_status_dao.py b/tests/app/dao/test_fact_notification_status_dao.py index b86760c85..8dbdb3b3d 100644 --- a/tests/app/dao/test_fact_notification_status_dao.py +++ b/tests/app/dao/test_fact_notification_status_dao.py @@ -14,8 +14,8 @@ from app.dao.fact_notification_status_dao import ( fetch_notification_status_totals_for_all_services, fetch_notification_statuses_for_job, fetch_stats_for_all_services_by_date_range, - fetch_status_data_for_service_and_day, get_total_notifications_for_date_range, + update_fact_notification_status, ) from app.models import ( EMAIL_TYPE, @@ -32,6 +32,7 @@ from app.models import ( NOTIFICATION_TECHNICAL_FAILURE, NOTIFICATION_TEMPORARY_FAILURE, SMS_TYPE, + FactNotificationStatus, ) from tests.app.db import ( create_ft_notification_status, @@ -618,7 +619,7 @@ def test_get_total_notifications_for_date_range(sample_service): ('2022-03-27T23:30', date(2022, 3, 27), 0), # 28/03 00:30 BST ('2022-03-26T23:30', date(2022, 3, 26), 1), # 26/03 23:30 GMT ]) -def test_fetch_status_data_for_service_and_day_respects_gmt_bst( +def test_update_fact_notification_status_respects_gmt_bst( sample_template, sample_service, created_at_utc, @@ -626,5 +627,9 @@ def test_fetch_status_data_for_service_and_day_respects_gmt_bst( expected_count, ): create_notification(template=sample_template, created_at=created_at_utc) - rows = fetch_status_data_for_service_and_day(process_day, sample_service.id, SMS_TYPE) - assert len(rows) == expected_count + update_fact_notification_status(process_day, SMS_TYPE, sample_service.id) + + assert FactNotificationStatus.query.filter_by( + service_id=sample_service.id, + bst_date=process_day + ).count() == expected_count