notifications-api/app/celery/reporting_tasks.py
Ben Thorner 3007ff8f04 Investigate duplicate tasks
This proves that a task can be duplicated **even** when the prefetch
multiplier [1] is set to 1. This is because prefetched tasks are still
present on the SQS queue and subject to the visibility timeout, whereas
tasks in progress are not: they are deleted from the queue as soon as
they start, unless acks_late is set [2].
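To make the mechanism concrete, here is a toy, in-memory model of an SQS-style queue. It is a sketch under simplifying assumptions, not kombu's or boto3's API: `ToyQueue` and its methods are hypothetical. A received message stays "in flight" until it is deleted or its visibility timeout expires, and deleting a message when it starts stands in for Celery's default (early) acknowledgement. The 10 second timeout is an assumed value; the 20 second run time matches `long_task`.

```python
from dataclasses import dataclass, field


@dataclass
class ToyQueue:
    """Hypothetical in-memory stand-in for an SQS queue (not a real client)."""

    visibility_timeout: float
    # message id -> time at which the message is (or becomes again) visible
    _visible_at: dict = field(default_factory=dict)

    def send(self, message_id, now=0.0):
        self._visible_at[message_id] = now

    def receive(self, now, max_messages=1):
        """Return up to max_messages visible messages and hide them for the timeout."""
        visible = [m for m, t in self._visible_at.items() if t <= now]
        taken = visible[:max_messages]
        for m in taken:
            self._visible_at[m] = now + self.visibility_timeout
        return taken

    def delete(self, message_id):
        self._visible_at.pop(message_id, None)


queue = ToyQueue(visibility_timeout=10)
for task in ["task1", "task2", "task3"]:
    queue.send(task)

# t=0: process #1 receives task1 and starts it; without acks_late it is deleted at once.
started = queue.receive(now=0)
queue.delete("task1")

# t=0: process #1 prefetches task2, which stays in flight while task1 runs for 20s.
prefetched = queue.receive(now=0)

# t=1: process #2 receives and starts task3.
other = queue.receive(now=1)
queue.delete("task3")

# t=11: task2 was never deleted and its timeout has expired, so a poll
# returns it again even though process #1 still holds it.
redelivered = queue.receive(now=11)
print(started, prefetched, other, redelivered)
# ['task1'] ['task2'] ['task3'] ['task2']
```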

I get the same results with:

- A concurrency of 2 processes; or
- A concurrency of 1 and 2 workers

This is consistent with [3], which reinforces that there's no way to
prevent this behaviour for successive, long-running tasks. Arguably the
current behaviour (without acks_late) is worse, because the task that
gets duplicated is not the task at "fault" (the long-running one) but
the task prefetched behind it.
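The difference between the two acknowledgement modes can be reduced to a rough rule for a single worker process running tasks one after another with a prefetch multiplier of 1: a task is at risk if it sits on the queue, received but not deleted, for longer than the visibility timeout. The function below is a simplified model, not Celery code. `at_risk_of_duplication` is a hypothetical name, and the model ignores polling delays and network latency.

```python
def at_risk_of_duplication(durations, visibility_timeout, acks_late):
    """Return indices of tasks that may be redelivered (simplified model).

    durations: run time in seconds of each task, in the order one process runs them.
    Without acks_late, task i is deleted when it starts, but task i + 1 has already
    been prefetched and stays in flight until task i finishes.
    With acks_late, task i itself stays in flight until it finishes, and nothing
    else is prefetched in the meantime.
    """
    at_risk = []
    for i, duration in enumerate(durations):
        if duration <= visibility_timeout:
            continue
        if acks_late:
            at_risk.append(i)  # the long-running task itself
        elif i + 1 < len(durations):
            at_risk.append(i + 1)  # the task prefetched behind it
    return at_risk


# Task 1 is the slow one (the task at "fault"); the timeout is 10 seconds.
print(at_risk_of_duplication([5, 20, 5], 10, acks_late=False))  # [2]
print(at_risk_of_duplication([5, 20, 5], 10, acks_late=True))   # [1]
```

In both modes, keeping every task's run time under the visibility timeout empties the list. This is the point of the suggested work on long-running tasks.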

Suggested actions
-----------------

Keep the worker as-is: the settings are appropriate for long-running
tasks and do help minimise duplication. Turning on acks_late risks
creating new kinds of duplication, which may be worse than what we see now.

Continue other work to parallelise and optimise long-running tasks, so
that the visibility timeout is never an issue in the first place.

Demo of task duplication
------------------------

In order to do the demo we need:

- 3 tasks and 2 processes: the hypothesis is that process #1 will pick
up tasks #1 and #2, with task #2 "timing out".

- To poll more often than the timeout, so that process #2 will pick
up (duplicate) task #2 after it finishes #3.
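The two requirements above map roughly onto Celery settings. This is a sketch that assumes lowercase setting names and the SQS transport's `visibility_timeout` and `polling_interval` options. The values are hypothetical: they are chosen only so that the 20 second `long_task` outlasts the timeout, and are not taken from this repo's config.

```python
# Hypothetical demo settings for a Celery app using the SQS broker.
LONG_TASK_SECONDS = 20  # long_task sleeps for 20 seconds

worker_prefetch_multiplier = 1  # lowest limit; 0 would disable the limit
task_acks_late = False          # current behaviour: acknowledge when a task starts

broker_transport_options = {
    # Shorter than long_task, so the prefetched task #2 "times out".
    "visibility_timeout": 10,
    # Shorter than the timeout, so the queue is polled again before it expires.
    "polling_interval": 1,
}
```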

Alternatively, use two workers with a concurrency of 1 each, running the same command in two separate shells:

    source environment.sh && celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=1

    source environment.sh && celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=1

Command to schedule the tasks:

    from app.celery.reporting_tasks import long_task
    [long_task.apply_async(queue='reporting-tasks') for i in [1,2,3]]

Logs that show the duplication:

    [2022-01-12 16:50:09,333: INFO/MainProcess] Task long[4c23eaf0-69fc-4951-b215-437d79e30462] received
    [2022-01-12 16:50:09,901: INFO/ForkPoolWorker-1] Running long_task
    [2022-01-12 16:50:10,208: INFO/MainProcess] Task long[7c22c30d-e61d-4d2d-834c-7dae56b20064] received
    [2022-01-12 16:50:10,253: INFO/ForkPoolWorker-2] Running long_task
    [2022-01-12 16:50:11,373: INFO/MainProcess] Task long[67e5526d-4a82-430b-8eb5-b8022a20bf70] received
    [2022-01-12 16:50:21,410: INFO/MainProcess] Task long[67e5526d-4a82-430b-8eb5-b8022a20bf70] received
    [2022-01-12 16:50:30,499: INFO/ForkPoolWorker-3] Running long_task
    [2022-01-12 16:50:36,555: INFO/ForkPoolWorker-4] Running long_task

Note that the timing of the first two "received" lines varies: I've seen
them arrive separately and at the same time.
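Spotting duplicates by eye gets harder with more tasks. A small script can flag any task ID that has more than one "received" line. This sketch assumes the log format shown above, and `duplicated_task_ids` is a hypothetical helper, not part of the repo.

```python
import re
from collections import Counter

# Matches Celery's "Task <name>[<id>] received" log lines.
RECEIVED = re.compile(r"Task (?P<name>[\w.-]+)\[(?P<id>[0-9a-f-]+)\] received")


def duplicated_task_ids(log_lines):
    """Return {task_id: count} for task IDs received more than once."""
    counts = Counter()
    for line in log_lines:
        match = RECEIVED.search(line)
        if match:
            counts[match.group("id")] += 1
    return {task_id: n for task_id, n in counts.items() if n > 1}


LOG = """\
[2022-01-12 16:50:09,333: INFO/MainProcess] Task long[4c23eaf0-69fc-4951-b215-437d79e30462] received
[2022-01-12 16:50:09,901: INFO/ForkPoolWorker-1] Running long_task
[2022-01-12 16:50:10,208: INFO/MainProcess] Task long[7c22c30d-e61d-4d2d-834c-7dae56b20064] received
[2022-01-12 16:50:10,253: INFO/ForkPoolWorker-2] Running long_task
[2022-01-12 16:50:11,373: INFO/MainProcess] Task long[67e5526d-4a82-430b-8eb5-b8022a20bf70] received
[2022-01-12 16:50:21,410: INFO/MainProcess] Task long[67e5526d-4a82-430b-8eb5-b8022a20bf70] received
"""

print(duplicated_task_ids(LOG.splitlines()))
# {'67e5526d-4a82-430b-8eb5-b8022a20bf70': 2}
```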

[1]: https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-worker_prefetch_multiplier
[2]: https://docs.celeryproject.org/en/stable/userguide/tasks.html#Task.acks_late
[3]: https://github.com/celery/celery/issues/2788#issuecomment-136273421
2022-01-13 09:58:12 +00:00


from datetime import datetime, timedelta

from flask import current_app
from notifications_utils.timezones import convert_utc_to_bst

from app import notify_celery
from app.config import QueueNames
from app.cronitor import cronitor
from app.dao.fact_billing_dao import (
    fetch_billing_data_for_day,
    update_fact_billing,
)
from app.dao.fact_notification_status_dao import (
    fetch_notification_status_for_day,
    update_fact_notification_status,
)
from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE
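

@notify_celery.task(name='long')
def long_task():
    # Temporary demo task: each run blocks its worker process for 20 seconds,
    # so a task prefetched behind it can outlive the visibility timeout.
    from time import sleep
    current_app.logger.info("Running long_task")
    sleep(20)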


@notify_celery.task(name="create-nightly-billing")
@cronitor("create-nightly-billing")
def create_nightly_billing(day_start=None):
    # day_start ends up as a datetime.date object. Up to 4 days of data,
    # counting back from day_start, are consolidated.
    if day_start is None:
        day_start = convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=1)
    else:
        # When calling the task it's a string in the format of "YYYY-MM-DD"
        day_start = datetime.strptime(day_start, "%Y-%m-%d").date()
    for i in range(0, 4):
        process_day = (day_start - timedelta(days=i)).isoformat()

        create_nightly_billing_for_day.apply_async(
            kwargs={'process_day': process_day},
            queue=QueueNames.REPORTING
        )
        current_app.logger.info(
            f"create-nightly-billing task: create-nightly-billing-for-day task created for {process_day}"
        )


@notify_celery.task(name="create-nightly-billing-for-day")
def create_nightly_billing_for_day(process_day):
    process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
    current_app.logger.info(
        f'create-nightly-billing-for-day task for {process_day}: started'
    )

    start = datetime.utcnow()
    transit_data = fetch_billing_data_for_day(process_day=process_day)
    end = datetime.utcnow()

    current_app.logger.info(
        f'create-nightly-billing-for-day task for {process_day}: data fetched in {(end - start).seconds} seconds'
    )

    for data in transit_data:
        update_fact_billing(data, process_day)

    current_app.logger.info(
        f"create-nightly-billing-for-day task for {process_day}: "
        f"task complete. {len(transit_data)} rows updated"
    )


@notify_celery.task(name="create-nightly-notification-status")
@cronitor("create-nightly-notification-status")
def create_nightly_notification_status():
    """
    Aggregate notification statuses into rows in ft_notification_status.
    In order to minimise effort, this task assumes that:

      - Email + SMS statuses don't change after 3 days. This is currently true
        because all outstanding email / SMS are "timed out" after 3 days, and
        we reject delivery receipts after this point.

      - Letter statuses don't change after 9 days. There's no "timeout" for
        letters but this is the longest we've had to cope with in the past - due
        to major issues with our print provider.

    Because the time range of the task exceeds the minimum possible retention
    period (3 days), we need to choose which table to query for each service.

    The aggregation happens for 1 extra day in case:

      - This task or the "timeout" task fails to run.

      - Data is (somehow) still in transit to the history table, which would
        mean the aggregated results are temporarily incorrect.
    """

    yesterday = convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=1)

    # email and sms
    for i in range(4):
        process_day = yesterday - timedelta(days=i)
        for notification_type in [SMS_TYPE, EMAIL_TYPE]:
            create_nightly_notification_status_for_day.apply_async(
                kwargs={'process_day': process_day.isoformat(), 'notification_type': notification_type},
                queue=QueueNames.REPORTING
            )
            current_app.logger.info(
                f"create-nightly-notification-status task: create-nightly-notification-status-for-day task created "
                f"for type {notification_type} for {process_day}"
            )

    # letters
    for i in range(10):
        process_day = yesterday - timedelta(days=i)
        create_nightly_notification_status_for_day.apply_async(
            kwargs={'process_day': process_day.isoformat(), 'notification_type': LETTER_TYPE},
            queue=QueueNames.REPORTING
        )
        current_app.logger.info(
            f"create-nightly-notification-status task: create-nightly-notification-status-for-day task created "
            f"for type letter for {process_day}"
        )


@notify_celery.task(name="create-nightly-notification-status-for-day")
def create_nightly_notification_status_for_day(process_day, notification_type):
    process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
    current_app.logger.info(
        f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: started'
    )

    start = datetime.utcnow()
    transit_data = fetch_notification_status_for_day(process_day=process_day, notification_type=notification_type)
    end = datetime.utcnow()
    current_app.logger.info(
        f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: '
        f'data fetched in {(end - start).seconds} seconds'
    )

    update_fact_notification_status(transit_data, process_day, notification_type)

    current_app.logger.info(
        f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: '
        f'task complete - {len(transit_data)} rows updated'
    )