Update the command to migrate data for ft_billing.

There is a massive performance improvement.
Another commit will need to alter the pk to include rate.
This commit is contained in:
Rebecca Law
2018-05-18 17:22:51 +01:00
parent 4b36fe0d9e
commit 64aec4a64c

View File

@@ -401,43 +401,13 @@ def setup_commands(application):
@statsd(namespace="tasks")
def migrate_data_to_ft_billing(start_date, end_date):
print('Billing migration from date {} to {}'.format(start_date, end_date))
current_app.logger.info('Billing migration from date {} to {}'.format(start_date, end_date))
process_date = start_date
total_updated = 0
while process_date < end_date:
sql = \
"""
select count(*) from notification_history where notification_status!='technical-failure'
and key_type!='test'
and notification_status!='created'
and created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
and created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
"""
num_notifications = db.session.execute(sql, {"start": process_date,
"end": process_date + timedelta(days=1)}).fetchall()[0][0]
sql = \
"""
select count(*) from
(select distinct service_id, template_id, rate_multiplier,
sent_by from notification_history
where notification_status!='technical-failure'
and key_type!='test'
and notification_status!='created'
and created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
and created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
) as distinct_records
"""
predicted_records = db.session.execute(sql, {"start": process_date,
"end": process_date + timedelta(days=1)}).fetchall()[0][0]
start_time = datetime.now()
print('ft_billing: Migrating date: {}, notifications: {}, expecting {} ft_billing rows'
.format(process_date.date(), num_notifications, predicted_records))
start_time = datetime.utcnow()
# migrate data into ft_billing, ignore if records already exist - do not do upsert
sql = \
"""
@@ -449,7 +419,7 @@ def migrate_data_to_ft_billing(start_date, end_date):
from (
select
n.id,
da.bst_date,
(n.created_at at time zone 'UTC' at time zone 'Europe/London')::timestamp::date as bst_date,
coalesce(n.template_id, '00000000-0000-0000-0000-000000000000') as template_id,
coalesce(n.service_id, '00000000-0000-0000-0000-000000000000') as service_id,
n.notification_type,
@@ -473,9 +443,6 @@ def migrate_data_to_ft_billing(start_date, end_date):
n.billable_units,
1 as notifications_sent
from public.notification_history n
left join templates t on t.id = n.template_id
left join dm_datetime da on n.created_at>= da.utc_daytime_start
and n.created_at < da.utc_daytime_end
left join services s on s.id = n.service_id
where n.notification_status!='technical-failure'
and n.key_type!='test'
@@ -495,16 +462,13 @@ def migrate_data_to_ft_billing(start_date, end_date):
result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)})
db.session.commit()
print('ft_billing: --- Completed took {}ms. Migrated {} rows.'.format(datetime.now() - start_time,
current_app.logger.info('ft_billing: --- Completed took {}ms. Migrated {} rows.'.format(datetime.now() - start_time,
result.rowcount))
if predicted_records != result.rowcount:
print(' : ^^^ Result mismatch by {} rows ^^^'
.format(predicted_records - result.rowcount))
process_date += timedelta(days=1)
total_updated += result.rowcount
print('Total inserted/updated records = {}'.format(total_updated))
current_app.logger.info('Total inserted/updated records = {}'.format(total_updated))
@notify_command()