Merge pull request #3454 from alphagov/upsert-status-180693991

Rewrite status aggregation to be a bulk upsert
Ben Thorner (committed by GitHub), 2022-02-17 13:21:50 +00:00
4 changed files with 72 additions and 69 deletions
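The diffs below drop the old approach of fetching the day's aggregated status rows into Python and inserting them one at a time, in favour of deleting any existing rows for the day and running a single INSERT ... SELECT built with SQLAlchemy's Insert.from_select(). Here is a minimal, self-contained sketch of that pattern (SQLAlchemy 1.4+ syntax); the notifications and daily_status tables are simplified stand-ins for illustration, not Notify's real models.

# Minimal sketch of the delete-then-"INSERT ... SELECT" pattern (simplified stand-in tables).
from datetime import date

from sqlalchemy import (
    Column, Date, Integer, MetaData, String, Table,
    create_engine, delete, func, insert, select,
)

metadata = MetaData()

# Hypothetical cut-down source table (stands in for notifications / notification_history).
notifications = Table(
    "notifications", metadata,
    Column("id", Integer, primary_key=True),
    Column("created_on", Date, nullable=False),
    Column("template_id", String, nullable=False),
    Column("status", String, nullable=False),
)

# Hypothetical cut-down fact table (stands in for the status fact table).
daily_status = Table(
    "daily_status", metadata,
    Column("bst_date", Date, nullable=False),
    Column("template_id", String, nullable=False),
    Column("notification_status", String, nullable=False),
    Column("notification_count", Integer, nullable=False),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)
process_day = date(2022, 2, 16)

with engine.begin() as conn:
    # Seed a few source rows for the day being aggregated.
    conn.execute(insert(notifications), [
        {"created_on": process_day, "template_id": "t1", "status": "delivered"},
        {"created_on": process_day, "template_id": "t1", "status": "delivered"},
        {"created_on": process_day, "template_id": "t1", "status": "failed"},
    ])

    # Aggregate the day's rows entirely in the database ...
    day_summary = (
        select(
            notifications.c.created_on,
            notifications.c.template_id,
            notifications.c.status,
            func.count().label("notification_count"),
        )
        .where(notifications.c.created_on == process_day)
        .group_by(notifications.c.created_on, notifications.c.template_id, notifications.c.status)
    )

    # ... then replace any existing summary for that day with a single bulk INSERT ... SELECT,
    # both statements inside one transaction (engine.begin() here, @autocommit in the DAO).
    conn.execute(delete(daily_status).where(daily_status.c.bst_date == process_day))
    conn.execute(
        insert(daily_status).from_select(
            ["bst_date", "template_id", "notification_status", "notification_count"],
            day_summary,
        )
    )

    print(conn.execute(select(daily_status)).all())
    # e.g. [(datetime.date(2022, 2, 16), 't1', 'delivered', 2), (datetime.date(2022, 2, 16), 't1', 'failed', 1)]

Because the aggregation and the write both happen inside the database, the day's notification rows never travel to the worker; that is why the separate fetch step and its timing log disappear from the task in the first file below.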

View File

@@ -10,10 +10,7 @@ from app.dao.fact_billing_dao import (
     fetch_billing_data_for_day,
     update_fact_billing,
 )
-from app.dao.fact_notification_status_dao import (
-    fetch_status_data_for_service_and_day,
-    update_fact_notification_status,
-)
+from app.dao.fact_notification_status_dao import update_fact_notification_status
 from app.dao.notifications_dao import get_service_ids_with_notifications_on_date
 from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE
@@ -117,23 +114,8 @@ def create_nightly_notification_status():
 def create_nightly_notification_status_for_service_and_day(process_day, service_id, notification_type):
     process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
-    start = datetime.utcnow()
-    new_status_rows = fetch_status_data_for_service_and_day(
-        process_day=process_day,
-        notification_type=notification_type,
-        service_id=service_id,
-    )
-    end = datetime.utcnow()
-    current_app.logger.info(
-        f'create-nightly-notification-status-for-service-and-day task fetch '
-        f'for {service_id}, {notification_type} for {process_day}: '
-        f'data fetched in {(end - start).seconds} seconds'
-    )
     start = datetime.utcnow()
     update_fact_notification_status(
-        new_status_rows=new_status_rows,
         process_day=process_day,
         notification_type=notification_type,
         service_id=service_id
@@ -143,5 +125,5 @@ def create_nightly_notification_status_for_service_and_day(process_day, service_
     current_app.logger.info(
         f'create-nightly-notification-status-for-service-and-day task update '
         f'for {service_id}, {notification_type} for {process_day}: '
-        f'data updated in {(end - start).seconds} seconds'
+        f'updated in {(end - start).seconds} seconds'
     )
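Net effect on the task: the worker no longer pulls the day's status rows into Python, so the fetch step and its 'data fetched in N seconds' log line go away, leaving a single timed call to update_fact_notification_status (with its log message trimmed to match).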

View File

@@ -34,56 +34,65 @@ from app.utils import (
 )
-def fetch_status_data_for_service_and_day(process_day, service_id, notification_type):
+@autocommit
+def update_fact_notification_status(process_day, notification_type, service_id):
     start_date = get_london_midnight_in_utc(process_day)
     end_date = get_london_midnight_in_utc(process_day + timedelta(days=1))
-    # query notifications or notification_history for the day, depending on their data retention
-    service = Service.query.get(service_id)
-    table = get_notification_table_to_use(service, notification_type, process_day, has_delete_task_run=False)
-    return db.session.query(
-        table.template_id,
-        func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'),
-        table.key_type,
-        table.status,
-        func.count().label('notification_count')
-    ).filter(
-        table.created_at >= start_date,
-        table.created_at < end_date,
-        table.notification_type == notification_type,
-        table.service_id == service_id,
-        table.key_type.in_((KEY_TYPE_NORMAL, KEY_TYPE_TEAM)),
-    ).group_by(
-        table.template_id,
-        'job_id',
-        table.key_type,
-        table.status
-    ).all()
-@autocommit
-def update_fact_notification_status(new_status_rows, process_day, notification_type, service_id):
-    table = FactNotificationStatus.__table__
     # delete any existing rows in case some no longer exist e.g. if all messages are sent
     FactNotificationStatus.query.filter(
         FactNotificationStatus.bst_date == process_day,
         FactNotificationStatus.notification_type == notification_type,
         FactNotificationStatus.service_id == service_id,
     ).delete()
-    for row in new_status_rows:
-        db.session.connection().execute(
-            insert(table).values(
-                bst_date=process_day,
-                template_id=row.template_id,
-                service_id=service_id,
-                job_id=row.job_id,
-                notification_type=notification_type,
-                key_type=row.key_type,
-                notification_status=row.status,
-                notification_count=row.notification_count,
-            )
+    # query notifications or notification_history for the day, depending on their data retention
+    service = Service.query.get(service_id)
+    source_table = get_notification_table_to_use(
+        service,
+        notification_type,
+        process_day,
+        has_delete_task_run=False
+    )
+    query = db.session.query(
+        literal(process_day).label("process_day"),
+        source_table.template_id,
+        literal(service_id).label("service_id"),
+        func.coalesce(source_table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'),
+        literal(notification_type).label("notification_type"),
+        source_table.key_type,
+        source_table.status,
+        func.count().label('notification_count')
+    ).filter(
+        source_table.created_at >= start_date,
+        source_table.created_at < end_date,
+        source_table.notification_type == notification_type,
+        source_table.service_id == service_id,
+        source_table.key_type.in_((KEY_TYPE_NORMAL, KEY_TYPE_TEAM)),
+    ).group_by(
+        source_table.template_id,
+        source_table.template_id,
+        'job_id',
+        source_table.key_type,
+        source_table.status
+    )
+    db.session.connection().execute(
+        insert(FactNotificationStatus.__table__).from_select(
+            [
+                FactNotificationStatus.bst_date,
+                FactNotificationStatus.template_id,
+                FactNotificationStatus.service_id,
+                FactNotificationStatus.job_id,
+                FactNotificationStatus.notification_type,
+                FactNotificationStatus.key_type,
+                FactNotificationStatus.notification_status,
+                FactNotificationStatus.notification_count
+            ],
+            query
         )
+    )
 def fetch_notification_status_for_service_by_month(start_date, end_date, service_id):
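For anyone wanting to check what the new DAO code sends to the database, SQLAlchemy can render an insert(...).from_select(...) construct to SQL without a connection (the standard compile-with-dialect technique; the toy src/dst tables below are illustrative, not Notify's schema):

# Render the SQL an insert(...).from_select(...) construct will emit (illustrative tables only).
from sqlalchemy import Column, Date, Integer, MetaData, String, Table, func, insert, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()
src = Table("src", metadata, Column("day", Date), Column("status", String))
dst = Table("dst", metadata, Column("day", Date), Column("status", String), Column("n", Integer))

stmt = insert(dst).from_select(
    ["day", "status", "n"],
    select(src.c.day, src.c.status, func.count().label("n")).group_by(src.c.day, src.c.status),
)

# Prints a single "INSERT INTO dst (day, status, n) SELECT ... FROM src GROUP BY ..." statement:
# the aggregation and the write reach the database as one statement, not one INSERT per row.
print(stmt.compile(dialect=postgresql.dialect()))

Combined with the DELETE at the top of the new function and the @autocommit decorator (assuming, as elsewhere in this codebase, that it commits once when the function returns and rolls back on error), a day's aggregate rows are replaced in one transaction rather than row by row.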