diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index 572625e35..2b90eb623 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -8,6 +8,7 @@ from app.dao.fact_billing_dao import ( fetch_billing_data_for_day, update_fact_billing ) +from app.dao.fact_notification_status_dao import fetch_notification_status_for_day, update_fact_notification_status @notify_celery.task(name="create-nightly-billing") @@ -30,3 +31,24 @@ def create_nightly_billing(day_start=None): current_app.logger.info( "create-nightly-billing task complete. {} rows updated for day: {}".format(len(transit_data), process_day)) + + +@notify_celery.task(name="create-nightly-notification-status") +@statsd(namespace="tasks") +def create_nightly_notification_status(day_start=None): + # day_start is a datetime.date() object. e.g. + # 3 days of data counting back from day_start is consolidated + if day_start is None: + day_start = datetime.today() - timedelta(days=1) + else: + # When calling the task its a string in the format of "YYYY-MM-DD" + day_start = datetime.strptime(day_start, "%Y-%m-%d") + for i in range(0, 3): + process_day = day_start - timedelta(days=i) + + transit_data = fetch_notification_status_for_day(process_day=process_day) + + update_fact_notification_status(transit_data, process_day) + + current_app.logger.info( + "create-nightly-notification-status task: {} rows updated for day: {}") diff --git a/app/dao/fact_notification_status_dao.py b/app/dao/fact_notification_status_dao.py new file mode 100644 index 000000000..4e7a6a035 --- /dev/null +++ b/app/dao/fact_notification_status_dao.py @@ -0,0 +1,76 @@ +from datetime import datetime, timedelta, time + +from flask import current_app +from sqlalchemy import func +from sqlalchemy.dialects.postgresql import insert + +from app import db +from app.models import Notification, NotificationHistory, FactNotificationStatus +from app.utils import convert_bst_to_utc + + +def fetch_notification_status_for_day(process_day, service_id=None): + start_date = convert_bst_to_utc(datetime.combine(process_day, time.min)) + end_date = convert_bst_to_utc(datetime.combine(process_day + timedelta(days=1), time.min)) + # use notification_history if process day is older than 7 days + # this is useful if we need to rebuild the ft_billing table for a date older than 7 days ago. + current_app.logger.info("Fetch ft_notification_status for {} to {}".format(start_date, end_date)) + table = Notification + if start_date < datetime.utcnow() - timedelta(days=7): + table = NotificationHistory + + transit_data = db.session.query( + table.template_id, + table.service_id, + func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'), + table.notification_type, + table.key_type, + table.status, + func.count().label('notification_count') + ).filter( + table.created_at >= start_date, + table.created_at < end_date + ).group_by( + table.template_id, + table.service_id, + 'job_id', + table.notification_type, + table.key_type, + table.status + ) + + if service_id: + transit_data = transit_data.filter(table.service_id == service_id) + + return transit_data.all() + + +def update_fact_notification_status(data, process_day): + table = FactNotificationStatus.__table__ + ''' + This uses the Postgres upsert to avoid race conditions when two threads try to insert + at the same row. The excluded object refers to values that we tried to insert but were + rejected. + http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#insert-on-conflict-upsert + ''' + for row in data: + stmt = insert(table).values( + bst_date=process_day.date(), + template_id=row.template_id, + service_id=row.service_id, + job_id=row.job_id, + notification_type=row.notification_type, + key_type=row.key_type, + notification_status=row.status, + notification_count=row.notification_count, + ) + + stmt = stmt.on_conflict_do_update( + constraint="ft_notification_status_pkey", + set_={"notification_count": stmt.excluded.notification_count, + "updated_at": datetime.utcnow() + } + ) + db.session.connection().execute(stmt) + db.session.commit() + diff --git a/tests/app/celery/test_reporting_tasks.py b/tests/app/celery/test_reporting_tasks.py index c0f105f7c..eb0292f74 100644 --- a/tests/app/celery/test_reporting_tasks.py +++ b/tests/app/celery/test_reporting_tasks.py @@ -1,12 +1,14 @@ from datetime import datetime, timedelta, date from tests.app.conftest import sample_notification -from app.celery.reporting_tasks import create_nightly_billing +from app.celery.reporting_tasks import create_nightly_billing, create_nightly_notification_status from app.dao.fact_billing_dao import get_rate -from app.models import (FactBilling, - Notification, - LETTER_TYPE, - EMAIL_TYPE, - SMS_TYPE) +from app.models import ( + FactBilling, + Notification, + LETTER_TYPE, + EMAIL_TYPE, + SMS_TYPE, FactNotificationStatus +) from decimal import Decimal import pytest from app.dao.letter_rate_dao import dao_create_letter_rate @@ -15,6 +17,8 @@ from app import db from freezegun import freeze_time from sqlalchemy import desc +from tests.app.db import create_service, create_template, create_notification + def test_reporting_should_have_decorated_tasks_functions(): assert create_nightly_billing.__wrapped__.__name__ == 'create_nightly_billing' @@ -467,3 +471,46 @@ def test_create_nightly_billing_update_when_record_exists( assert len(records) == 1 assert records[0].billable_units == 2 assert records[0].updated_at + + +def test_create_nightly_notification_status(notify_db_session): + first_service = create_service(service_name='First Service') + first_template = create_template(service=first_service) + second_service = create_service(service_name='second Service') + second_template = create_template(service=second_service, template_type='email') + third_service = create_service(service_name='third Service') + third_template = create_template(service=third_service, template_type='letter') + + create_notification(template=first_template, status='delivered') + create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=1)) + create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=2)) + create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=3)) + create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=4)) + + create_notification(template=second_template, status='temporary-failure') + create_notification(template=second_template, status='temporary-failure', + created_at=datetime.utcnow() - timedelta(days=1)) + create_notification(template=second_template, status='temporary-failure', + created_at=datetime.utcnow() - timedelta(days=2)) + create_notification(template=second_template, status='temporary-failure', + created_at=datetime.utcnow() - timedelta(days=3)) + create_notification(template=second_template, status='temporary-failure', + created_at=datetime.utcnow() - timedelta(days=4)) + + create_notification(template=third_template, status='created') + create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=1)) + create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=2)) + create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=3)) + create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=4)) + + assert len(FactNotificationStatus.query.all()) == 0 + + create_nightly_notification_status() + new_data = FactNotificationStatus.query.order_by( + FactNotificationStatus.bst_date, + FactNotificationStatus.notification_type + ).all() + assert len(new_data) == 9 + assert str(new_data[0].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=3), "%Y-%m-%d") + assert str(new_data[3].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=2), "%Y-%m-%d") + assert str(new_data[6].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=1), "%Y-%m-%d") diff --git a/tests/app/dao/test_fact_notification_status_dao.py b/tests/app/dao/test_fact_notification_status_dao.py new file mode 100644 index 000000000..24cafe951 --- /dev/null +++ b/tests/app/dao/test_fact_notification_status_dao.py @@ -0,0 +1,83 @@ +from datetime import timedelta, datetime +from uuid import UUID + +from app.dao.fact_notification_status_dao import update_fact_notification_status, fetch_notification_status_for_day +from app.models import FactNotificationStatus +from tests.app.db import create_notification, create_service, create_template + + +def test_update_fact_notification_status(notify_db_session): + first_service = create_service(service_name='First Service') + first_template = create_template(service=first_service) + second_service = create_service(service_name='second Service') + second_template = create_template(service=second_service, template_type='email') + third_service = create_service(service_name='third Service') + third_template = create_template(service=third_service, template_type='letter') + + create_notification(template=first_template, status='delivered') + create_notification(template=first_template, created_at=datetime.utcnow() - timedelta(days=1)) + create_notification(template=second_template, status='temporary-failure') + create_notification(template=second_template, created_at=datetime.utcnow() - timedelta(days=1)) + create_notification(template=third_template, status='created') + create_notification(template=third_template, created_at=datetime.utcnow() - timedelta(days=1)) + + process_day = datetime.utcnow() + data = fetch_notification_status_for_day(process_day=process_day) + update_fact_notification_status(data=data, process_day=process_day) + + new_fact_data = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date, + FactNotificationStatus.notification_type + ).all() + + assert len(new_fact_data) == 3 + assert new_fact_data[0].bst_date == process_day.date() + assert new_fact_data[0].template_id == second_template.id + assert new_fact_data[0].service_id == second_service.id + assert new_fact_data[0].job_id == UUID('00000000-0000-0000-0000-000000000000') + assert new_fact_data[0].notification_type == 'email' + assert new_fact_data[0].notification_status == 'temporary-failure' + assert new_fact_data[0].notification_count == 1 + + assert new_fact_data[1].bst_date == process_day.date() + assert new_fact_data[1].template_id == third_template.id + assert new_fact_data[1].service_id == third_service.id + assert new_fact_data[1].job_id == UUID('00000000-0000-0000-0000-000000000000') + assert new_fact_data[1].notification_type == 'letter' + assert new_fact_data[1].notification_status == 'created' + assert new_fact_data[1].notification_count == 1 + + assert new_fact_data[2].bst_date == process_day.date() + assert new_fact_data[2].template_id == first_template.id + assert new_fact_data[2].service_id == first_service.id + assert new_fact_data[2].job_id == UUID('00000000-0000-0000-0000-000000000000') + assert new_fact_data[2].notification_type == 'sms' + assert new_fact_data[2].notification_status == 'delivered' + assert new_fact_data[2].notification_count == 1 + + +def test__update_fact_notification_status_updates_row(notify_db_session): + first_service = create_service(service_name='First Service') + first_template = create_template(service=first_service) + create_notification(template=first_template, status='delivered') + + process_day = datetime.utcnow() + data = fetch_notification_status_for_day(process_day=process_day) + update_fact_notification_status(data=data, process_day=process_day) + + new_fact_data = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date, + FactNotificationStatus.notification_type + ).all() + assert len(new_fact_data) == 1 + assert new_fact_data[0].notification_count == 1 + + create_notification(template=first_template, status='delivered') + + data = fetch_notification_status_for_day(process_day=process_day) + update_fact_notification_status(data=data, process_day=process_day) + + updated_fact_data = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date, + FactNotificationStatus.notification_type + ).all() + assert len(updated_fact_data) == 1 + assert updated_fact_data[0].notification_count == 2 +