From 36dd750637d2535d31562346244bdf96efaa4937 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Thu, 15 Aug 2019 16:57:31 +0100 Subject: [PATCH] split up reporting tasks in to separate tasks per day to try and speed up overall time by parallelising --- app/celery/reporting_tasks.py | 68 ++++++-- app/dao/fact_notification_status_dao.py | 3 +- tests/app/celery/test_reporting_tasks.py | 194 ++++++++++------------- 3 files changed, 141 insertions(+), 124 deletions(-) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index 6f9ebb2c0..692aae20f 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -33,13 +33,35 @@ def create_nightly_billing(day_start=None): for i in range(0, 4): process_day = day_start - timedelta(days=i) - transit_data = fetch_billing_data_for_day(process_day=process_day) + create_nightly_billing_for_day.apply_async( + kwargs={'process_day': process_day.isoformat()}, + queue=QueueNames.REPORTING + ) - for data in transit_data: - update_fact_billing(data, process_day) - current_app.logger.info( - "create-nightly-billing task complete. {} rows updated for day: {}".format(len(transit_data), process_day)) +@notify_celery.task(name="create-nightly-billing-for-day") +@statsd(namespace="tasks") +def create_nightly_billing_for_day(process_day): + process_day = datetime.strptime(process_day, "%Y-%m-%d").date() + + start = datetime.utcnow() + transit_data = fetch_billing_data_for_day(process_day=process_day) + end = datetime.utcnow() + + current_app.logger.info('create-nightly-billing-for-day {} fetched in {} seconds'.format( + process_day, + (end - start).seconds) + ) + + for data in transit_data: + update_fact_billing(data, process_day) + + current_app.logger.info( + "create-nightly-billing-for-day task complete. {} rows updated for day: {}".format( + len(transit_data), + process_day + ) + ) @notify_celery.task(name="create-nightly-notification-status") @@ -56,17 +78,29 @@ def create_nightly_notification_status(day_start=None): for i in range(0, 4): process_day = day_start - timedelta(days=i) - transit_data = fetch_notification_status_for_day(process_day=process_day) - - update_fact_notification_status(transit_data, process_day) - - current_app.logger.info( - "create-nightly-notification-status task: {} rows updated for day: {}".format( - len(transit_data), process_day - ) + create_nightly_notification_status_for_day.apply_async( + kwargs={'process_day': process_day.isoformat()}, + queue=QueueNames.REPORTING ) - # delete jobs need to happen after nightly notification status is recorded to avoid conflict between the two tasks - delete_email_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC) - delete_sms_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC) - delete_letter_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC) + +@notify_celery.task(name="create-nightly-notification-status-for-day") +@statsd(namespace="tasks") +def create_nightly_notification_status_for_day(process_day): + process_day = datetime.strptime(process_day, "%Y-%m-%d").date() + + start = datetime.utcnow() + transit_data = fetch_notification_status_for_day(process_day=process_day) + end = datetime.utcnow() + current_app.logger.info('create-nightly-notification-status-for-day {} fetched in {} seconds'.format( + process_day, + (end - start).seconds) + ) + + update_fact_notification_status(transit_data, process_day) + + current_app.logger.info( + "create-nightly-notification-status-for-day task complete: {} rows updated for day: {}".format( + len(transit_data), process_day + ) + ) diff --git a/app/dao/fact_notification_status_dao.py b/app/dao/fact_notification_status_dao.py index 9adbce024..9adfdd7a0 100644 --- a/app/dao/fact_notification_status_dao.py +++ b/app/dao/fact_notification_status_dao.py @@ -80,7 +80,8 @@ def query_for_fact_status_data(table, start_date, end_date, notification_type, s table.created_at >= start_date, table.created_at < end_date, table.notification_type == notification_type, - table.service_id == service_id + table.service_id == service_id, + table.key_type != KEY_TYPE_TEST ).group_by( table.template_id, table.service_id, diff --git a/tests/app/celery/test_reporting_tasks.py b/tests/app/celery/test_reporting_tasks.py index 3a5501ab9..bb57ee0e4 100644 --- a/tests/app/celery/test_reporting_tasks.py +++ b/tests/app/celery/test_reporting_tasks.py @@ -4,7 +4,12 @@ from decimal import Decimal import pytest from freezegun import freeze_time -from app.celery.reporting_tasks import create_nightly_billing, create_nightly_notification_status +from app.celery.reporting_tasks import ( + create_nightly_billing, + create_nightly_notification_status, + create_nightly_billing_for_day, + create_nightly_notification_status_for_day, +) from app.dao.fact_billing_dao import get_rate from app.models import ( FactBilling, @@ -28,10 +33,38 @@ def mocker_get_rate( return Decimal(0) +@freeze_time('2019-08-01') +@pytest.mark.parametrize('day_start, expected_kwargs', [ + (None, ['2019-07-31', '2019-07-30', '2019-07-29', '2019-07-28']), + ('2019-07-21', ['2019-07-21', '2019-07-20', '2019-07-19', '2019-07-18']), +]) +def test_create_nightly_billing_triggers_tasks_for_days(notify_api, mocker, day_start, expected_kwargs): + mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_billing_for_day') + create_nightly_billing(day_start) + + assert mock_celery.apply_async.call_count == 4 + for i in range(4): + assert mock_celery.apply_async.call_args_list[i][1]['kwargs'] == {'process_day': expected_kwargs[i]} + + +@freeze_time('2019-08-01') +@pytest.mark.parametrize('day_start, expected_kwargs', [ + (None, ['2019-07-31', '2019-07-30', '2019-07-29', '2019-07-28']), + ('2019-07-21', ['2019-07-21', '2019-07-20', '2019-07-19', '2019-07-18']), +]) +def test_create_nightly_notification_status_triggers_tasks_for_days(notify_api, mocker, day_start, expected_kwargs): + mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_notification_status_for_day') + create_nightly_notification_status(day_start) + + assert mock_celery.apply_async.call_count == 4 + for i in range(4): + assert mock_celery.apply_async.call_args_list[i][1]['kwargs'] == {'process_day': expected_kwargs[i]} + + @pytest.mark.parametrize('second_rate, records_num, billable_units, multiplier', [(1.0, 1, 2, [1]), (2.0, 2, 1, [1, 2])]) -def test_create_nightly_billing_sms_rate_multiplier( +def test_create_nightly_billing_for_day_sms_rate_multiplier( sample_service, sample_template, mocker, @@ -69,7 +102,7 @@ def test_create_nightly_billing_sms_rate_multiplier( # Celery expects the arguments to be a string or primitive type. yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d") - create_nightly_billing(yesterday_str) + create_nightly_billing_for_day(yesterday_str) records = FactBilling.query.order_by('rate_multiplier').all() assert len(records) == records_num for i, record in enumerate(records): @@ -79,7 +112,7 @@ def test_create_nightly_billing_sms_rate_multiplier( assert record.rate_multiplier == multiplier[i] -def test_create_nightly_billing_different_templates( +def test_create_nightly_billing_for_day_different_templates( sample_service, sample_template, sample_email_template, @@ -111,7 +144,7 @@ def test_create_nightly_billing_different_templates( assert len(records) == 0 # Celery expects the arguments to be a string or primitive type. yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d") - create_nightly_billing(yesterday_str) + create_nightly_billing_for_day(yesterday_str) records = FactBilling.query.order_by('rate_multiplier').all() assert len(records) == 2 @@ -125,7 +158,7 @@ def test_create_nightly_billing_different_templates( assert record.rate_multiplier == multiplier[i] -def test_create_nightly_billing_different_sent_by( +def test_create_nightly_billing_for_day_different_sent_by( sample_service, sample_template, sample_email_template, @@ -159,7 +192,7 @@ def test_create_nightly_billing_different_sent_by( # Celery expects the arguments to be a string or primitive type. yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d") - create_nightly_billing(yesterday_str) + create_nightly_billing_for_day(yesterday_str) records = FactBilling.query.order_by('rate_multiplier').all() assert len(records) == 2 @@ -170,7 +203,7 @@ def test_create_nightly_billing_different_sent_by( assert record.rate_multiplier == 1.0 -def test_create_nightly_billing_different_letter_postage( +def test_create_nightly_billing_for_day_different_letter_postage( notify_db_session, sample_letter_template, mocker): @@ -199,7 +232,7 @@ def test_create_nightly_billing_different_letter_postage( assert len(records) == 0 # Celery expects the arguments to be a string or primitive type. yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d") - create_nightly_billing(yesterday_str) + create_nightly_billing_for_day(yesterday_str) records = FactBilling.query.order_by('postage').all() assert len(records) == 2 @@ -216,7 +249,7 @@ def test_create_nightly_billing_different_letter_postage( assert records[1].billable_units == 2 -def test_create_nightly_billing_letter( +def test_create_nightly_billing_for_day_letter( sample_service, sample_letter_template, mocker): @@ -238,7 +271,7 @@ def test_create_nightly_billing_letter( assert len(records) == 0 # Celery expects the arguments to be a string or primitive type. yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d") - create_nightly_billing(yesterday_str) + create_nightly_billing_for_day(yesterday_str) records = FactBilling.query.order_by('rate_multiplier').all() assert len(records) == 1 record = records[0] @@ -249,7 +282,7 @@ def test_create_nightly_billing_letter( assert record.rate_multiplier == 2.0 -def test_create_nightly_billing_null_sent_by_sms( +def test_create_nightly_billing_for_day_null_sent_by_sms( sample_service, sample_template, mocker): @@ -272,7 +305,7 @@ def test_create_nightly_billing_null_sent_by_sms( # Celery expects the arguments to be a string or primitive type. yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d") - create_nightly_billing(yesterday_str) + create_nightly_billing_for_day(yesterday_str) records = FactBilling.query.all() assert len(records) == 1 @@ -284,39 +317,6 @@ def test_create_nightly_billing_null_sent_by_sms( assert record.provider == 'unknown' -@freeze_time('2018-01-15T03:30:00') -def test_create_nightly_billing_consolidate_from_3_days_delta( - sample_template, - mocker): - - mocker.patch('app.dao.fact_billing_dao.get_rate', side_effect=mocker_get_rate) - - # create records from 11th to 15th - for i in range(0, 5): - create_notification( - created_at=datetime.now() - timedelta(days=i), - template=sample_template, - status='delivered', - sent_by=None, - international=False, - rate_multiplier=1.0, - billable_units=1, - ) - - notification = Notification.query.order_by(Notification.created_at).all() - assert datetime.date(notification[0].created_at) == date(2018, 1, 11) - - records = FactBilling.query.all() - assert len(records) == 0 - - create_nightly_billing() - records = FactBilling.query.order_by(FactBilling.bst_date).all() - - assert len(records) == 4 - assert records[0].bst_date == date(2018, 1, 11) - assert records[-1].bst_date == date(2018, 1, 14) - - def test_get_rate_for_letter_latest(notify_db_session): # letter rates should be passed into the get_rate function as a tuple of start_date, crown, sheet_count, # rate and post_class @@ -341,50 +341,54 @@ def test_get_rate_for_sms_and_email(notify_db_session): assert rate == Decimal(0) -@freeze_time('2018-03-26T23:30:00') +@freeze_time('2018-03-30T01:00:00') # summer time starts on 2018-03-25 -def test_create_nightly_billing_use_BST( +def test_create_nightly_billing_for_day_use_BST( sample_service, sample_template, mocker): mocker.patch('app.dao.fact_billing_dao.get_rate', side_effect=mocker_get_rate) + # too late create_notification( - created_at=datetime(2018, 3, 25, 12, 0), + created_at=datetime(2018, 3, 25, 23, 1), template=sample_template, status='delivered', - sent_by=None, - international=False, rate_multiplier=1.0, billable_units=1, ) create_notification( - created_at=datetime(2018, 3, 25, 23, 5), + created_at=datetime(2018, 3, 25, 22, 59), template=sample_template, status='delivered', - sent_by=None, - international=False, rate_multiplier=1.0, - billable_units=1, + billable_units=2, ) - notifications = Notification.query.order_by(Notification.created_at).all() - assert len(notifications) == 2 - records = FactBilling.query.all() - assert len(records) == 0 + # too early + create_notification( + created_at=datetime(2018, 3, 24, 23, 59), + template=sample_template, + status='delivered', + rate_multiplier=1.0, + billable_units=4, + ) - create_nightly_billing() + assert Notification.query.count() == 3 + assert FactBilling.query.count() == 0 + + create_nightly_billing_for_day('2018-03-25') records = FactBilling.query.order_by(FactBilling.bst_date).all() - assert len(records) == 2 + assert len(records) == 1 assert records[0].bst_date == date(2018, 3, 25) - assert records[-1].bst_date == date(2018, 3, 26) + assert records[0].billable_units == 2 @freeze_time('2018-01-15T03:30:00') -def test_create_nightly_billing_update_when_record_exists( +def test_create_nightly_billing_for_day_update_when_record_exists( sample_service, sample_template, mocker): @@ -404,7 +408,7 @@ def test_create_nightly_billing_update_when_record_exists( records = FactBilling.query.all() assert len(records) == 0 - create_nightly_billing() + create_nightly_billing_for_day('2018-01-14') records = FactBilling.query.order_by(FactBilling.bst_date).all() assert len(records) == 1 @@ -423,13 +427,14 @@ def test_create_nightly_billing_update_when_record_exists( ) # run again, make sure create_nightly_billing() updates with no error - create_nightly_billing() + create_nightly_billing_for_day('2018-01-14') assert len(records) == 1 assert records[0].billable_units == 2 assert records[0].updated_at -def test_create_nightly_notification_status(notify_db_session, mocker): +@freeze_time('2019-01-05') +def test_create_nightly_notification_status_for_day(notify_db_session, mocker): mocks = [ mocker.patch('app.celery.reporting_tasks.delete_email_notifications_older_than_retention'), mocker.patch('app.celery.reporting_tasks.delete_sms_notifications_older_than_retention'), @@ -444,36 +449,24 @@ def test_create_nightly_notification_status(notify_db_session, mocker): third_template = create_template(service=third_service, template_type='letter') create_notification(template=first_template, status='delivered') - create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=1)) - create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=2)) - create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=10)) - create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=10)) + create_notification(template=first_template, status='delivered', created_at=datetime(2019, 1, 1, 12, 0)) create_notification(template=second_template, status='temporary-failure') - create_notification(template=second_template, status='temporary-failure', created_at=datetime.utcnow() - timedelta(days=1)) # noqa - create_notification(template=second_template, status='temporary-failure', created_at=datetime.utcnow() - timedelta(days=2)) # noqa - create_notification(template=second_template, status='temporary-failure', created_at=datetime.utcnow() - timedelta(days=10)) # noqa - create_notification(template=second_template, status='temporary-failure', created_at=datetime.utcnow() - timedelta(days=10)) # noqa + create_notification(template=second_template, status='temporary-failure', created_at=datetime(2019, 1, 1, 12, 0)) create_notification(template=third_template, status='created') - create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=1)) - create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=2)) - create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=10)) - create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=10)) + create_notification(template=third_template, status='created', created_at=datetime(2019, 1, 1, 12, 0)) assert len(FactNotificationStatus.query.all()) == 0 - create_nightly_notification_status() - new_data = FactNotificationStatus.query.order_by( - FactNotificationStatus.bst_date, - ).all() - assert len(new_data) == 6 # - assert str(new_data[0].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=2), "%Y-%m-%d") - assert str(new_data[1].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=2), "%Y-%m-%d") - assert str(new_data[2].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=2), "%Y-%m-%d") - assert str(new_data[3].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=1), "%Y-%m-%d") - assert str(new_data[4].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=1), "%Y-%m-%d") - assert str(new_data[5].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=1), "%Y-%m-%d") + create_nightly_notification_status_for_day('2019-01-01') + + new_data = FactNotificationStatus.query.all() + + assert len(new_data) == 3 + assert new_data[0].bst_date == date(2019, 1, 1) + assert new_data[1].bst_date == date(2019, 1, 1) + assert new_data[2].bst_date == date(2019, 1, 1) for mock in mocks: mock.apply_async.assert_called_once_with(queue='periodic-tasks') @@ -481,7 +474,7 @@ def test_create_nightly_notification_status(notify_db_session, mocker): # the job runs at 12:30am London time. 04/01 is in BST. @freeze_time('2019-04-01T23:30') -def test_create_nightly_notification_status_respects_bst(sample_template, mocker): +def test_create_nightly_notification_status_for_day_respects_bst(sample_template, mocker): mocker.patch('app.celery.reporting_tasks.delete_email_notifications_older_than_retention') mocker.patch('app.celery.reporting_tasks.delete_sms_notifications_older_than_retention') mocker.patch('app.celery.reporting_tasks.delete_letter_notifications_older_than_retention') @@ -491,23 +484,12 @@ def test_create_nightly_notification_status_respects_bst(sample_template, mocker create_notification(sample_template, status='created', created_at=datetime(2019, 4, 1, 22, 59)) create_notification(sample_template, status='created', created_at=datetime(2019, 3, 31, 23, 0)) - create_notification(sample_template, status='temporary-failure', created_at=datetime(2019, 3, 31, 22, 59)) + create_notification(sample_template, status='delivered', created_at=datetime(2019, 3, 31, 22, 59)) # too old - # we create records for last ten days - create_notification(sample_template, status='sending', created_at=datetime(2019, 3, 29, 0, 0)) - - create_notification(sample_template, status='delivered', created_at=datetime(2019, 3, 22, 23, 59)) # too old - - create_nightly_notification_status() + create_nightly_notification_status_for_day('2019-04-01') noti_status = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date).all() - assert len(noti_status) == 3 + assert len(noti_status) == 1 - assert noti_status[0].bst_date == date(2019, 3, 29) - assert noti_status[0].notification_status == 'sending' - - assert noti_status[1].bst_date == date(2019, 3, 31) - assert noti_status[1].notification_status == 'temporary-failure' - - assert noti_status[2].bst_date == date(2019, 4, 1) - assert noti_status[2].notification_status == 'created' + assert noti_status[0].bst_date == date(2019, 4, 1) + assert noti_status[0].notification_status == 'created'