Created a task to update ft_notification_status for the last three days.

This commit is contained in:
Rebecca Law
2018-06-20 16:45:20 +01:00
parent 5cb5d017b9
commit 709a6c38c7
4 changed files with 234 additions and 6 deletions

View File

@@ -8,6 +8,7 @@ from app.dao.fact_billing_dao import (
fetch_billing_data_for_day,
update_fact_billing
)
from app.dao.fact_notification_status_dao import fetch_notification_status_for_day, update_fact_notification_status
@notify_celery.task(name="create-nightly-billing")
@@ -30,3 +31,24 @@ def create_nightly_billing(day_start=None):
current_app.logger.info(
"create-nightly-billing task complete. {} rows updated for day: {}".format(len(transit_data), process_day))
@notify_celery.task(name="create-nightly-notification-status")
@statsd(namespace="tasks")
def create_nightly_notification_status(day_start=None):
# day_start is a datetime.date() object. e.g.
# 3 days of data counting back from day_start is consolidated
if day_start is None:
day_start = datetime.today() - timedelta(days=1)
else:
# When calling the task its a string in the format of "YYYY-MM-DD"
day_start = datetime.strptime(day_start, "%Y-%m-%d")
for i in range(0, 3):
process_day = day_start - timedelta(days=i)
transit_data = fetch_notification_status_for_day(process_day=process_day)
update_fact_notification_status(transit_data, process_day)
current_app.logger.info(
"create-nightly-notification-status task: {} rows updated for day: {}")

View File

@@ -0,0 +1,76 @@
from datetime import datetime, timedelta, time
from flask import current_app
from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert
from app import db
from app.models import Notification, NotificationHistory, FactNotificationStatus
from app.utils import convert_bst_to_utc
def fetch_notification_status_for_day(process_day, service_id=None):
start_date = convert_bst_to_utc(datetime.combine(process_day, time.min))
end_date = convert_bst_to_utc(datetime.combine(process_day + timedelta(days=1), time.min))
# use notification_history if process day is older than 7 days
# this is useful if we need to rebuild the ft_billing table for a date older than 7 days ago.
current_app.logger.info("Fetch ft_notification_status for {} to {}".format(start_date, end_date))
table = Notification
if start_date < datetime.utcnow() - timedelta(days=7):
table = NotificationHistory
transit_data = db.session.query(
table.template_id,
table.service_id,
func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'),
table.notification_type,
table.key_type,
table.status,
func.count().label('notification_count')
).filter(
table.created_at >= start_date,
table.created_at < end_date
).group_by(
table.template_id,
table.service_id,
'job_id',
table.notification_type,
table.key_type,
table.status
)
if service_id:
transit_data = transit_data.filter(table.service_id == service_id)
return transit_data.all()
def update_fact_notification_status(data, process_day):
table = FactNotificationStatus.__table__
'''
This uses the Postgres upsert to avoid race conditions when two threads try to insert
at the same row. The excluded object refers to values that we tried to insert but were
rejected.
http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#insert-on-conflict-upsert
'''
for row in data:
stmt = insert(table).values(
bst_date=process_day.date(),
template_id=row.template_id,
service_id=row.service_id,
job_id=row.job_id,
notification_type=row.notification_type,
key_type=row.key_type,
notification_status=row.status,
notification_count=row.notification_count,
)
stmt = stmt.on_conflict_do_update(
constraint="ft_notification_status_pkey",
set_={"notification_count": stmt.excluded.notification_count,
"updated_at": datetime.utcnow()
}
)
db.session.connection().execute(stmt)
db.session.commit()