Update FactStatus table in bulk for each service

Previously we were looping over data from the Notifications/History
table and then shovelling it into the status table one row at a time,
plus an extra delete to clean up any existing data.

This replaces that with a batch insertion, similar to how we archive
notifications [1], but using a simple subquery (via "from_select" [2])
instead of a temporary table.

To make the select compatible with the insert, I've used "literal"
to inject the constant pieces of data, so each row has everything it
needs to go into the status table (a sketch of the pattern follows
the references).

[1]: 9ce6d2fe92/app/dao/notifications_dao.py (L295)
[2]: https://docs.sqlalchemy.org/en/14/core/dml.html#sqlalchemy.sql.expression.Insert.from_select
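In outline, the statement is an INSERT ... SELECT where the select mixes
real columns with constants. A minimal sketch of the pattern on SQLAlchemy
1.4, using toy tables and made-up column names rather than the real Notify
models:

# Minimal sketch of the batch upsert. The tables and columns here are
# illustrative stand-ins, not the real Notify schema.
from datetime import date, datetime

from sqlalchemy import Column, Date, DateTime, Integer, String, func, literal, select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Notification(Base):
    __tablename__ = 'notifications'
    id = Column(Integer, primary_key=True)
    status = Column(String)

class FactStatus(Base):
    __tablename__ = 'fact_status'
    bst_date = Column(Date, primary_key=True)
    status = Column(String, primary_key=True)
    notification_count = Column(Integer)
    updated_at = Column(DateTime)

process_day = date(2022, 2, 10)

# literal() injects the constant into every selected row, so the SELECT
# yields exactly the columns the INSERT expects, in the same order.
query = select(
    literal(process_day).label('bst_date'),
    Notification.status,
    func.count().label('notification_count'),
).group_by(Notification.status)

stmt = insert(FactStatus.__table__).from_select(
    [FactStatus.bst_date, FactStatus.status, FactStatus.notification_count],
    query,
)

# On a primary key clash, update the existing row in place. The real code
# names the PK constraint instead of listing the key columns.
stmt = stmt.on_conflict_do_update(
    index_elements=[FactStatus.bst_date, FactStatus.status],
    set_={
        'notification_count': stmt.excluded.notification_count,
        'updated_at': datetime.utcnow(),
    },
)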
Author: Ben Thorner
Date:   2022-02-10 15:00:57 +00:00
Parent: efde271c5a
Commit: a69d1635a1

3 changed files with 58 additions and 64 deletions

app/celery/reporting_tasks.py

@@ -10,10 +10,7 @@ from app.dao.fact_billing_dao import (
     fetch_billing_data_for_day,
     update_fact_billing,
 )
-from app.dao.fact_notification_status_dao import (
-    fetch_status_data_for_service_and_day,
-    update_fact_notification_status,
-)
+from app.dao.fact_notification_status_dao import update_fact_notification_status
 from app.dao.notifications_dao import get_service_ids_with_notifications_on_date
 from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE
@@ -117,23 +114,8 @@ def create_nightly_notification_status():
 def create_nightly_notification_status_for_service_and_day(process_day, service_id, notification_type):
     process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
     start = datetime.utcnow()
-    new_status_rows = fetch_status_data_for_service_and_day(
-        process_day=process_day,
-        notification_type=notification_type,
-        service_id=service_id,
-    )
-    end = datetime.utcnow()
-    current_app.logger.info(
-        f'create-nightly-notification-status-for-service-and-day task fetch '
-        f'for {service_id}, {notification_type} for {process_day}: '
-        f'data fetched in {(end - start).seconds} seconds'
-    )
-    start = datetime.utcnow()
     update_fact_notification_status(
-        new_status_rows=new_status_rows,
         process_day=process_day,
         notification_type=notification_type,
         service_id=service_id
@@ -143,5 +125,5 @@ def create_nightly_notification_status_for_service_and_day(process_day, service_
     current_app.logger.info(
         f'create-nightly-notification-status-for-service-and-day task update '
         f'for {service_id}, {notification_type} for {process_day}: '
-        f'data updated in {(end - start).seconds} seconds'
+        f'updated in {(end - start).seconds} seconds'
     )
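For reference, the task body after this change reduces to a single timed
call. Reconstructed from the hunks above; imports are added for
completeness, the Celery decorator is elided as in the diff, and the end
timestamp line is implied by the log message rather than shown:

from datetime import datetime

from flask import current_app

from app.dao.fact_notification_status_dao import update_fact_notification_status

def create_nightly_notification_status_for_service_and_day(process_day, service_id, notification_type):
    process_day = datetime.strptime(process_day, "%Y-%m-%d").date()

    start = datetime.utcnow()
    update_fact_notification_status(
        process_day=process_day,
        notification_type=notification_type,
        service_id=service_id
    )
    end = datetime.utcnow()  # implied by the log message; not shown in the diff

    current_app.logger.info(
        f'create-nightly-notification-status-for-service-and-day task update '
        f'for {service_id}, {notification_type} for {process_day}: '
        f'updated in {(end - start).seconds} seconds'
    )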

app/dao/fact_notification_status_dao.py

@@ -34,59 +34,66 @@ from app.utils import (
 )
-def fetch_status_data_for_service_and_day(process_day, service_id, notification_type):
+@autocommit
+def update_fact_notification_status(process_day, notification_type, service_id):
     start_date = get_london_midnight_in_utc(process_day)
     end_date = get_london_midnight_in_utc(process_day + timedelta(days=1))
     # query notifications or notification_history for the day, depending on their data retention
     service = Service.query.get(service_id)
-    table = get_notification_table_to_use(service, notification_type, process_day, has_delete_task_run=False)
+    source_table = get_notification_table_to_use(
+        service,
+        notification_type,
+        process_day,
+        has_delete_task_run=False
+    )
-    return db.session.query(
-        table.template_id,
-        func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'),
-        table.key_type,
-        table.status,
+    query = db.session.query(
+        literal(process_day).label("process_day"),
+        source_table.template_id,
+        literal(service_id).label("service_id"),
+        func.coalesce(source_table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'),
+        literal(notification_type).label("notification_type"),
+        source_table.key_type,
+        source_table.status,
         func.count().label('notification_count')
     ).filter(
-        table.created_at >= start_date,
-        table.created_at < end_date,
-        table.notification_type == notification_type,
-        table.service_id == service_id,
-        table.key_type.in_((KEY_TYPE_NORMAL, KEY_TYPE_TEAM)),
+        source_table.created_at >= start_date,
+        source_table.created_at < end_date,
+        source_table.notification_type == notification_type,
+        source_table.service_id == service_id,
+        source_table.key_type.in_((KEY_TYPE_NORMAL, KEY_TYPE_TEAM)),
     ).group_by(
-        table.template_id,
+        source_table.template_id,
         'job_id',
-        table.key_type,
-        table.status
-    ).all()
+        source_table.key_type,
+        source_table.status
+    )
+    stmt = insert(FactNotificationStatus.__table__).from_select(
+        [
+            FactNotificationStatus.bst_date,
+            FactNotificationStatus.template_id,
+            FactNotificationStatus.service_id,
+            FactNotificationStatus.job_id,
+            FactNotificationStatus.notification_type,
+            FactNotificationStatus.key_type,
+            FactNotificationStatus.notification_status,
+            FactNotificationStatus.notification_count
+        ],
+        query
+    )
-@autocommit
-def update_fact_notification_status(new_status_rows, process_day, notification_type, service_id):
-    table = FactNotificationStatus.__table__
+    stmt = stmt.on_conflict_do_update(
+        constraint="ft_notification_status_pkey",
+        set_={
+            FactNotificationStatus.notification_count: stmt.excluded.notification_count,
+            FactNotificationStatus.updated_at: datetime.utcnow()
+        }
+    )
-    for row in new_status_rows:
-        stmt = insert(table).values(
-            bst_date=process_day,
-            template_id=row.template_id,
-            service_id=service_id,
-            job_id=row.job_id,
-            notification_type=notification_type,
-            key_type=row.key_type,
-            notification_status=row.status,
-            notification_count=row.notification_count,
-        )
-        stmt = stmt.on_conflict_do_update(
-            constraint="ft_notification_status_pkey",
-            set_={
-                FactNotificationStatus.notification_count: stmt.excluded.notification_count,
-                FactNotificationStatus.updated_at: datetime.utcnow()
-            }
-        )
-        db.session.connection().execute(stmt)
+    db.session.connection().execute(stmt)
 
 def fetch_notification_status_for_service_by_month(start_date, end_date, service_id):
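A quick way to sanity-check a statement built this way is to render it
against the PostgreSQL dialect. For the toy stmt from the sketch above:

from sqlalchemy.dialects import postgresql

# Inline the bound parameters and print the generated
# INSERT ... SELECT ... ON CONFLICT DO UPDATE for inspection.
print(stmt.compile(
    dialect=postgresql.dialect(),
    compile_kwargs={'literal_binds': True},
))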