mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-04 10:21:14 -05:00
Merge pull request #1807 from alphagov/vb-report-tasks
Billing database tweak and BST bug fix
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime, timedelta, time
|
||||
from app.models import (Notification,
|
||||
Rate,
|
||||
NOTIFICATION_CREATED,
|
||||
@@ -10,10 +10,10 @@ from app.models import (Notification,
|
||||
LETTER_TYPE, SMS_TYPE)
|
||||
from app import db
|
||||
from sqlalchemy import func, desc, case
|
||||
from app.dao.dao_utils import transactional
|
||||
from notifications_utils.statsd_decorators import statsd
|
||||
from app import notify_celery
|
||||
from flask import current_app
|
||||
from app.utils import convert_bst_to_utc
|
||||
|
||||
|
||||
def get_rate(non_letter_rates, letter_rates, notification_type, date, crown=None, rate_multiplier=None):
|
||||
@@ -28,91 +28,94 @@ def get_rate(non_letter_rates, letter_rates, notification_type, date, crown=None
|
||||
|
||||
@notify_celery.task(name="create-nightly-billing")
|
||||
@statsd(namespace="tasks")
|
||||
@transactional
|
||||
def create_nightly_billing(day_start=None):
|
||||
# day_start is a datetime.date() object. e.g.
|
||||
# 3 days of data counting back from day_start is consolidated
|
||||
if day_start is None:
|
||||
day_start = datetime.date(datetime.utcnow()) - timedelta(days=3) # Nightly jobs consolidating last 3 days
|
||||
# Task to be run after mid-night
|
||||
day_start = datetime.today() - timedelta(days=1)
|
||||
|
||||
non_letter_rates = [(r.notification_type, r.valid_from, r.rate) for r in
|
||||
Rate.query.order_by(desc(Rate.valid_from)).all()]
|
||||
letter_rates = [(r.start_date, r.crown, r.sheet_count, r.rate) for r in
|
||||
LetterRate.query.order_by(desc(LetterRate.start_date)).all()]
|
||||
|
||||
transit_data = db.session.query(
|
||||
func.date_trunc('day', Notification.created_at).label('day_created'),
|
||||
Notification.template_id,
|
||||
Notification.service_id,
|
||||
Notification.notification_type,
|
||||
func.coalesce(Notification.sent_by,
|
||||
case(
|
||||
[
|
||||
(Notification.notification_type == 'letter', 'dvla'),
|
||||
(Notification.notification_type == 'sms', 'unknown'),
|
||||
(Notification.notification_type == 'email', 'ses')
|
||||
]),
|
||||
).label('sent_by'),
|
||||
func.coalesce(Notification.rate_multiplier, 1).label('rate_multiplier'),
|
||||
func.coalesce(Notification.international, False).label('international'),
|
||||
func.sum(Notification.billable_units).label('billable_units'),
|
||||
func.count().label('notifications_sent'),
|
||||
Service.crown,
|
||||
).filter(
|
||||
Notification.status != NOTIFICATION_CREATED, # at created status, provider information is not available
|
||||
Notification.status != NOTIFICATION_TECHNICAL_FAILURE,
|
||||
Notification.key_type != KEY_TYPE_TEST,
|
||||
Notification.created_at >= day_start
|
||||
).group_by(
|
||||
'day_created',
|
||||
Notification.template_id,
|
||||
Notification.service_id,
|
||||
Notification.notification_type,
|
||||
'sent_by',
|
||||
Notification.rate_multiplier,
|
||||
Notification.international,
|
||||
Service.crown
|
||||
).join(
|
||||
Service
|
||||
).order_by(
|
||||
'day_created'
|
||||
).all()
|
||||
for i in range(0, 3):
|
||||
process_day = day_start - timedelta(days=i)
|
||||
ds = convert_bst_to_utc(datetime.combine(process_day, time.min))
|
||||
de = convert_bst_to_utc(datetime.combine(process_day + timedelta(days=1), time.min))
|
||||
|
||||
updated_records = 0
|
||||
inserted_records = 0
|
||||
transit_data = db.session.query(
|
||||
Notification.template_id,
|
||||
Notification.service_id,
|
||||
Notification.notification_type,
|
||||
func.coalesce(Notification.sent_by,
|
||||
case(
|
||||
[
|
||||
(Notification.notification_type == 'letter', 'dvla'),
|
||||
(Notification.notification_type == 'sms', 'unknown'),
|
||||
(Notification.notification_type == 'email', 'ses')
|
||||
]),
|
||||
).label('sent_by'),
|
||||
func.coalesce(Notification.rate_multiplier, 1).label('rate_multiplier'),
|
||||
func.coalesce(Notification.international, False).label('international'),
|
||||
func.sum(Notification.billable_units).label('billable_units'),
|
||||
func.count().label('notifications_sent'),
|
||||
Service.crown,
|
||||
).filter(
|
||||
Notification.status != NOTIFICATION_CREATED, # at created status, provider information is not available
|
||||
Notification.status != NOTIFICATION_TECHNICAL_FAILURE,
|
||||
Notification.key_type != KEY_TYPE_TEST,
|
||||
Notification.created_at >= ds,
|
||||
Notification.created_at < de
|
||||
).group_by(
|
||||
Notification.template_id,
|
||||
Notification.service_id,
|
||||
Notification.notification_type,
|
||||
'sent_by',
|
||||
Notification.rate_multiplier,
|
||||
Notification.international,
|
||||
Service.crown
|
||||
).join(
|
||||
Service
|
||||
).all()
|
||||
|
||||
for data in transit_data:
|
||||
update_count = FactBilling.query.filter(
|
||||
FactBilling.bst_date == data.day_created,
|
||||
FactBilling.template_id == data.template_id,
|
||||
FactBilling.provider == data.sent_by, # This could be zero - this is a bug that needs to be fixed.
|
||||
FactBilling.rate_multiplier == data.rate_multiplier,
|
||||
FactBilling.notification_type == data.notification_type,
|
||||
).update(
|
||||
{"notifications_sent": data.notifications_sent,
|
||||
"billable_units": data.billable_units},
|
||||
synchronize_session=False)
|
||||
if update_count == 0:
|
||||
billing_record = FactBilling(
|
||||
bst_date=data.day_created,
|
||||
template_id=data.template_id,
|
||||
service_id=data.service_id,
|
||||
notification_type=data.notification_type,
|
||||
provider=data.sent_by,
|
||||
rate_multiplier=data.rate_multiplier,
|
||||
international=data.international,
|
||||
billable_units=data.billable_units,
|
||||
notifications_sent=data.notifications_sent,
|
||||
rate=get_rate(non_letter_rates,
|
||||
letter_rates,
|
||||
data.notification_type,
|
||||
data.day_created,
|
||||
data.crown,
|
||||
data.rate_multiplier)
|
||||
)
|
||||
db.session.add(billing_record)
|
||||
inserted_records += 1
|
||||
updated_records = 0
|
||||
inserted_records = 0
|
||||
|
||||
updated_records += update_count
|
||||
for data in transit_data:
|
||||
update_count = FactBilling.query.filter(
|
||||
FactBilling.bst_date == process_day,
|
||||
FactBilling.template_id == data.template_id,
|
||||
FactBilling.provider == data.sent_by, # This could be zero - this is a bug that needs to be fixed.
|
||||
FactBilling.rate_multiplier == data.rate_multiplier,
|
||||
FactBilling.notification_type == data.notification_type,
|
||||
).update(
|
||||
{"notifications_sent": data.notifications_sent,
|
||||
"billable_units": data.billable_units},
|
||||
synchronize_session=False)
|
||||
if update_count == 0:
|
||||
billing_record = FactBilling(
|
||||
bst_date=process_day,
|
||||
template_id=data.template_id,
|
||||
service_id=data.service_id,
|
||||
notification_type=data.notification_type,
|
||||
provider=data.sent_by,
|
||||
rate_multiplier=data.rate_multiplier,
|
||||
international=data.international,
|
||||
billable_units=data.billable_units,
|
||||
notifications_sent=data.notifications_sent,
|
||||
rate=get_rate(non_letter_rates,
|
||||
letter_rates,
|
||||
data.notification_type,
|
||||
process_day,
|
||||
data.crown,
|
||||
data.rate_multiplier)
|
||||
)
|
||||
db.session.add(billing_record)
|
||||
inserted_records += 1
|
||||
|
||||
current_app.logger.info('ft_billing: {} rows updated, {} rows inserted'
|
||||
.format(updated_records, inserted_records))
|
||||
updated_records += update_count
|
||||
db.session.commit()
|
||||
|
||||
current_app.logger.info('ft_billing {} to {}: {} rows updated, {} rows inserted'
|
||||
.format(ds, de, updated_records, inserted_records))
|
||||
|
||||
@@ -388,24 +388,39 @@ def migrate_data_to_ft_billing(start_date, end_date):
|
||||
print('Billing migration from date {} to {}'.format(start_date, end_date))
|
||||
|
||||
process_date = start_date
|
||||
while (process_date <= end_date):
|
||||
total_updated = 0
|
||||
|
||||
while process_date < end_date:
|
||||
|
||||
sql = \
|
||||
"""
|
||||
select count(*) from notification_history where notification_status!='technical-failure'
|
||||
and key_type!='test'
|
||||
and notification_status!='created'
|
||||
and created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||
and created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||
"""
|
||||
num_notifications = db.session.execute(sql, {"start": process_date,
|
||||
"end": process_date + timedelta(days=1)}).fetchall()[0][0]
|
||||
sql = \
|
||||
"""
|
||||
select count(*) from
|
||||
(select distinct date_part('day', created_at) as utc_date, service_id, template_id, rate_multiplier,
|
||||
(select distinct service_id, template_id, rate_multiplier,
|
||||
sent_by from notification_history
|
||||
where notification_status!='technical-failure'
|
||||
and key_type!='test'
|
||||
and notification_status!='created'
|
||||
and created_at >= :start
|
||||
and created_at < :end order by utc_date) as distinct_records
|
||||
and created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||
and created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||
) as distinct_records
|
||||
"""
|
||||
|
||||
predicted_records = db.session.execute(sql, {"start": process_date,
|
||||
"end": process_date + timedelta(days=1)}).fetchall()[0][0]
|
||||
|
||||
start_time = datetime.now()
|
||||
print('{}: Migrating date: {}, expecting {} rows'
|
||||
.format(start_time, process_date, predicted_records))
|
||||
print('ft_billing: Migrating date: {}, notifications: {}, expecting {} ft_billing rows'
|
||||
.format(process_date.date(), num_notifications, predicted_records))
|
||||
|
||||
# migrate data into ft_billing, ignore if records already exist - do not do upsert
|
||||
sql = \
|
||||
@@ -443,14 +458,15 @@ def migrate_data_to_ft_billing(start_date, end_date):
|
||||
1 as notifications_sent
|
||||
from public.notification_history n
|
||||
left join templates t on t.id = n.template_id
|
||||
left join dm_datetime da on n.created_at> da.utc_daytime_start
|
||||
left join dm_datetime da on n.created_at>= da.utc_daytime_start
|
||||
and n.created_at < da.utc_daytime_end
|
||||
left join services s on s.id = n.service_id
|
||||
where n.notification_status!='technical-failure'
|
||||
and n.key_type!='test'
|
||||
and n.notification_status!='created'
|
||||
and n.created_at >= :start
|
||||
and n.created_at < :end
|
||||
and n.created_at >= (date :start + time '00:00:00') at time zone 'Europe/London'
|
||||
at time zone 'UTC'
|
||||
and n.created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||
) as individual_record
|
||||
group by bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international,
|
||||
sms_rate, letter_rate
|
||||
@@ -463,11 +479,13 @@ def migrate_data_to_ft_billing(start_date, end_date):
|
||||
|
||||
result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)})
|
||||
db.session.commit()
|
||||
print('{}: --- Completed took {}ms. Migrated {} rows.'.format(datetime.now(), datetime.now() - start_time,
|
||||
result.rowcount,
|
||||
))
|
||||
print('ft_billing: --- Completed took {}ms. Migrated {} rows.'.format(datetime.now() - start_time,
|
||||
result.rowcount))
|
||||
if predicted_records != result.rowcount:
|
||||
print(' : --- Result mismatch by {} rows'
|
||||
print(' : ^^^ Result mismatch by {} rows ^^^'
|
||||
.format(predicted_records - result.rowcount))
|
||||
|
||||
process_date += timedelta(days=1)
|
||||
|
||||
total_updated += result.rowcount
|
||||
print('Total inserted/updated records = {}'.format(total_updated))
|
||||
|
||||
Reference in New Issue
Block a user