We found a problem with the report tasks that populate the fact tables (or statistic tables). It is possible that the notification status can change for notifications after 4 days.

This PR updates those queries to look in either Notification or NotificationHistory. Since the data does not exist in both tables at the same time we can do with and not worry about the data retention.
The query will iterate over each service, then each notification type and query the data if no results then try the history table.
This commit is contained in:
Rebecca Law
2019-07-18 15:29:54 +01:00
parent 663ab6d96b
commit ed611f982c
3 changed files with 125 additions and 72 deletions

View File

@@ -146,69 +146,88 @@ def delete_billing_data_for_service_for_day(process_day, service_id):
def fetch_billing_data_for_day(process_day, service_id=None):
start_date = convert_bst_to_utc(datetime.combine(process_day, time.min))
end_date = convert_bst_to_utc(datetime.combine(process_day + timedelta(days=1), time.min))
# use notification_history if process day is older than 7 days
# this is useful if we need to rebuild the ft_billing table for a date older than 7 days ago.
current_app.logger.info("Populate ft_billing for {} to {}".format(start_date, end_date))
table = Notification
if start_date < datetime.utcnow() - timedelta(days=7):
table = NotificationHistory
transit_data = []
for notification_type in (SMS_TYPE, EMAIL_TYPE, LETTER_TYPE):
billable_type_list = {
SMS_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE,
EMAIL_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE,
LETTER_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE_FOR_LETTERS
}
query = db.session.query(
table.template_id,
table.service_id,
table.notification_type,
func.coalesce(table.sent_by,
case(
[
(table.notification_type == 'letter', 'dvla'),
(table.notification_type == 'sms', 'unknown'),
(table.notification_type == 'email', 'ses')
]),
).label('sent_by'),
func.coalesce(table.rate_multiplier, 1).cast(Integer).label('rate_multiplier'),
func.coalesce(table.international, False).label('international'),
case(
[
(table.notification_type == 'letter', table.billable_units),
]
).label('letter_page_count'),
func.sum(table.billable_units).label('billable_units'),
func.count().label('notifications_sent'),
Service.crown,
func.coalesce(table.postage, 'none').label('postage')
).filter(
table.status.in_(billable_type_list[notification_type]),
table.key_type != KEY_TYPE_TEST,
table.created_at >= start_date,
table.created_at < end_date,
table.notification_type == notification_type
).group_by(
table.template_id,
table.service_id,
table.notification_type,
'sent_by',
'letter_page_count',
table.rate_multiplier,
table.international,
Service.crown,
table.postage,
).join(
Service
)
if service_id:
query = query.filter(table.service_id == service_id)
if not service_id:
service_ids = [x.id for x in Service.query.all()]
else:
service_ids = [service_id]
for id_of_service in service_ids:
for notification_type in (SMS_TYPE, EMAIL_TYPE, LETTER_TYPE):
results = _query_for_billing_data(
table=Notification,
notification_type=notification_type,
start_date=start_date,
end_date=end_date,
service_id = id_of_service
)
# If data has been purged from Notification then use NotificationHistory
if len(results) == 0:
results = _query_for_billing_data(
table=NotificationHistory,
notification_type=notification_type,
start_date=start_date,
end_date=end_date,
service_id=id_of_service
)
transit_data = transit_data + query.all()
transit_data = transit_data + results
return transit_data
def _query_for_billing_data(table, notification_type, start_date, end_date, service_id):
billable_type_list = {
SMS_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE,
EMAIL_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE,
LETTER_TYPE: NOTIFICATION_STATUS_TYPES_BILLABLE_FOR_LETTERS
}
query = db.session.query(
table.template_id,
table.service_id,
table.notification_type,
func.coalesce(table.sent_by,
case(
[
(table.notification_type == 'letter', 'dvla'),
(table.notification_type == 'sms', 'unknown'),
(table.notification_type == 'email', 'ses')
]),
).label('sent_by'),
func.coalesce(table.rate_multiplier, 1).cast(Integer).label('rate_multiplier'),
func.coalesce(table.international, False).label('international'),
case(
[
(table.notification_type == 'letter', table.billable_units),
]
).label('letter_page_count'),
func.sum(table.billable_units).label('billable_units'),
func.count().label('notifications_sent'),
Service.crown,
func.coalesce(table.postage, 'none').label('postage')
).filter(
table.status.in_(billable_type_list[notification_type]),
table.key_type != KEY_TYPE_TEST,
table.created_at >= start_date,
table.created_at < end_date,
table.notification_type == notification_type,
table.service_id == service_id
).group_by(
table.template_id,
table.service_id,
table.notification_type,
'sent_by',
'letter_page_count',
table.rate_multiplier,
table.international,
Service.crown,
table.postage,
).join(
Service
)
return query.all()
def get_rates_for_billing():
non_letter_rates = Rate.query.order_by(desc(Rate.valid_from)).all()
letter_rates = LetterRate.query.order_by(desc(LetterRate.start_date)).all()
@@ -265,6 +284,11 @@ def update_fact_billing(data, process_day):
data.letter_page_count,
data.postage)
billing_record = create_billing_record(data, rate, process_day)
FactBilling.query.filter(
FactBilling.bst_date == process_day
).delete()
table = FactBilling.__table__
'''
This uses the Postgres upsert to avoid race conditions when two threads try to insert

View File

@@ -9,23 +9,49 @@ from sqlalchemy.types import DateTime, Integer
from app import db
from app.models import (
Notification, NotificationHistory, FactNotificationStatus, KEY_TYPE_TEST, Service, Template,
NOTIFICATION_CANCELLED
EMAIL_TYPE,
FactNotificationStatus,
KEY_TYPE_TEST,
LETTER_TYPE,
Notification,
NotificationHistory,
NOTIFICATION_CANCELLED,
Service,
SMS_TYPE,
Template,
)
from app.utils import get_london_midnight_in_utc, midnight_n_days_ago, get_london_month_from_utc_column
def fetch_notification_status_for_day(process_day, service_id=None):
def fetch_notification_status_for_day(process_day):
start_date = convert_bst_to_utc(datetime.combine(process_day, time.min))
end_date = convert_bst_to_utc(datetime.combine(process_day + timedelta(days=1), time.min))
# use notification_history if process day is older than 7 days
# this is useful if we need to rebuild the ft_billing table for a date older than 7 days ago.
current_app.logger.info("Fetch ft_notification_status for {} to {}".format(start_date, end_date))
table = Notification
if start_date < datetime.utcnow() - timedelta(days=7):
table = NotificationHistory
transit_data = db.session.query(
all_data_for_process_day = []
service_ids = [x.id for x in Service.query.all()]
# for each service
# for each notification type
# query notifications for day
# if no rows try notificationHistory
for service_id in service_ids:
for notification_type in [EMAIL_TYPE, SMS_TYPE, LETTER_TYPE]:
table = Notification
data_for_service_and_type = query_for_fact_status_data(table, start_date, end_date, notification_type, service_id)
if len(data_for_service_and_type) == 0:
table = NotificationHistory
data_for_service_and_type = query_for_fact_status_data(table, start_date, end_date, notification_type, service_id)
all_data_for_process_day = all_data_for_process_day + data_for_service_and_type
return all_data_for_process_day
def query_for_fact_status_data(table, start_date, end_date, notification_type, service_id):
query = db.session.query(
table.template_id,
table.service_id,
func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'),
@@ -35,7 +61,9 @@ def fetch_notification_status_for_day(process_day, service_id=None):
func.count().label('notification_count')
).filter(
table.created_at >= start_date,
table.created_at < end_date
table.created_at < end_date,
table.notification_type == notification_type,
table.service_id == service_id
).group_by(
table.template_id,
table.service_id,
@@ -44,11 +72,7 @@ def fetch_notification_status_for_day(process_day, service_id=None):
table.key_type,
table.status
)
if service_id:
transit_data = transit_data.filter(table.service_id == service_id)
return transit_data.all()
return query.all()
def update_fact_notification_status(data, process_day):
@@ -56,6 +80,7 @@ def update_fact_notification_status(data, process_day):
FactNotificationStatus.query.filter(
FactNotificationStatus.bst_date == process_day
).delete()
for row in data:
stmt = insert(table).values(
bst_date=process_day,

View File

@@ -18,7 +18,10 @@ from app.dao.fact_notification_status_dao import (
from app.models import FactNotificationStatus, KEY_TYPE_TEST, KEY_TYPE_TEAM, EMAIL_TYPE, SMS_TYPE, LETTER_TYPE
from freezegun import freeze_time
from tests.app.db import create_notification, create_service, create_template, create_ft_notification_status, create_job
from tests.app.db import (
create_notification, create_service, create_template, create_ft_notification_status,
create_job, create_notification_history
)
def test_update_fact_notification_status(notify_db_session):
@@ -31,8 +34,9 @@ def test_update_fact_notification_status(notify_db_session):
create_notification(template=first_template, status='delivered')
create_notification(template=first_template, created_at=datetime.utcnow() - timedelta(days=1))
create_notification(template=second_template, status='temporary-failure')
create_notification(template=second_template, created_at=datetime.utcnow() - timedelta(days=1))
# simulate a service with data retention - data has been moved to history and does not exist in notifications
create_notification_history(template=second_template, status='temporary-failure')
create_notification_history(template=second_template, created_at=datetime.utcnow() - timedelta(days=1))
create_notification(template=third_template, status='created')
create_notification(template=third_template, created_at=datetime.utcnow() - timedelta(days=1))