Ben Thorner 3007ff8f04 Investigate duplicate tasks
This proves that a task can be duplicated **even** when the prefetch
multiplier [1] is set to 1. This is because prefetched tasks are still
present on the SQS queue and subject to the visibility timeout, whereas
tasks in progress are not: they are deleted from the queue when a worker
starts them, unless acks_late is set [2].
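The mechanism can be sketched as a toy model in plain Python (this is not real Celery/SQS code; the function and its timings are illustrative only):

```python
# Toy model of the SQS behaviour described above (not real Celery/SQS code).
# A prefetched message stays "in flight" on the queue until the worker gets
# round to starting it and acking (deleting) it. If the visibility timeout
# elapses first, SQS makes the message visible again, so another worker can
# receive a duplicate.

def prefetched_task_is_redelivered(visibility_timeout, first_task_duration):
    """Worker receives task #1 and prefetches task #2 at t=0.

    Task #2 is only acked once task #1 finishes, so it sits on the queue
    for the whole of task #1's runtime.
    """
    time_until_ack = first_task_duration
    return time_until_ack > visibility_timeout

# A long-running task ahead of a prefetched one causes a duplicate...
assert prefetched_task_is_redelivered(visibility_timeout=10, first_task_duration=25)
# ...while a short one does not.
assert not prefetched_task_is_redelivered(visibility_timeout=10, first_task_duration=5)
```

Note the duplicated task here is the *prefetched* one, not the long-running one, which matches the "fault" observation above.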

I get the same results with:

- A concurrency of 2 processes; or
- A concurrency of 1 and 2 workers

This is consistent with [3], which reinforces that there's no way to
prevent this behaviour for successive, long-running tasks. Arguably the
current behaviour - without acks_late - is worse because the task that
gets duplicated is not the task at "fault" (the long-running one).

Suggested actions
-----------------

Keep the worker as-is: the settings are appropriate for long-running
tasks and do help minimise any duplication. Turning on acks_late risks
creating new kinds of duplication, which may be worse than the current
behaviour.

Continue other work to parallelise and optimise long-running tasks, so
that the visibility timeout is never an issue in the first place.

Demo of task duplication
------------------------

To run the demo we need:

- 3 tasks and 2 processes: the hypothesis is that process #1 will pick
up tasks #1 and #2, with task #2 "timing out".

- To poll more often than the timeout, so that process #2 will pick
up (duplicate) task #2 after it finishes #3.
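For reference, the knobs involved look something like the following in Celery config. The setting names are from the Celery docs; the values are assumptions chosen to match the ~10 second redelivery gap visible in the logs below, not necessarily what this repo uses.

```python
# Illustrative Celery settings for the scenario in this demo. Setting names
# are from the Celery docs; the values are assumptions, not the repo's actual
# configuration.
broker_transport_options = {
    # SQS visibility timeout, in seconds. The logs suggest roughly 10s:
    # the third task is redelivered 10 seconds after it is first received.
    "visibility_timeout": 10,
}
worker_prefetch_multiplier = 1  # the setting under investigation above
task_acks_late = False  # default: ack (delete) when the task starts, not when it completes
```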

Alternatively, use two workers:

    source environment.sh && celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=1

    source environment.sh && celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=1

Command to schedule the tasks:

    from app.celery.reporting_tasks import long_task
    [long_task.apply_async(queue='reporting-tasks') for i in [1,2,3]]

Logs that show the duplication:

    [2022-01-12 16:50:09,333: INFO/MainProcess] Task long[4c23eaf0-69fc-4951-b215-437d79e30462] received
    [2022-01-12 16:50:09,901: INFO/ForkPoolWorker-1] Running long_task
    [2022-01-12 16:50:10,208: INFO/MainProcess] Task long[7c22c30d-e61d-4d2d-834c-7dae56b20064] received
    [2022-01-12 16:50:10,253: INFO/ForkPoolWorker-2] Running long_task
    [2022-01-12 16:50:11,373: INFO/MainProcess] Task long[67e5526d-4a82-430b-8eb5-b8022a20bf70] received
    [2022-01-12 16:50:21,410: INFO/MainProcess] Task long[67e5526d-4a82-430b-8eb5-b8022a20bf70] received
    [2022-01-12 16:50:30,499: INFO/ForkPoolWorker-3] Running long_task
    [2022-01-12 16:50:36,555: INFO/ForkPoolWorker-4] Running long_task

Note that the timing of when the first two tasks are received varies:
I've seen them received separately and at the same time.

[1]: https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-worker_prefetch_multiplier
[2]: https://docs.celeryproject.org/en/stable/userguide/tasks.html#Task.acks_late
[3]: https://github.com/celery/celery/issues/2788#issuecomment-136273421

GOV.UK Notify API

Contains:

  • the public-facing REST API for GOV.UK Notify, which teams can integrate with using our clients
  • an internal-only REST API built using Flask to manage services, users, templates, etc (this is what the admin app talks to)
  • asynchronous workers built using Celery to put things on queues and read them off to be processed, sent to providers, updated, etc

Setting Up

Python version

We run python 3.9 both locally and in production.

pycurl

See https://github.com/alphagov/notifications-manuals/wiki/Getting-started#pycurl

AWS credentials

To run the API you will need appropriate AWS credentials. See the Wiki for more details.

environment.sh

Create and edit an environment.sh file:

echo "
export NOTIFY_ENVIRONMENT='development'

export MMG_API_KEY='MMG_API_KEY'
export FIRETEXT_API_KEY='FIRETEXT_ACTUAL_KEY'
export NOTIFICATION_QUEUE_PREFIX='YOUR_OWN_PREFIX'

export FLASK_APP=application.py
export FLASK_ENV=development
export WERKZEUG_DEBUG_PIN=off
"> environment.sh

Things to change:

  • Replace YOUR_OWN_PREFIX with local_dev_<first name>.
  • Run the following in the credentials repo to get the API keys.
notify-pass credentials/providers/api_keys

Postgres

Install Postgres.app.

Currently the API works with PostgreSQL 11. After installation, open the Postgres app, open the sidebar, and update or replace the default server with a compatible version.

Note: you may need to add the following directory to your PATH in order to bootstrap the app.

export PATH=${PATH}:/Applications/Postgres.app/Contents/Versions/11/bin/

Redis

To switch Redis on you'll need to install it locally. On OSX we've used brew for this. To use Redis caching, switch it on by changing the config for development:

    REDIS_ENABLED = True

To run the application

# install dependencies, etc.
make bootstrap

# run the web app
make run-flask

# run the background tasks
make run-celery

# run scheduled tasks (optional)
make run-celery-beat

To test the application

# install dependencies, etc.
make bootstrap

make test

To run one off tasks

Tasks are run through the flask command - run flask --help for more information. There are two sections we need to care about: flask db contains alembic migration commands, and flask command contains all of our custom commands. For example, to purge all dynamically generated functional test data, do the following:

Locally

flask command purge_functional_test_data -u <functional tests user name prefix>

On the server

cf run-task notify-api "flask command purge_functional_test_data -u <functional tests user name prefix>"

All commands and command options have a --help option if you need more information.

Further documentation
