mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-16 10:12:32 -05:00
Database tweak and BST bug fix
This commit is contained in:
@@ -10,10 +10,11 @@ from app.models import (Notification,
|
|||||||
LETTER_TYPE, SMS_TYPE)
|
LETTER_TYPE, SMS_TYPE)
|
||||||
from app import db
|
from app import db
|
||||||
from sqlalchemy import func, desc, case
|
from sqlalchemy import func, desc, case
|
||||||
from app.dao.dao_utils import transactional
|
|
||||||
from notifications_utils.statsd_decorators import statsd
|
from notifications_utils.statsd_decorators import statsd
|
||||||
from app import notify_celery
|
from app import notify_celery
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
|
from app.utils import convert_bst_to_utc
|
||||||
|
from dateutil import parser
|
||||||
|
|
||||||
|
|
||||||
def get_rate(non_letter_rates, letter_rates, notification_type, date, crown=None, rate_multiplier=None):
|
def get_rate(non_letter_rates, letter_rates, notification_type, date, crown=None, rate_multiplier=None):
|
||||||
@@ -28,19 +29,23 @@ def get_rate(non_letter_rates, letter_rates, notification_type, date, crown=None
|
|||||||
|
|
||||||
@notify_celery.task(name="create-nightly-billing")
|
@notify_celery.task(name="create-nightly-billing")
|
||||||
@statsd(namespace="tasks")
|
@statsd(namespace="tasks")
|
||||||
@transactional
|
|
||||||
def create_nightly_billing(day_start=None):
|
def create_nightly_billing(day_start=None):
|
||||||
|
# day_start is a datetime.date() object. e.g.
|
||||||
|
# 3 days of data counting back from day_start is consolidated
|
||||||
if day_start is None:
|
if day_start is None:
|
||||||
day_start = datetime.date(datetime.utcnow()) - timedelta(days=3) # Nightly jobs consolidating last 3 days
|
day_start = datetime.today() - timedelta(days=1)
|
||||||
# Task to be run after mid-night
|
|
||||||
|
|
||||||
non_letter_rates = [(r.notification_type, r.valid_from, r.rate) for r in
|
non_letter_rates = [(r.notification_type, r.valid_from, r.rate) for r in
|
||||||
Rate.query.order_by(desc(Rate.valid_from)).all()]
|
Rate.query.order_by(desc(Rate.valid_from)).all()]
|
||||||
letter_rates = [(r.start_date, r.crown, r.sheet_count, r.rate) for r in
|
letter_rates = [(r.start_date, r.crown, r.sheet_count, r.rate) for r in
|
||||||
LetterRate.query.order_by(desc(LetterRate.start_date)).all()]
|
LetterRate.query.order_by(desc(LetterRate.start_date)).all()]
|
||||||
|
|
||||||
|
for i in range(0, 3):
|
||||||
|
process_day = day_start - timedelta(days=i)
|
||||||
|
ds = convert_bst_to_utc(parser.parse("{:%Y-%m-%d}".format(process_day) + ' 00:00:00'))
|
||||||
|
de = convert_bst_to_utc(parser.parse("{:%Y-%m-%d}".format(process_day + timedelta(days=1)) + ' 00:00:00'))
|
||||||
|
|
||||||
transit_data = db.session.query(
|
transit_data = db.session.query(
|
||||||
func.date_trunc('day', Notification.created_at).label('day_created'),
|
|
||||||
Notification.template_id,
|
Notification.template_id,
|
||||||
Notification.service_id,
|
Notification.service_id,
|
||||||
Notification.notification_type,
|
Notification.notification_type,
|
||||||
@@ -61,9 +66,9 @@ def create_nightly_billing(day_start=None):
|
|||||||
Notification.status != NOTIFICATION_CREATED, # at created status, provider information is not available
|
Notification.status != NOTIFICATION_CREATED, # at created status, provider information is not available
|
||||||
Notification.status != NOTIFICATION_TECHNICAL_FAILURE,
|
Notification.status != NOTIFICATION_TECHNICAL_FAILURE,
|
||||||
Notification.key_type != KEY_TYPE_TEST,
|
Notification.key_type != KEY_TYPE_TEST,
|
||||||
Notification.created_at >= day_start
|
Notification.created_at >= ds,
|
||||||
|
Notification.created_at < de
|
||||||
).group_by(
|
).group_by(
|
||||||
'day_created',
|
|
||||||
Notification.template_id,
|
Notification.template_id,
|
||||||
Notification.service_id,
|
Notification.service_id,
|
||||||
Notification.notification_type,
|
Notification.notification_type,
|
||||||
@@ -73,8 +78,6 @@ def create_nightly_billing(day_start=None):
|
|||||||
Service.crown
|
Service.crown
|
||||||
).join(
|
).join(
|
||||||
Service
|
Service
|
||||||
).order_by(
|
|
||||||
'day_created'
|
|
||||||
).all()
|
).all()
|
||||||
|
|
||||||
updated_records = 0
|
updated_records = 0
|
||||||
@@ -82,7 +85,7 @@ def create_nightly_billing(day_start=None):
|
|||||||
|
|
||||||
for data in transit_data:
|
for data in transit_data:
|
||||||
update_count = FactBilling.query.filter(
|
update_count = FactBilling.query.filter(
|
||||||
FactBilling.bst_date == data.day_created,
|
FactBilling.bst_date == process_day,
|
||||||
FactBilling.template_id == data.template_id,
|
FactBilling.template_id == data.template_id,
|
||||||
FactBilling.provider == data.sent_by, # This could be zero - this is a bug that needs to be fixed.
|
FactBilling.provider == data.sent_by, # This could be zero - this is a bug that needs to be fixed.
|
||||||
FactBilling.rate_multiplier == data.rate_multiplier,
|
FactBilling.rate_multiplier == data.rate_multiplier,
|
||||||
@@ -93,7 +96,7 @@ def create_nightly_billing(day_start=None):
|
|||||||
synchronize_session=False)
|
synchronize_session=False)
|
||||||
if update_count == 0:
|
if update_count == 0:
|
||||||
billing_record = FactBilling(
|
billing_record = FactBilling(
|
||||||
bst_date=data.day_created,
|
bst_date=process_day,
|
||||||
template_id=data.template_id,
|
template_id=data.template_id,
|
||||||
service_id=data.service_id,
|
service_id=data.service_id,
|
||||||
notification_type=data.notification_type,
|
notification_type=data.notification_type,
|
||||||
@@ -105,7 +108,7 @@ def create_nightly_billing(day_start=None):
|
|||||||
rate=get_rate(non_letter_rates,
|
rate=get_rate(non_letter_rates,
|
||||||
letter_rates,
|
letter_rates,
|
||||||
data.notification_type,
|
data.notification_type,
|
||||||
data.day_created,
|
process_day,
|
||||||
data.crown,
|
data.crown,
|
||||||
data.rate_multiplier)
|
data.rate_multiplier)
|
||||||
)
|
)
|
||||||
@@ -113,6 +116,7 @@ def create_nightly_billing(day_start=None):
|
|||||||
inserted_records += 1
|
inserted_records += 1
|
||||||
|
|
||||||
updated_records += update_count
|
updated_records += update_count
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
current_app.logger.info('ft_billing: {} rows updated, {} rows inserted'
|
current_app.logger.info('ft_billing {} to {}: {} rows updated, {} rows inserted'
|
||||||
.format(updated_records, inserted_records))
|
.format(ds, de, updated_records, inserted_records))
|
||||||
|
|||||||
@@ -388,24 +388,39 @@ def migrate_data_to_ft_billing(start_date, end_date):
|
|||||||
print('Billing migration from date {} to {}'.format(start_date, end_date))
|
print('Billing migration from date {} to {}'.format(start_date, end_date))
|
||||||
|
|
||||||
process_date = start_date
|
process_date = start_date
|
||||||
while (process_date <= end_date):
|
total_updated = 0
|
||||||
|
|
||||||
|
while process_date < end_date:
|
||||||
|
|
||||||
|
sql = \
|
||||||
|
"""
|
||||||
|
select count(*) from notification_history where notification_status!='technical-failure'
|
||||||
|
and key_type!='test'
|
||||||
|
and notification_status!='created'
|
||||||
|
and created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||||
|
and created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||||
|
"""
|
||||||
|
num_notifications = db.session.execute(sql, {"start": process_date,
|
||||||
|
"end": process_date + timedelta(days=1)}).fetchall()[0][0]
|
||||||
sql = \
|
sql = \
|
||||||
"""
|
"""
|
||||||
select count(*) from
|
select count(*) from
|
||||||
(select distinct date_part('day', created_at) as utc_date, service_id, template_id, rate_multiplier,
|
(select distinct service_id, template_id, rate_multiplier,
|
||||||
sent_by from notification_history
|
sent_by from notification_history
|
||||||
where notification_status!='technical-failure'
|
where notification_status!='technical-failure'
|
||||||
and key_type!='test'
|
and key_type!='test'
|
||||||
and notification_status!='created'
|
and notification_status!='created'
|
||||||
and created_at >= :start
|
and created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||||
and created_at < :end order by utc_date) as distinct_records
|
and created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||||
|
) as distinct_records
|
||||||
"""
|
"""
|
||||||
|
|
||||||
predicted_records = db.session.execute(sql, {"start": process_date,
|
predicted_records = db.session.execute(sql, {"start": process_date,
|
||||||
"end": process_date + timedelta(days=1)}).fetchall()[0][0]
|
"end": process_date + timedelta(days=1)}).fetchall()[0][0]
|
||||||
|
|
||||||
start_time = datetime.now()
|
start_time = datetime.now()
|
||||||
print('{}: Migrating date: {}, expecting {} rows'
|
print('ft_billing: Migrating date: {}, notifications: {}, expecting {} ft_billing rows'
|
||||||
.format(start_time, process_date, predicted_records))
|
.format(process_date.date(), num_notifications, predicted_records))
|
||||||
|
|
||||||
# migrate data into ft_billing, ignore if records already exist - do not do upsert
|
# migrate data into ft_billing, ignore if records already exist - do not do upsert
|
||||||
sql = \
|
sql = \
|
||||||
@@ -443,14 +458,15 @@ def migrate_data_to_ft_billing(start_date, end_date):
|
|||||||
1 as notifications_sent
|
1 as notifications_sent
|
||||||
from public.notification_history n
|
from public.notification_history n
|
||||||
left join templates t on t.id = n.template_id
|
left join templates t on t.id = n.template_id
|
||||||
left join dm_datetime da on n.created_at> da.utc_daytime_start
|
left join dm_datetime da on n.created_at>= da.utc_daytime_start
|
||||||
and n.created_at < da.utc_daytime_end
|
and n.created_at < da.utc_daytime_end
|
||||||
left join services s on s.id = n.service_id
|
left join services s on s.id = n.service_id
|
||||||
where n.notification_status!='technical-failure'
|
where n.notification_status!='technical-failure'
|
||||||
and n.key_type!='test'
|
and n.key_type!='test'
|
||||||
and n.notification_status!='created'
|
and n.notification_status!='created'
|
||||||
and n.created_at >= :start
|
and n.created_at >= (date :start + time '00:00:00') at time zone 'Europe/London'
|
||||||
and n.created_at < :end
|
at time zone 'UTC'
|
||||||
|
and n.created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||||
) as individual_record
|
) as individual_record
|
||||||
group by bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international,
|
group by bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international,
|
||||||
sms_rate, letter_rate
|
sms_rate, letter_rate
|
||||||
@@ -463,11 +479,13 @@ def migrate_data_to_ft_billing(start_date, end_date):
|
|||||||
|
|
||||||
result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)})
|
result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)})
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
print('{}: --- Completed took {}ms. Migrated {} rows.'.format(datetime.now(), datetime.now() - start_time,
|
print('ft_billing: --- Completed took {}ms. Migrated {} rows.'.format(datetime.now() - start_time,
|
||||||
result.rowcount,
|
result.rowcount))
|
||||||
))
|
|
||||||
if predicted_records != result.rowcount:
|
if predicted_records != result.rowcount:
|
||||||
print(' : --- Result mismatch by {} rows'
|
print(' : ^^^ Result mismatch by {} rows ^^^'
|
||||||
.format(predicted_records - result.rowcount))
|
.format(predicted_records - result.rowcount))
|
||||||
|
|
||||||
process_date += timedelta(days=1)
|
process_date += timedelta(days=1)
|
||||||
|
|
||||||
|
total_updated += result.rowcount
|
||||||
|
print('Total inserted/updated records = {}'.format(total_updated))
|
||||||
|
|||||||
78
migrations/versions/0183_alter_primary_key.py
Normal file
78
migrations/versions/0183_alter_primary_key.py
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
"""
|
||||||
|
|
||||||
|
Revision ID: 0183_alter_primary_key
|
||||||
|
Revises: 0182_add_upload_document_perm
|
||||||
|
Create Date: 2018-03-25 21:23:32.403212
|
||||||
|
|
||||||
|
"""
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from sqlalchemy.dialects import postgresql
|
||||||
|
|
||||||
|
revision = '0183_alter_primary_key'
|
||||||
|
down_revision = '0182_add_upload_document_perm'
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
# Drop the old dm_datetime table and create a new one
|
||||||
|
op.execute(
|
||||||
|
"""
|
||||||
|
delete from dm_datetime where 1=1;
|
||||||
|
""")
|
||||||
|
|
||||||
|
op.execute(
|
||||||
|
"""
|
||||||
|
INSERT into dm_datetime (
|
||||||
|
SELECT
|
||||||
|
datum AS bst_date,
|
||||||
|
EXTRACT(YEAR FROM datum) AS year,
|
||||||
|
EXTRACT(MONTH FROM datum) AS month,
|
||||||
|
-- Localized month name
|
||||||
|
to_char(datum, 'TMMonth') AS month_name,
|
||||||
|
EXTRACT(DAY FROM datum) AS day,
|
||||||
|
EXTRACT(DAY FROM datum) AS bst_day,
|
||||||
|
EXTRACT(DOY FROM datum) AS day_of_year,
|
||||||
|
-- Localized weekday
|
||||||
|
to_char(datum, 'TMDay') AS week_day_name,
|
||||||
|
-- ISO calendar week
|
||||||
|
EXTRACT(week FROM datum) AS calendar_week,
|
||||||
|
'Q' || to_char(datum, 'Q') AS quartal,
|
||||||
|
to_char(datum, 'yyyy/"Q"Q') AS year_quartal,
|
||||||
|
to_char(datum, 'yyyy/mm') AS year_month,
|
||||||
|
-- ISO calendar year and week
|
||||||
|
to_char(datum, 'iyyy/IW') AS year_calendar_week,
|
||||||
|
(SELECT CASE WHEN (extract(month from datum) <= 3) THEN (extract(year FROM datum) -1)
|
||||||
|
ELSE (extract(year FROM datum)) end),
|
||||||
|
(datum + TIME '00:00:00') at Time zone 'Europe/London' at TIME zone 'utc' as utc_daytime_start, -- convert bst time to utc time
|
||||||
|
(datum + TIME '24:00:00') at Time zone 'Europe/London' at TIME zone 'utc' as utc_daytime_end
|
||||||
|
FROM (
|
||||||
|
-- There are 10 leap years in this range, so calculate 365 * 50 + 5 records
|
||||||
|
SELECT '2015-01-01'::date + SEQUENCE.DAY AS datum
|
||||||
|
FROM generate_series(0,365*50+10) AS SEQUENCE(DAY)
|
||||||
|
GROUP BY SEQUENCE.day
|
||||||
|
) DQ
|
||||||
|
ORDER BY bst_date
|
||||||
|
);
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
op.drop_constraint('ft_billing_pkey', 'ft_billing', type_='primary')
|
||||||
|
|
||||||
|
op.create_primary_key('ft_billing_pkey', 'ft_billing', ['bst_date',
|
||||||
|
'template_id',
|
||||||
|
'service_id',
|
||||||
|
'rate_multiplier',
|
||||||
|
'provider',
|
||||||
|
'notification_type'])
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
# We don't downgrade populated data
|
||||||
|
op.drop_constraint('ft_billing_pkey', 'ft_billing', type_='primary')
|
||||||
|
|
||||||
|
op.create_primary_key('ft_billing_pkey', 'ft_billing', ['bst_date',
|
||||||
|
'template_id',
|
||||||
|
'rate_multiplier',
|
||||||
|
'provider',
|
||||||
|
'notification_type'])
|
||||||
|
|
||||||
@@ -76,7 +76,7 @@ def test_create_nightly_billing_sms_rate_multiplier(
|
|||||||
assert len(records) == 0
|
assert len(records) == 0
|
||||||
|
|
||||||
create_nightly_billing(yesterday)
|
create_nightly_billing(yesterday)
|
||||||
records = FactBilling.query.all()
|
records = FactBilling.query.order_by('rate_multiplier').all()
|
||||||
assert len(records) == records_num
|
assert len(records) == records_num
|
||||||
for i, record in enumerate(records):
|
for i, record in enumerate(records):
|
||||||
assert record.bst_date == datetime.date(yesterday)
|
assert record.bst_date == datetime.date(yesterday)
|
||||||
@@ -298,9 +298,9 @@ def test_create_nightly_billing_consolidate_from_3_days_delta(
|
|||||||
create_nightly_billing()
|
create_nightly_billing()
|
||||||
records = FactBilling.query.order_by(FactBilling.bst_date).all()
|
records = FactBilling.query.order_by(FactBilling.bst_date).all()
|
||||||
|
|
||||||
assert len(records) == 4
|
assert len(records) == 3
|
||||||
assert records[0].bst_date == date(2018, 1, 12)
|
assert records[0].bst_date == date(2018, 1, 12)
|
||||||
assert records[-1].bst_date == date(2018, 1, 15)
|
assert records[-1].bst_date == date(2018, 1, 14)
|
||||||
|
|
||||||
|
|
||||||
def test_get_rate_for_letter_latest(notify_db, notify_db_session):
|
def test_get_rate_for_letter_latest(notify_db, notify_db_session):
|
||||||
@@ -354,3 +354,53 @@ def test_get_rate_for_sms_and_email(notify_db, notify_db_session):
|
|||||||
|
|
||||||
rate = get_rate(non_letter_rates, letter_rates, EMAIL_TYPE, datetime(2018, 1, 1))
|
rate = get_rate(non_letter_rates, letter_rates, EMAIL_TYPE, datetime(2018, 1, 1))
|
||||||
assert rate == Decimal(0)
|
assert rate == Decimal(0)
|
||||||
|
|
||||||
|
|
||||||
|
@freeze_time('2018-03-27T03:30:00')
|
||||||
|
# summer time starts on 2018-03-25
|
||||||
|
def test_create_nightly_billing_use_BST(
|
||||||
|
notify_db,
|
||||||
|
notify_db_session,
|
||||||
|
sample_service,
|
||||||
|
sample_template,
|
||||||
|
mocker):
|
||||||
|
|
||||||
|
mocker.patch('app.celery.reporting_tasks.get_rate', side_effect=mocker_get_rate)
|
||||||
|
|
||||||
|
sample_notification(
|
||||||
|
notify_db,
|
||||||
|
notify_db_session,
|
||||||
|
created_at=datetime(2018, 3, 25, 12, 0),
|
||||||
|
service=sample_service,
|
||||||
|
template=sample_template,
|
||||||
|
status='delivered',
|
||||||
|
sent_by=None,
|
||||||
|
international=False,
|
||||||
|
rate_multiplier=1.0,
|
||||||
|
billable_units=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
sample_notification(
|
||||||
|
notify_db,
|
||||||
|
notify_db_session,
|
||||||
|
created_at=datetime(2018, 3, 25, 23, 5),
|
||||||
|
service=sample_service,
|
||||||
|
template=sample_template,
|
||||||
|
status='delivered',
|
||||||
|
sent_by=None,
|
||||||
|
international=False,
|
||||||
|
rate_multiplier=1.0,
|
||||||
|
billable_units=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
notifications = Notification.query.order_by(Notification.created_at).all()
|
||||||
|
assert len(notifications) == 2
|
||||||
|
records = FactBilling.query.all()
|
||||||
|
assert len(records) == 0
|
||||||
|
|
||||||
|
create_nightly_billing()
|
||||||
|
records = FactBilling.query.order_by(FactBilling.bst_date).all()
|
||||||
|
|
||||||
|
assert len(records) == 2
|
||||||
|
assert records[0].bst_date == date(2018, 3, 25)
|
||||||
|
assert records[-1].bst_date == date(2018, 3, 26)
|
||||||
|
|||||||
Reference in New Issue
Block a user