From 2262db4f622ea024b19b3682136ec1de066663e5 Mon Sep 17 00:00:00 2001 From: venusbb Date: Tue, 27 Mar 2018 10:37:56 +0100 Subject: [PATCH 1/2] Database tweak and BST bug fix --- app/celery/reporting_tasks.py | 158 +++++++++--------- app/commands.py | 44 +++-- migrations/versions/0183_alter_primary_key.py | 78 +++++++++ tests/app/celery/test_reporting_tasks.py | 56 ++++++- 4 files changed, 243 insertions(+), 93 deletions(-) create mode 100644 migrations/versions/0183_alter_primary_key.py diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index 719fb966f..b1636474b 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -10,10 +10,11 @@ from app.models import (Notification, LETTER_TYPE, SMS_TYPE) from app import db from sqlalchemy import func, desc, case -from app.dao.dao_utils import transactional from notifications_utils.statsd_decorators import statsd from app import notify_celery from flask import current_app +from app.utils import convert_bst_to_utc +from dateutil import parser def get_rate(non_letter_rates, letter_rates, notification_type, date, crown=None, rate_multiplier=None): @@ -28,91 +29,94 @@ def get_rate(non_letter_rates, letter_rates, notification_type, date, crown=None @notify_celery.task(name="create-nightly-billing") @statsd(namespace="tasks") -@transactional def create_nightly_billing(day_start=None): + # day_start is a datetime.date() object. e.g. + # 3 days of data counting back from day_start is consolidated if day_start is None: - day_start = datetime.date(datetime.utcnow()) - timedelta(days=3) # Nightly jobs consolidating last 3 days - # Task to be run after mid-night + day_start = datetime.today() - timedelta(days=1) non_letter_rates = [(r.notification_type, r.valid_from, r.rate) for r in Rate.query.order_by(desc(Rate.valid_from)).all()] letter_rates = [(r.start_date, r.crown, r.sheet_count, r.rate) for r in LetterRate.query.order_by(desc(LetterRate.start_date)).all()] - transit_data = db.session.query( - func.date_trunc('day', Notification.created_at).label('day_created'), - Notification.template_id, - Notification.service_id, - Notification.notification_type, - func.coalesce(Notification.sent_by, - case( - [ - (Notification.notification_type == 'letter', 'dvla'), - (Notification.notification_type == 'sms', 'unknown'), - (Notification.notification_type == 'email', 'ses') - ]), - ).label('sent_by'), - func.coalesce(Notification.rate_multiplier, 1).label('rate_multiplier'), - func.coalesce(Notification.international, False).label('international'), - func.sum(Notification.billable_units).label('billable_units'), - func.count().label('notifications_sent'), - Service.crown, - ).filter( - Notification.status != NOTIFICATION_CREATED, # at created status, provider information is not available - Notification.status != NOTIFICATION_TECHNICAL_FAILURE, - Notification.key_type != KEY_TYPE_TEST, - Notification.created_at >= day_start - ).group_by( - 'day_created', - Notification.template_id, - Notification.service_id, - Notification.notification_type, - 'sent_by', - Notification.rate_multiplier, - Notification.international, - Service.crown - ).join( - Service - ).order_by( - 'day_created' - ).all() + for i in range(0, 3): + process_day = day_start - timedelta(days=i) + ds = convert_bst_to_utc(parser.parse("{:%Y-%m-%d}".format(process_day) + ' 00:00:00')) + de = convert_bst_to_utc(parser.parse("{:%Y-%m-%d}".format(process_day + timedelta(days=1)) + ' 00:00:00')) - updated_records = 0 - inserted_records = 0 + transit_data = db.session.query( + Notification.template_id, + Notification.service_id, + Notification.notification_type, + func.coalesce(Notification.sent_by, + case( + [ + (Notification.notification_type == 'letter', 'dvla'), + (Notification.notification_type == 'sms', 'unknown'), + (Notification.notification_type == 'email', 'ses') + ]), + ).label('sent_by'), + func.coalesce(Notification.rate_multiplier, 1).label('rate_multiplier'), + func.coalesce(Notification.international, False).label('international'), + func.sum(Notification.billable_units).label('billable_units'), + func.count().label('notifications_sent'), + Service.crown, + ).filter( + Notification.status != NOTIFICATION_CREATED, # at created status, provider information is not available + Notification.status != NOTIFICATION_TECHNICAL_FAILURE, + Notification.key_type != KEY_TYPE_TEST, + Notification.created_at >= ds, + Notification.created_at < de + ).group_by( + Notification.template_id, + Notification.service_id, + Notification.notification_type, + 'sent_by', + Notification.rate_multiplier, + Notification.international, + Service.crown + ).join( + Service + ).all() - for data in transit_data: - update_count = FactBilling.query.filter( - FactBilling.bst_date == data.day_created, - FactBilling.template_id == data.template_id, - FactBilling.provider == data.sent_by, # This could be zero - this is a bug that needs to be fixed. - FactBilling.rate_multiplier == data.rate_multiplier, - FactBilling.notification_type == data.notification_type, - ).update( - {"notifications_sent": data.notifications_sent, - "billable_units": data.billable_units}, - synchronize_session=False) - if update_count == 0: - billing_record = FactBilling( - bst_date=data.day_created, - template_id=data.template_id, - service_id=data.service_id, - notification_type=data.notification_type, - provider=data.sent_by, - rate_multiplier=data.rate_multiplier, - international=data.international, - billable_units=data.billable_units, - notifications_sent=data.notifications_sent, - rate=get_rate(non_letter_rates, - letter_rates, - data.notification_type, - data.day_created, - data.crown, - data.rate_multiplier) - ) - db.session.add(billing_record) - inserted_records += 1 + updated_records = 0 + inserted_records = 0 - updated_records += update_count + for data in transit_data: + update_count = FactBilling.query.filter( + FactBilling.bst_date == process_day, + FactBilling.template_id == data.template_id, + FactBilling.provider == data.sent_by, # This could be zero - this is a bug that needs to be fixed. + FactBilling.rate_multiplier == data.rate_multiplier, + FactBilling.notification_type == data.notification_type, + ).update( + {"notifications_sent": data.notifications_sent, + "billable_units": data.billable_units}, + synchronize_session=False) + if update_count == 0: + billing_record = FactBilling( + bst_date=process_day, + template_id=data.template_id, + service_id=data.service_id, + notification_type=data.notification_type, + provider=data.sent_by, + rate_multiplier=data.rate_multiplier, + international=data.international, + billable_units=data.billable_units, + notifications_sent=data.notifications_sent, + rate=get_rate(non_letter_rates, + letter_rates, + data.notification_type, + process_day, + data.crown, + data.rate_multiplier) + ) + db.session.add(billing_record) + inserted_records += 1 - current_app.logger.info('ft_billing: {} rows updated, {} rows inserted' - .format(updated_records, inserted_records)) + updated_records += update_count + db.session.commit() + + current_app.logger.info('ft_billing {} to {}: {} rows updated, {} rows inserted' + .format(ds, de, updated_records, inserted_records)) diff --git a/app/commands.py b/app/commands.py index bf90623ca..a0a00ca32 100644 --- a/app/commands.py +++ b/app/commands.py @@ -388,24 +388,39 @@ def migrate_data_to_ft_billing(start_date, end_date): print('Billing migration from date {} to {}'.format(start_date, end_date)) process_date = start_date - while (process_date <= end_date): + total_updated = 0 + + while process_date < end_date: + + sql = \ + """ + select count(*) from notification_history where notification_status!='technical-failure' + and key_type!='test' + and notification_status!='created' + and created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC' + and created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC' + """ + num_notifications = db.session.execute(sql, {"start": process_date, + "end": process_date + timedelta(days=1)}).fetchall()[0][0] sql = \ """ select count(*) from - (select distinct date_part('day', created_at) as utc_date, service_id, template_id, rate_multiplier, + (select distinct service_id, template_id, rate_multiplier, sent_by from notification_history where notification_status!='technical-failure' and key_type!='test' and notification_status!='created' - and created_at >= :start - and created_at < :end order by utc_date) as distinct_records + and created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC' + and created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC' + ) as distinct_records """ predicted_records = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)}).fetchall()[0][0] + start_time = datetime.now() - print('{}: Migrating date: {}, expecting {} rows' - .format(start_time, process_date, predicted_records)) + print('ft_billing: Migrating date: {}, notifications: {}, expecting {} ft_billing rows' + .format(process_date.date(), num_notifications, predicted_records)) # migrate data into ft_billing, ignore if records already exist - do not do upsert sql = \ @@ -443,14 +458,15 @@ def migrate_data_to_ft_billing(start_date, end_date): 1 as notifications_sent from public.notification_history n left join templates t on t.id = n.template_id - left join dm_datetime da on n.created_at> da.utc_daytime_start + left join dm_datetime da on n.created_at>= da.utc_daytime_start and n.created_at < da.utc_daytime_end left join services s on s.id = n.service_id where n.notification_status!='technical-failure' and n.key_type!='test' and n.notification_status!='created' - and n.created_at >= :start - and n.created_at < :end + and n.created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' + at time zone 'UTC' + and n.created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC' ) as individual_record group by bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international, sms_rate, letter_rate @@ -463,11 +479,13 @@ def migrate_data_to_ft_billing(start_date, end_date): result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)}) db.session.commit() - print('{}: --- Completed took {}ms. Migrated {} rows.'.format(datetime.now(), datetime.now() - start_time, - result.rowcount, - )) + print('ft_billing: --- Completed took {}ms. Migrated {} rows.'.format(datetime.now() - start_time, + result.rowcount)) if predicted_records != result.rowcount: - print(' : --- Result mismatch by {} rows' + print(' : ^^^ Result mismatch by {} rows ^^^' .format(predicted_records - result.rowcount)) process_date += timedelta(days=1) + + total_updated += result.rowcount + print('Total inserted/updated records = {}'.format(total_updated)) diff --git a/migrations/versions/0183_alter_primary_key.py b/migrations/versions/0183_alter_primary_key.py new file mode 100644 index 000000000..a183f9562 --- /dev/null +++ b/migrations/versions/0183_alter_primary_key.py @@ -0,0 +1,78 @@ +""" + +Revision ID: 0183_alter_primary_key +Revises: 0182_add_upload_document_perm +Create Date: 2018-03-25 21:23:32.403212 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +revision = '0183_alter_primary_key' +down_revision = '0182_add_upload_document_perm' + + +def upgrade(): + # Drop the old dm_datetime table and create a new one + op.execute( + """ + delete from dm_datetime where 1=1; + """) + + op.execute( + """ + INSERT into dm_datetime ( + SELECT + datum AS bst_date, + EXTRACT(YEAR FROM datum) AS year, + EXTRACT(MONTH FROM datum) AS month, + -- Localized month name + to_char(datum, 'TMMonth') AS month_name, + EXTRACT(DAY FROM datum) AS day, + EXTRACT(DAY FROM datum) AS bst_day, + EXTRACT(DOY FROM datum) AS day_of_year, + -- Localized weekday + to_char(datum, 'TMDay') AS week_day_name, + -- ISO calendar week + EXTRACT(week FROM datum) AS calendar_week, + 'Q' || to_char(datum, 'Q') AS quartal, + to_char(datum, 'yyyy/"Q"Q') AS year_quartal, + to_char(datum, 'yyyy/mm') AS year_month, + -- ISO calendar year and week + to_char(datum, 'iyyy/IW') AS year_calendar_week, + (SELECT CASE WHEN (extract(month from datum) <= 3) THEN (extract(year FROM datum) -1) + ELSE (extract(year FROM datum)) end), + (datum + TIME '00:00:00') at Time zone 'Europe/London' at TIME zone 'utc' as utc_daytime_start, -- convert bst time to utc time + (datum + TIME '24:00:00') at Time zone 'Europe/London' at TIME zone 'utc' as utc_daytime_end + FROM ( + -- There are 10 leap years in this range, so calculate 365 * 50 + 5 records + SELECT '2015-01-01'::date + SEQUENCE.DAY AS datum + FROM generate_series(0,365*50+10) AS SEQUENCE(DAY) + GROUP BY SEQUENCE.day + ) DQ + ORDER BY bst_date + ); + """ + ) + + op.drop_constraint('ft_billing_pkey', 'ft_billing', type_='primary') + + op.create_primary_key('ft_billing_pkey', 'ft_billing', ['bst_date', + 'template_id', + 'service_id', + 'rate_multiplier', + 'provider', + 'notification_type']) + + +def downgrade(): + # We don't downgrade populated data + op.drop_constraint('ft_billing_pkey', 'ft_billing', type_='primary') + + op.create_primary_key('ft_billing_pkey', 'ft_billing', ['bst_date', + 'template_id', + 'rate_multiplier', + 'provider', + 'notification_type']) + diff --git a/tests/app/celery/test_reporting_tasks.py b/tests/app/celery/test_reporting_tasks.py index eb4c8453c..b9ce112ec 100644 --- a/tests/app/celery/test_reporting_tasks.py +++ b/tests/app/celery/test_reporting_tasks.py @@ -76,7 +76,7 @@ def test_create_nightly_billing_sms_rate_multiplier( assert len(records) == 0 create_nightly_billing(yesterday) - records = FactBilling.query.all() + records = FactBilling.query.order_by('rate_multiplier').all() assert len(records) == records_num for i, record in enumerate(records): assert record.bst_date == datetime.date(yesterday) @@ -298,9 +298,9 @@ def test_create_nightly_billing_consolidate_from_3_days_delta( create_nightly_billing() records = FactBilling.query.order_by(FactBilling.bst_date).all() - assert len(records) == 4 + assert len(records) == 3 assert records[0].bst_date == date(2018, 1, 12) - assert records[-1].bst_date == date(2018, 1, 15) + assert records[-1].bst_date == date(2018, 1, 14) def test_get_rate_for_letter_latest(notify_db, notify_db_session): @@ -354,3 +354,53 @@ def test_get_rate_for_sms_and_email(notify_db, notify_db_session): rate = get_rate(non_letter_rates, letter_rates, EMAIL_TYPE, datetime(2018, 1, 1)) assert rate == Decimal(0) + + +@freeze_time('2018-03-27T03:30:00') +# summer time starts on 2018-03-25 +def test_create_nightly_billing_use_BST( + notify_db, + notify_db_session, + sample_service, + sample_template, + mocker): + + mocker.patch('app.celery.reporting_tasks.get_rate', side_effect=mocker_get_rate) + + sample_notification( + notify_db, + notify_db_session, + created_at=datetime(2018, 3, 25, 12, 0), + service=sample_service, + template=sample_template, + status='delivered', + sent_by=None, + international=False, + rate_multiplier=1.0, + billable_units=1, + ) + + sample_notification( + notify_db, + notify_db_session, + created_at=datetime(2018, 3, 25, 23, 5), + service=sample_service, + template=sample_template, + status='delivered', + sent_by=None, + international=False, + rate_multiplier=1.0, + billable_units=1, + ) + + notifications = Notification.query.order_by(Notification.created_at).all() + assert len(notifications) == 2 + records = FactBilling.query.all() + assert len(records) == 0 + + create_nightly_billing() + records = FactBilling.query.order_by(FactBilling.bst_date).all() + + assert len(records) == 2 + assert records[0].bst_date == date(2018, 3, 25) + assert records[-1].bst_date == date(2018, 3, 26) From 83aa1b9fa3fa8ed68f0c38ec72ac6e6d5fa1f1f8 Mon Sep 17 00:00:00 2001 From: venusbb Date: Wed, 28 Mar 2018 10:39:25 +0100 Subject: [PATCH 2/2] use datetime.combine rather than parser.parse --- app/celery/reporting_tasks.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index b1636474b..960264fc6 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, time from app.models import (Notification, Rate, NOTIFICATION_CREATED, @@ -14,7 +14,6 @@ from notifications_utils.statsd_decorators import statsd from app import notify_celery from flask import current_app from app.utils import convert_bst_to_utc -from dateutil import parser def get_rate(non_letter_rates, letter_rates, notification_type, date, crown=None, rate_multiplier=None): @@ -42,8 +41,8 @@ def create_nightly_billing(day_start=None): for i in range(0, 3): process_day = day_start - timedelta(days=i) - ds = convert_bst_to_utc(parser.parse("{:%Y-%m-%d}".format(process_day) + ' 00:00:00')) - de = convert_bst_to_utc(parser.parse("{:%Y-%m-%d}".format(process_day + timedelta(days=1)) + ' 00:00:00')) + ds = convert_bst_to_utc(datetime.combine(process_day, time.min)) + de = convert_bst_to_utc(datetime.combine(process_day + timedelta(days=1), time.min)) transit_data = db.session.query( Notification.template_id,