Parallelise status aggregation by service and day

This follows a similar approach as [1]. Recently we've seen lots
of errors from this task, which we think are a consequence of it
doing too much work and tripping Celery's visibility timeout.

While we can optimise the query [2], it's likely the errors will
return as the number of live services grows. Parallelising the
aggregation now will make it more futureproof.

[1]: https://github.com/alphagov/notifications-api/pull/3397
[2]: https://github.com/alphagov/notifications-api/pull/3417
This commit is contained in:
Ben Thorner
2022-01-11 17:01:06 +00:00
parent c3da139e9c
commit 9182ebf4e5
2 changed files with 65 additions and 53 deletions

View File

@@ -91,54 +91,58 @@ def create_nightly_notification_status():
yesterday = convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=1)
for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]:
days = 10 if notification_type == LETTER_TYPE else 4
for (service_id,) in db.session.query(Service.id):
for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]:
days = 10 if notification_type == LETTER_TYPE else 4
for i in range(days):
process_day = yesterday - timedelta(days=i)
for i in range(days):
process_day = yesterday - timedelta(days=i)
create_nightly_notification_status_for_day.apply_async(
kwargs={'process_day': process_day.isoformat(), 'notification_type': notification_type},
queue=QueueNames.REPORTING
)
current_app.logger.info(
f"create-nightly-notification-status-for-day task created "
f"for {notification_type} and {process_day}"
)
create_nightly_notification_status_for_service_and_day.apply_async(
kwargs={
'process_day': process_day.isoformat(),
'notification_type': notification_type,
'service_id': service_id,
},
queue=QueueNames.REPORTING
)
current_app.logger.info(
f"create-nightly-notification-status-for-day task created "
f"for {service_id}, {notification_type} and {process_day}"
)
@notify_celery.task(name="create-nightly-notification-status-for-day")
def create_nightly_notification_status_for_day(process_day, notification_type):
@notify_celery.task(name="create-nightly-notification-status-for-service-and-day")
def create_nightly_notification_status_for_service_and_day(process_day, service_id, notification_type):
process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
current_app.logger.info(
f'create-nightly-notification-status-for-day task started '
f'for {notification_type} and {process_day}'
f'for {service_id}, {notification_type} and {process_day}'
)
for (service_id,) in db.session.query(Service.id):
start = datetime.utcnow()
transit_data = fetch_status_data_for_service_and_day(
process_day=process_day,
notification_type=notification_type,
service_id=service_id,
)
start = datetime.utcnow()
transit_data = fetch_status_data_for_service_and_day(
process_day=process_day,
notification_type=notification_type,
service_id=service_id,
)
end = datetime.utcnow()
current_app.logger.info(
f'create-nightly-notification-status-for-day task fetch '
f'for {process_day} and {notification_type}: '
f'data fetched in {(end - start).seconds} seconds'
)
end = datetime.utcnow()
current_app.logger.info(
f'create-nightly-notification-status-for-day task fetch '
f'for {service_id}, {process_day} and {notification_type}: '
f'data fetched in {(end - start).seconds} seconds'
)
update_fact_notification_status(
transit_data=transit_data,
process_day=process_day,
notification_type=notification_type,
service_id=service_id
)
update_fact_notification_status(
transit_data=transit_data,
process_day=process_day,
notification_type=notification_type,
service_id=service_id
)
current_app.logger.info(
f'create-nightly-notification-status-for-day task finished '
f'for {process_day} and {notification_type}: '
f'{len(transit_data)} rows updated'
)
current_app.logger.info(
f'create-nightly-notification-status-for-day task finished '
f'for {service_id}, {process_day} and {notification_type}: '
f'{len(transit_data)} rows updated'
)