From b9953dd005d45f0a7dcd1020090f54e642ee677b Mon Sep 17 00:00:00 2001 From: venusbb Date: Wed, 21 Mar 2018 15:21:16 +0000 Subject: [PATCH 1/3] Command to migrate data to ft_billing --- app/commands.py | 89 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/app/commands.py b/app/commands.py index 23fc18c0a..5e778601f 100644 --- a/app/commands.py +++ b/app/commands.py @@ -367,3 +367,92 @@ def replay_service_callbacks(file_name, service_id): def setup_commands(application): application.cli.add_command(command_group) + + +@notify_command(name='migrate-data-to-ft-billing') +@click.option('-s', '--start_date', required=True, help="start date inclusive", type=click_dt(format='%Y-%m-%d')) +@click.option('-e', '--end_date', required=True, help="end date inclusive", type=click_dt(format='%Y-%m-%d')) +def migrate_data_to_ft_billing(start_date, end_date): + + print('Billing migration from date {} to {}'.format(start_date, end_date)) + + process_date = start_date + while (process_date <= end_date): + sql = \ + """ + select count(*) from + (select distinct date_part('day', created_at) as utc_date, service_id, template_id, rate_multiplier, + sent_by from notification_history + where notification_status!='technical-failure' + and key_type!='test' + and notification_status!='created' + and created_at >= '{}' + and created_at <'{}'order by utc_date) as distinct_records + """.format(process_date, process_date + timedelta(days=1)) + + predicted_records = db.session.execute(sql).fetchall()[0][0] + start_time = datetime.now() + print('{}: Migrating date: {}, expecting {} rows' + .format(start_time, process_date, predicted_records)) + + # migrate data into ft_billing, ignore if records already exist - do not do upsert + sql = \ + """ + insert into ft_billing (bst_date, template_id, service_id, notification_type, provider, rate_multiplier, + international, billable_units, notifications_sent, rate) + select bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international, + sum(billable_units) as billable_units, sum(notifications_sent) as notification_sent, + case when notification_type = 'sms' then sms_rate else letter_rate end as rate + from ( + select + n.id, + da.bst_date, + coalesce(n.template_id, '00000000-0000-0000-0000-000000000000') as template_id, + coalesce(n.service_id, '00000000-0000-0000-0000-000000000000') as service_id, + n.notification_type, + coalesce(n.sent_by, ( + case + when notification_type = 'sms' then + coalesce(sent_by, 'unknown') + when notification_type = 'letter' then + coalesce(sent_by, 'dvla') + else + coalesce(sent_by, 'ses') + end )) as provider, + coalesce(n.rate_multiplier,1) as rate_multiplier, + s.crown, + coalesce((select rates.rate from rates + where n.notification_type = rates.notification_type and n.sent_at > rates.valid_from + order by rates.valid_from desc limit 1), 0) as sms_rate, + coalesce((select l.rate from letter_rates l where n.rate_multiplier = l.sheet_count + and s.crown = l.crown and n.notification_type='letter'), 0) as letter_rate, + coalesce(n.international, false) as international, + n.billable_units, + 1 as notifications_sent + from public.notification_history n + left join templates t on t.id = n.template_id + left join dm_datetime da on n.created_at> da.utc_daytime_start + and n.created_at < da.utc_daytime_end + left join services s on s.id = n.service_id + where n.notification_status!='technical-failure' + and n.key_type!='test' + and n.notification_status!='created' + and n.created_at >= '{}' + and n.created_at < '{}' + ) as individual_record + group by bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international, + sms_rate, letter_rate + order by bst_date + on conflict do nothing + """.format(process_date, process_date + timedelta(days=1)) + + result = db.session.execute(sql) + db.session.commit() + print('{}: --- Completed took {}ms. Migrated {} rows.'.format(datetime.now(), datetime.now() - start_time, + result.rowcount, + )) + if predicted_records != result.rowcount: + print(' : --- Result mismatch by {} rows' + .format(predicted_records - result.rowcount)) + + process_date += timedelta(days=1) From 4b25654cbf30c0d31d29df174a843548b06ab9c3 Mon Sep 17 00:00:00 2001 From: venusbb Date: Wed, 21 Mar 2018 15:37:49 +0000 Subject: [PATCH 2/3] update record rather than ignore when duplicate --- app/commands.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/app/commands.py b/app/commands.py index 5e778601f..4e49f6e02 100644 --- a/app/commands.py +++ b/app/commands.py @@ -443,7 +443,10 @@ def migrate_data_to_ft_billing(start_date, end_date): group by bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international, sms_rate, letter_rate order by bst_date - on conflict do nothing + on conflict on constraint ft_billing_pkey do update set + billable_units = excluded.billable_units, + notifications_sent = excluded.notifications_sent, + rate = excluded.rate """.format(process_date, process_date + timedelta(days=1)) result = db.session.execute(sql) From 9aa2536997583aa98a8baf068d811abe8af5200c Mon Sep 17 00:00:00 2001 From: venusbb Date: Wed, 21 Mar 2018 17:04:51 +0000 Subject: [PATCH 3/3] use sql parametrize rather than python format --- app/commands.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/app/commands.py b/app/commands.py index 4e49f6e02..f500951b8 100644 --- a/app/commands.py +++ b/app/commands.py @@ -386,11 +386,12 @@ def migrate_data_to_ft_billing(start_date, end_date): where notification_status!='technical-failure' and key_type!='test' and notification_status!='created' - and created_at >= '{}' - and created_at <'{}'order by utc_date) as distinct_records - """.format(process_date, process_date + timedelta(days=1)) + and created_at >= :start + and created_at < :end order by utc_date) as distinct_records + """ - predicted_records = db.session.execute(sql).fetchall()[0][0] + predicted_records = db.session.execute(sql, {"start": process_date, + "end": process_date + timedelta(days=1)}).fetchall()[0][0] start_time = datetime.now() print('{}: Migrating date: {}, expecting {} rows' .format(start_time, process_date, predicted_records)) @@ -437,8 +438,8 @@ def migrate_data_to_ft_billing(start_date, end_date): where n.notification_status!='technical-failure' and n.key_type!='test' and n.notification_status!='created' - and n.created_at >= '{}' - and n.created_at < '{}' + and n.created_at >= :start + and n.created_at < :end ) as individual_record group by bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international, sms_rate, letter_rate @@ -447,9 +448,9 @@ def migrate_data_to_ft_billing(start_date, end_date): billable_units = excluded.billable_units, notifications_sent = excluded.notifications_sent, rate = excluded.rate - """.format(process_date, process_date + timedelta(days=1)) + """ - result = db.session.execute(sql) + result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)}) db.session.commit() print('{}: --- Completed took {}ms. Migrated {} rows.'.format(datetime.now(), datetime.now() - start_time, result.rowcount,