From 0f6dea0debc7aeb817e643bd95b5feeabfaad639 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 20 Jan 2022 10:39:23 +0000 Subject: [PATCH] Revert running status aggregation in parallel The top-level task didn't run successfully after this was deployed due to the worker being killed due to heavy disk usage. While the more parallel version does log much more, it doesn't totally explain the disk behaviour. Nonetheless, reverting it is sensible to give us the time we need to investigate more. --- app/celery/reporting_tasks.py | 58 ++++++++++++------------ app/dao/fact_notification_status_dao.py | 7 +-- tests/app/celery/test_reporting_tasks.py | 24 +++++----- 3 files changed, 44 insertions(+), 45 deletions(-) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index 9d404edaa..b1fd308b8 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta from flask import current_app from notifications_utils.timezones import convert_utc_to_bst -from app import db, notify_celery +from app import notify_celery from app.config import QueueNames from app.cronitor import cronitor from app.dao.fact_billing_dao import ( @@ -91,46 +91,47 @@ def create_nightly_notification_status(): yesterday = convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=1) - for (service_id,) in db.session.query(Service.id): - for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]: - days = 10 if notification_type == LETTER_TYPE else 4 + for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]: + days = 10 if notification_type == LETTER_TYPE else 4 - for i in range(days): - process_day = yesterday - timedelta(days=i) + for i in range(days): + process_day = yesterday - timedelta(days=i) - create_nightly_notification_status_for_service_and_day.apply_async( - kwargs={ - 'process_day': process_day.isoformat(), - 'notification_type': notification_type, - 'service_id': service_id, - }, - queue=QueueNames.REPORTING - ) - current_app.logger.info( - f"create-nightly-notification-status-for-day task created " - f"for {service_id}, {notification_type} and {process_day}" - ) + create_nightly_notification_status_for_day.apply_async( + kwargs={ + 'process_day': process_day.isoformat(), + 'notification_type': notification_type, + }, + queue=QueueNames.REPORTING + ) + current_app.logger.info( + f"create-nightly-notification-status-for-day task created " + f"for {notification_type} and {process_day}" + ) -@notify_celery.task(name="create-nightly-notification-status-for-service-and-day") -def create_nightly_notification_status_for_service_and_day(process_day, service_id, notification_type): +@notify_celery.task(name="create-nightly-notification-status-for-day") +def create_nightly_notification_status_for_day(process_day, notification_type): process_day = datetime.strptime(process_day, "%Y-%m-%d").date() current_app.logger.info( f'create-nightly-notification-status-for-day task started ' - f'for {service_id}, {notification_type} for {process_day}' + f'for {notification_type} for {process_day}' ) start = datetime.utcnow() - new_status_rows = fetch_status_data_for_service_and_day( - process_day=process_day, - notification_type=notification_type, - service_id=service_id, - ) + new_status_rows = [] + + for service in Service.query.all(): + new_status_rows += fetch_status_data_for_service_and_day( + process_day=process_day, + notification_type=notification_type, + service_id=service.id, + ) end = datetime.utcnow() current_app.logger.info( f'create-nightly-notification-status-for-day task fetch ' - f'for {service_id}, {notification_type} for {process_day}: ' + f'for {notification_type} for {process_day}: ' f'data fetched in {(end - start).seconds} seconds' ) @@ -138,11 +139,10 @@ def create_nightly_notification_status_for_service_and_day(process_day, service_ new_status_rows=new_status_rows, process_day=process_day, notification_type=notification_type, - service_id=service_id ) current_app.logger.info( f'create-nightly-notification-status-for-day task finished ' - f'for {service_id}, {notification_type} for {process_day}: ' + f'for {notification_type} for {process_day}: ' f'{len(new_status_rows)} rows updated' ) diff --git a/app/dao/fact_notification_status_dao.py b/app/dao/fact_notification_status_dao.py index 9c4f10da7..318a4e397 100644 --- a/app/dao/fact_notification_status_dao.py +++ b/app/dao/fact_notification_status_dao.py @@ -45,6 +45,7 @@ def fetch_status_data_for_service_and_day(process_day, service_id, notification_ return db.session.query( table.template_id, + table.service_id, func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'), table.key_type, table.status, @@ -57,6 +58,7 @@ def fetch_status_data_for_service_and_day(process_day, service_id, notification_ table.key_type.in_((KEY_TYPE_NORMAL, KEY_TYPE_TEAM)), ).group_by( table.template_id, + table.service_id, 'job_id', table.key_type, table.status @@ -64,12 +66,11 @@ def fetch_status_data_for_service_and_day(process_day, service_id, notification_ @autocommit -def update_fact_notification_status(new_status_rows, process_day, notification_type, service_id): +def update_fact_notification_status(new_status_rows, process_day, notification_type): table = FactNotificationStatus.__table__ FactNotificationStatus.query.filter( FactNotificationStatus.bst_date == process_day, FactNotificationStatus.notification_type == notification_type, - FactNotificationStatus.service_id == service_id, ).delete() for row in new_status_rows: @@ -77,7 +78,7 @@ def update_fact_notification_status(new_status_rows, process_day, notification_t insert(table).values( bst_date=process_day, template_id=row.template_id, - service_id=service_id, + service_id=row.service_id, job_id=row.job_id, notification_type=notification_type, key_type=row.key_type, diff --git a/tests/app/celery/test_reporting_tasks.py b/tests/app/celery/test_reporting_tasks.py index 9e0345200..a8de19465 100644 --- a/tests/app/celery/test_reporting_tasks.py +++ b/tests/app/celery/test_reporting_tasks.py @@ -10,7 +10,7 @@ from app.celery.reporting_tasks import ( create_nightly_billing, create_nightly_billing_for_day, create_nightly_notification_status, - create_nightly_notification_status_for_service_and_day, + create_nightly_notification_status_for_day, ) from app.config import QueueNames from app.dao.fact_billing_dao import get_rate @@ -63,7 +63,7 @@ def test_create_nightly_billing_triggers_tasks_for_days(notify_api, mocker, day_ @freeze_time('2019-08-01') def test_create_nightly_notification_status_triggers_tasks(notify_api, sample_service, mocker): - mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_notification_status_for_service_and_day') + mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_notification_status_for_day') create_nightly_notification_status() assert mock_celery.apply_async.call_count == ( @@ -80,7 +80,6 @@ def test_create_nightly_notification_status_triggers_tasks(notify_api, sample_se kwargs={ 'process_day': process_date, 'notification_type': notification_type, - 'service_id': sample_service.id, }, queue=QueueNames.REPORTING ) @@ -90,7 +89,6 @@ def test_create_nightly_notification_status_triggers_tasks(notify_api, sample_se kwargs={ 'process_day': process_date, 'notification_type': LETTER_TYPE, - 'service_id': sample_service.id, }, queue=QueueNames.REPORTING ) @@ -509,7 +507,7 @@ def test_create_nightly_billing_for_day_update_when_record_exists( assert records[0].updated_at -def test_create_nightly_notification_status_for_service_and_day(notify_db_session): +def test_create_nightly_notification_status_for_day(notify_db_session): first_service = create_service(service_name='First Service') first_template = create_template(service=first_service) second_service = create_service(service_name='second Service') @@ -540,9 +538,9 @@ def test_create_nightly_notification_status_for_service_and_day(notify_db_sessio assert len(FactNotificationStatus.query.all()) == 0 - create_nightly_notification_status_for_service_and_day(str(process_day), first_service.id, 'sms') - create_nightly_notification_status_for_service_and_day(str(process_day), second_service.id, 'email') - create_nightly_notification_status_for_service_and_day(str(process_day), third_service.id, 'letter') + create_nightly_notification_status_for_day(str(process_day), 'sms') + create_nightly_notification_status_for_day(str(process_day), 'email') + create_nightly_notification_status_for_day(str(process_day), 'letter') new_fact_data = FactNotificationStatus.query.order_by( FactNotificationStatus.notification_type @@ -577,13 +575,13 @@ def test_create_nightly_notification_status_for_service_and_day(notify_db_sessio assert new_fact_data[2].key_type == KEY_TYPE_NORMAL -def test_create_nightly_notification_status_for_service_and_day_overwrites_old_data(notify_db_session): +def test_create_nightly_notification_status_for_day_overwrites_old_data(notify_db_session): first_service = create_service(service_name='First Service') first_template = create_template(service=first_service) create_notification(template=first_template, status='delivered') process_day = date.today() - create_nightly_notification_status_for_service_and_day(str(process_day), first_service.id, 'sms') + create_nightly_notification_status_for_day(str(process_day), 'sms') new_fact_data = FactNotificationStatus.query.order_by( FactNotificationStatus.bst_date, @@ -594,7 +592,7 @@ def test_create_nightly_notification_status_for_service_and_day_overwrites_old_d assert new_fact_data[0].notification_count == 1 create_notification(template=first_template, status='delivered') - create_nightly_notification_status_for_service_and_day(str(process_day), first_service.id, 'sms') + create_nightly_notification_status_for_day(str(process_day), 'sms') updated_fact_data = FactNotificationStatus.query.order_by( FactNotificationStatus.bst_date, @@ -607,7 +605,7 @@ def test_create_nightly_notification_status_for_service_and_day_overwrites_old_d # the job runs at 12:30am London time. 04/01 is in BST. @freeze_time('2019-04-01T23:30') -def test_create_nightly_notification_status_for_service_and_day_respects_bst(sample_template): +def test_create_nightly_notification_status_for_day_respects_bst(sample_template): create_notification(sample_template, status='delivered', created_at=datetime(2019, 4, 1, 23, 0)) # too new create_notification(sample_template, status='created', created_at=datetime(2019, 4, 1, 22, 59)) @@ -615,7 +613,7 @@ def test_create_nightly_notification_status_for_service_and_day_respects_bst(sam create_notification(sample_template, status='delivered', created_at=datetime(2019, 3, 31, 22, 59)) # too old - create_nightly_notification_status_for_service_and_day('2019-04-01', sample_template.service_id, 'sms') + create_nightly_notification_status_for_day('2019-04-01', 'sms') noti_status = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date).all() assert len(noti_status) == 1