From ed611f982c4f11e9aca434ca8db7922f2e6ec9be Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Thu, 18 Jul 2019 15:29:54 +0100 Subject: [PATCH 1/3] We found a problem with the report tasks that populate the fact tables (or statistic tables). It is possible that the notification status can change for notifications after 4 days. This PR updates those queries to look in either Notification or NotificationHistory. Since the data does not exist in both tables at the same time we can do this and not worry about the data retention. The query will iterate over each service, then each notification type and query the data; if there are no results it then tries the history table. --- app/dao/fact_billing_dao.py | 136 ++++++++++-------- app/dao/fact_notification_status_dao.py | 51 +++++-- .../dao/test_fact_notification_status_dao.py | 10 +- 3 files changed, 125 insertions(+), 72 deletions(-) diff --git a/app/dao/fact_billing_dao.py b/app/dao/fact_billing_dao.py index 8997626fb..92e27189e 100644 --- a/app/dao/fact_billing_dao.py +++ b/app/dao/fact_billing_dao.py @@ -146,69 +146,88 @@ def delete_billing_data_for_service_for_day(process_day, service_id): def fetch_billing_data_for_day(process_day, service_id=None): start_date = convert_bst_to_utc(datetime.combine(process_day, time.min)) end_date = convert_bst_to_utc(datetime.combine(process_day + timedelta(days=1), time.min)) - # use notification_history if process day is older than 7 days - # this is useful if we need to rebuild the ft_billing table for a date older than 7 days ago.
current_app.logger.info("Populate ft_billing for {} to {}".format(start_date, end_date)) - table = Notification - if start_date < datetime.utcnow() - timedelta(days=7): - table = NotificationHistory transit_data = [] - for notification_type in (SMS_TYPE, EMAIL_TYPE, LETTER_TYPE): - billable_type_list = { - SMS_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE, - EMAIL_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE, - LETTER_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE_FOR_LETTERS - } - query = db.session.query( - table.template_id, - table.service_id, - table.notification_type, - func.coalesce(table.sent_by, - case( - [ - (table.notification_type == 'letter', 'dvla'), - (table.notification_type == 'sms', 'unknown'), - (table.notification_type == 'email', 'ses') - ]), - ).label('sent_by'), - func.coalesce(table.rate_multiplier, 1).cast(Integer).label('rate_multiplier'), - func.coalesce(table.international, False).label('international'), - case( - [ - (table.notification_type == 'letter', table.billable_units), - ] - ).label('letter_page_count'), - func.sum(table.billable_units).label('billable_units'), - func.count().label('notifications_sent'), - Service.crown, - func.coalesce(table.postage, 'none').label('postage') - ).filter( - table.status.in_(billable_type_list[notification_type]), - table.key_type != KEY_TYPE_TEST, - table.created_at >= start_date, - table.created_at < end_date, - table.notification_type == notification_type - ).group_by( - table.template_id, - table.service_id, - table.notification_type, - 'sent_by', - 'letter_page_count', - table.rate_multiplier, - table.international, - Service.crown, - table.postage, - ).join( - Service - ) - if service_id: - query = query.filter(table.service_id == service_id) + if not service_id: + service_ids = [x.id for x in Service.query.all()] + else: + service_ids = [service_id] + for id_of_service in service_ids: + for notification_type in (SMS_TYPE, EMAIL_TYPE, LETTER_TYPE): + results = _query_for_billing_data( + table=Notification, 
+ notification_type=notification_type, + start_date=start_date, + end_date=end_date, + service_id = id_of_service + ) + # If data has been purged from Notification then use NotificationHistory + if len(results) == 0: + results = _query_for_billing_data( + table=NotificationHistory, + notification_type=notification_type, + start_date=start_date, + end_date=end_date, + service_id=id_of_service + ) - transit_data = transit_data + query.all() + transit_data = transit_data + results return transit_data +def _query_for_billing_data(table, notification_type, start_date, end_date, service_id): + billable_type_list = { + SMS_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE, + EMAIL_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE, + LETTER_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE_FOR_LETTERS + } + query = db.session.query( + table.template_id, + table.service_id, + table.notification_type, + func.coalesce(table.sent_by, + case( + [ + (table.notification_type == 'letter', 'dvla'), + (table.notification_type == 'sms', 'unknown'), + (table.notification_type == 'email', 'ses') + ]), + ).label('sent_by'), + func.coalesce(table.rate_multiplier, 1).cast(Integer).label('rate_multiplier'), + func.coalesce(table.international, False).label('international'), + case( + [ + (table.notification_type == 'letter', table.billable_units), + ] + ).label('letter_page_count'), + func.sum(table.billable_units).label('billable_units'), + func.count().label('notifications_sent'), + Service.crown, + func.coalesce(table.postage, 'none').label('postage') + ).filter( + table.status.in_(billable_type_list[notification_type]), + table.key_type != KEY_TYPE_TEST, + table.created_at >= start_date, + table.created_at < end_date, + table.notification_type == notification_type, + table.service_id == service_id + ).group_by( + table.template_id, + table.service_id, + table.notification_type, + 'sent_by', + 'letter_page_count', + table.rate_multiplier, + table.international, + Service.crown, + table.postage, + ).join( + 
Service + ) + return query.all() + + def get_rates_for_billing(): non_letter_rates = Rate.query.order_by(desc(Rate.valid_from)).all() letter_rates = LetterRate.query.order_by(desc(LetterRate.start_date)).all() @@ -265,6 +284,11 @@ def update_fact_billing(data, process_day): data.letter_page_count, data.postage) billing_record = create_billing_record(data, rate, process_day) + + FactBilling.query.filter( + FactBilling.bst_date == process_day + ).delete() + table = FactBilling.__table__ ''' This uses the Postgres upsert to avoid race conditions when two threads try to insert diff --git a/app/dao/fact_notification_status_dao.py b/app/dao/fact_notification_status_dao.py index df509ecd9..1b9164b73 100644 --- a/app/dao/fact_notification_status_dao.py +++ b/app/dao/fact_notification_status_dao.py @@ -9,23 +9,49 @@ from sqlalchemy.types import DateTime, Integer from app import db from app.models import ( - Notification, NotificationHistory, FactNotificationStatus, KEY_TYPE_TEST, Service, Template, - NOTIFICATION_CANCELLED + EMAIL_TYPE, + FactNotificationStatus, + KEY_TYPE_TEST, + LETTER_TYPE, + Notification, + NotificationHistory, + NOTIFICATION_CANCELLED, + Service, + SMS_TYPE, + Template, ) from app.utils import get_london_midnight_in_utc, midnight_n_days_ago, get_london_month_from_utc_column -def fetch_notification_status_for_day(process_day, service_id=None): +def fetch_notification_status_for_day(process_day): start_date = convert_bst_to_utc(datetime.combine(process_day, time.min)) end_date = convert_bst_to_utc(datetime.combine(process_day + timedelta(days=1), time.min)) # use notification_history if process day is older than 7 days # this is useful if we need to rebuild the ft_billing table for a date older than 7 days ago. 
current_app.logger.info("Fetch ft_notification_status for {} to {}".format(start_date, end_date)) - table = Notification - if start_date < datetime.utcnow() - timedelta(days=7): - table = NotificationHistory - transit_data = db.session.query( + all_data_for_process_day = [] + service_ids = [x.id for x in Service.query.all()] + # for each service + # for each notification type + # query notifications for day + # if no rows try notificationHistory + for service_id in service_ids: + for notification_type in [EMAIL_TYPE, SMS_TYPE, LETTER_TYPE]: + table = Notification + data_for_service_and_type = query_for_fact_status_data(table, start_date, end_date, notification_type, service_id) + + if len(data_for_service_and_type) == 0: + table = NotificationHistory + data_for_service_and_type = query_for_fact_status_data(table, start_date, end_date, notification_type, service_id) + + all_data_for_process_day = all_data_for_process_day + data_for_service_and_type + + return all_data_for_process_day + + +def query_for_fact_status_data(table, start_date, end_date, notification_type, service_id): + query = db.session.query( table.template_id, table.service_id, func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'), @@ -35,7 +61,9 @@ def fetch_notification_status_for_day(process_day, service_id=None): func.count().label('notification_count') ).filter( table.created_at >= start_date, - table.created_at < end_date + table.created_at < end_date, + table.notification_type == notification_type, + table.service_id == service_id ).group_by( table.template_id, table.service_id, @@ -44,11 +72,7 @@ def fetch_notification_status_for_day(process_day, service_id=None): table.key_type, table.status ) - - if service_id: - transit_data = transit_data.filter(table.service_id == service_id) - - return transit_data.all() + return query.all() def update_fact_notification_status(data, process_day): @@ -56,6 +80,7 @@ def update_fact_notification_status(data, process_day): 
FactNotificationStatus.query.filter( FactNotificationStatus.bst_date == process_day ).delete() + for row in data: stmt = insert(table).values( bst_date=process_day, diff --git a/tests/app/dao/test_fact_notification_status_dao.py b/tests/app/dao/test_fact_notification_status_dao.py index 3406c7ec3..bc9ffa698 100644 --- a/tests/app/dao/test_fact_notification_status_dao.py +++ b/tests/app/dao/test_fact_notification_status_dao.py @@ -18,7 +18,10 @@ from app.dao.fact_notification_status_dao import ( from app.models import FactNotificationStatus, KEY_TYPE_TEST, KEY_TYPE_TEAM, EMAIL_TYPE, SMS_TYPE, LETTER_TYPE from freezegun import freeze_time -from tests.app.db import create_notification, create_service, create_template, create_ft_notification_status, create_job +from tests.app.db import ( + create_notification, create_service, create_template, create_ft_notification_status, + create_job, create_notification_history +) def test_update_fact_notification_status(notify_db_session): @@ -31,8 +34,9 @@ def test_update_fact_notification_status(notify_db_session): create_notification(template=first_template, status='delivered') create_notification(template=first_template, created_at=datetime.utcnow() - timedelta(days=1)) - create_notification(template=second_template, status='temporary-failure') - create_notification(template=second_template, created_at=datetime.utcnow() - timedelta(days=1)) + # simulate a service with data retention - data has been moved to history and does not exist in notifications + create_notification_history(template=second_template, status='temporary-failure') + create_notification_history(template=second_template, created_at=datetime.utcnow() - timedelta(days=1)) create_notification(template=third_template, status='created') create_notification(template=third_template, created_at=datetime.utcnow() - timedelta(days=1)) From a52c65ea2940f2b9efea1d80bd30f68008985bd6 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Thu, 18 Jul 2019 16:24:06 +0100 Subject: 
[PATCH 2/3] Remove the delete query when updating the ft_billing. It's in the wrong place and we also should not need it. --- app/dao/fact_billing_dao.py | 6 +----- app/dao/fact_notification_status_dao.py | 19 ++++++++++++++----- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/app/dao/fact_billing_dao.py b/app/dao/fact_billing_dao.py index 92e27189e..b27ac7ea1 100644 --- a/app/dao/fact_billing_dao.py +++ b/app/dao/fact_billing_dao.py @@ -159,7 +159,7 @@ def fetch_billing_data_for_day(process_day, service_id=None): notification_type=notification_type, start_date=start_date, end_date=end_date, - service_id = id_of_service + service_id=id_of_service ) # If data has been purged from Notification then use NotificationHistory if len(results) == 0: @@ -285,10 +285,6 @@ def update_fact_billing(data, process_day): data.postage) billing_record = create_billing_record(data, rate, process_day) - FactBilling.query.filter( - FactBilling.bst_date == process_day - ).delete() - table = FactBilling.__table__ ''' This uses the Postgres upsert to avoid race conditions when two threads try to insert diff --git a/app/dao/fact_notification_status_dao.py b/app/dao/fact_notification_status_dao.py index 1b9164b73..2836e62df 100644 --- a/app/dao/fact_notification_status_dao.py +++ b/app/dao/fact_notification_status_dao.py @@ -38,13 +38,22 @@ def fetch_notification_status_for_day(process_day): # if no rows try notificationHistory for service_id in service_ids: for notification_type in [EMAIL_TYPE, SMS_TYPE, LETTER_TYPE]: - table = Notification - data_for_service_and_type = query_for_fact_status_data(table, start_date, end_date, notification_type, service_id) + data_for_service_and_type = query_for_fact_status_data( + table=Notification, + start_date=start_date, + end_date=end_date, + notification_type=notification_type, + service_id=service_id + ) if len(data_for_service_and_type) == 0: - table = NotificationHistory - data_for_service_and_type = 
query_for_fact_status_data(table, start_date, end_date, notification_type, service_id) - + data_for_service_and_type = query_for_fact_status_data( + table=NotificationHistory, + start_date=start_date, + end_date=end_date, + notification_type=notification_type, + service_id=service_id + ) all_data_for_process_day = all_data_for_process_day + data_for_service_and_type return all_data_for_process_day From 996dcdd88c00d46ad65f8bfab37dd3fea7fd1e5f Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Thu, 18 Jul 2019 16:45:27 +0100 Subject: [PATCH 3/3] Increase the number of days we rebuild the tables for --- app/celery/reporting_tasks.py | 4 ++-- tests/app/celery/test_reporting_tasks.py | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index 94e8135b4..07e0a66c6 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -24,7 +24,7 @@ def create_nightly_billing(day_start=None): else: # When calling the task its a string in the format of "YYYY-MM-DD" day_start = datetime.strptime(day_start, "%Y-%m-%d").date() - for i in range(0, 4): + for i in range(0, 10): process_day = day_start - timedelta(days=i) transit_data = fetch_billing_data_for_day(process_day=process_day) @@ -47,7 +47,7 @@ def create_nightly_notification_status(day_start=None): else: # When calling the task its a string in the format of "YYYY-MM-DD" day_start = datetime.strptime(day_start, "%Y-%m-%d").date() - for i in range(0, 4): + for i in range(0, 10): process_day = day_start - timedelta(days=i) transit_data = fetch_notification_status_for_day(process_day=process_day) diff --git a/tests/app/celery/test_reporting_tasks.py b/tests/app/celery/test_reporting_tasks.py index a4481990d..d4d11773c 100644 --- a/tests/app/celery/test_reporting_tasks.py +++ b/tests/app/celery/test_reporting_tasks.py @@ -440,8 +440,8 @@ def test_create_nightly_notification_status(notify_db_session): 
create_notification(template=first_template, status='delivered') create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=1)) create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=2)) - create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=4)) - create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=5)) + create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=10)) + create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=10)) create_notification(template=second_template, status='temporary-failure') create_notification(template=second_template, status='temporary-failure', @@ -449,15 +449,15 @@ def test_create_nightly_notification_status(notify_db_session): create_notification(template=second_template, status='temporary-failure', created_at=datetime.utcnow() - timedelta(days=2)) create_notification(template=second_template, status='temporary-failure', - created_at=datetime.utcnow() - timedelta(days=4)) + created_at=datetime.utcnow() - timedelta(days=10)) create_notification(template=second_template, status='temporary-failure', - created_at=datetime.utcnow() - timedelta(days=5)) + created_at=datetime.utcnow() - timedelta(days=10)) create_notification(template=third_template, status='created') create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=1)) create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=2)) - create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=4)) - create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=5)) 
+ create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=10)) + create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=10)) assert len(FactNotificationStatus.query.all()) == 0 @@ -467,7 +467,7 @@ def test_create_nightly_notification_status(notify_db_session): FactNotificationStatus.notification_type ).all() assert len(new_data) == 9 - assert str(new_data[0].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=4), "%Y-%m-%d") + assert str(new_data[0].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=10), "%Y-%m-%d") assert str(new_data[3].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=2), "%Y-%m-%d") assert str(new_data[6].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=1), "%Y-%m-%d") @@ -482,10 +482,10 @@ def test_create_nightly_notification_status_respects_bst(sample_template): create_notification(sample_template, status='temporary-failure', created_at=datetime(2019, 3, 31, 22, 59)) - # we create records for last four days + # we create records for last ten days create_notification(sample_template, status='sending', created_at=datetime(2019, 3, 29, 0, 0)) - create_notification(sample_template, status='delivered', created_at=datetime(2019, 3, 28, 23, 59)) # too old + create_notification(sample_template, status='delivered', created_at=datetime(2019, 3, 22, 23, 59)) # too old create_nightly_notification_status()