Merge pull request #3355 from alphagov/celery-5-180017131

Upgrade to Celery 5
This commit is contained in:
Ben Thorner
2021-11-05 13:04:14 +00:00
committed by GitHub
8 changed files with 187 additions and 184 deletions

View File

@@ -22,6 +22,7 @@ def make_task(app):
class NotifyTask(Task):
abstract = True
start = None
typing = False
def on_success(self, retval, task_id, args, kwargs):
elapsed_time = time.monotonic() - self.start
@@ -81,11 +82,11 @@ class NotifyCelery(Celery):
def init_app(self, app):
super().__init__(
app.import_name,
broker=app.config['BROKER_URL'],
broker=app.config['CELERY']['broker_url'],
task_cls=make_task(app),
)
self.conf.update(app.config)
self.conf.update(app.config['CELERY'])
self._app = app
def send_task(self, name, args=None, kwargs=None, **other_kwargs):

View File

@@ -85,12 +85,12 @@ def get_pdf_for_templated_letter(self, notification_id):
args=(encrypted_data,),
queue=QueueNames.SANITISE_LETTERS
)
except Exception:
except Exception as e:
try:
current_app.logger.exception(
f"RETRY: calling create-letter-pdf task for notification {notification_id} failed"
)
self.retry(queue=QueueNames.RETRY)
self.retry(exc=e, queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
message = f"RETRY FAILED: Max retries reached. " \
f"The task create-letter-pdf failed for notification id {notification_id}. " \

View File

@@ -187,156 +187,160 @@ class Config(object):
# we only need real email in Live environment (production)
DVLA_EMAIL_ADDRESSES = json.loads(os.environ.get('DVLA_EMAIL_ADDRESSES', '[]'))
BROKER_URL = 'sqs://'
BROKER_TRANSPORT_OPTIONS = {
'region': AWS_REGION,
'polling_interval': 1, # 1 second
'visibility_timeout': 310,
'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX
CELERY = {
'broker_url': 'sqs://',
'broker_transport_options': {
'region': AWS_REGION,
'visibility_timeout': 310,
'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX,
},
'timezone': 'Europe/London',
'imports': [
'app.celery.tasks',
'app.celery.scheduled_tasks',
'app.celery.reporting_tasks',
'app.celery.nightly_tasks',
],
# this is overridden by the -Q command, but locally, we should read from all queues
'task_queues': [
Queue(queue, Exchange('default'), routing_key=queue) for queue in QueueNames.all_queues()
],
'beat_schedule': {
# app/celery/scheduled_tasks.py
'run-scheduled-jobs': {
'task': 'run-scheduled-jobs',
'schedule': crontab(minute='0,15,30,45'),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-verify-codes': {
'task': 'delete-verify-codes',
'schedule': timedelta(minutes=63),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-invitations': {
'task': 'delete-invitations',
'schedule': timedelta(minutes=66),
'options': {'queue': QueueNames.PERIODIC}
},
'switch-current-sms-provider-on-slow-delivery': {
'task': 'switch-current-sms-provider-on-slow-delivery',
'schedule': crontab(), # Every minute
'options': {'queue': QueueNames.PERIODIC}
},
'check-job-status': {
'task': 'check-job-status',
'schedule': crontab(),
'options': {'queue': QueueNames.PERIODIC}
},
'tend-providers-back-to-middle': {
'task': 'tend-providers-back-to-middle',
'schedule': crontab(minute='*/5'),
'options': {'queue': QueueNames.PERIODIC}
},
'check-for-missing-rows-in-completed-jobs': {
'task': 'check-for-missing-rows-in-completed-jobs',
'schedule': crontab(minute='*/10'),
'options': {'queue': QueueNames.PERIODIC}
},
'replay-created-notifications': {
'task': 'replay-created-notifications',
'schedule': crontab(minute='0, 15, 30, 45'),
'options': {'queue': QueueNames.PERIODIC}
},
# app/celery/nightly_tasks.py
'timeout-sending-notifications': {
'task': 'timeout-sending-notifications',
'schedule': crontab(hour=0, minute=5),
'options': {'queue': QueueNames.PERIODIC}
},
'create-nightly-billing': {
'task': 'create-nightly-billing',
'schedule': crontab(hour=0, minute=15),
'options': {'queue': QueueNames.REPORTING}
},
'create-nightly-notification-status': {
'task': 'create-nightly-notification-status',
'schedule': crontab(hour=0, minute=30), # after 'timeout-sending-notifications'
'options': {'queue': QueueNames.REPORTING}
},
'delete-notifications-older-than-retention': {
'task': 'delete-notifications-older-than-retention',
'schedule': crontab(hour=3, minute=0), # after 'create-nightly-notification-status'
'options': {'queue': QueueNames.PERIODIC}
},
'delete-inbound-sms': {
'task': 'delete-inbound-sms',
'schedule': crontab(hour=1, minute=40),
'options': {'queue': QueueNames.PERIODIC}
},
'save-daily-notification-processing-time': {
'task': 'save-daily-notification-processing-time',
'schedule': crontab(hour=2, minute=0),
'options': {'queue': QueueNames.PERIODIC}
},
'remove_sms_email_jobs': {
'task': 'remove_sms_email_jobs',
'schedule': crontab(hour=4, minute=0),
'options': {'queue': QueueNames.PERIODIC},
},
'remove_letter_jobs': {
'task': 'remove_letter_jobs',
'schedule': crontab(hour=4, minute=20),
# since we mark jobs as archived
'options': {'queue': QueueNames.PERIODIC},
},
'check-if-letters-still-in-created': {
'task': 'check-if-letters-still-in-created',
'schedule': crontab(day_of_week='mon-fri', hour=7, minute=0),
'options': {'queue': QueueNames.PERIODIC}
},
'check-if-letters-still-pending-virus-check': {
'task': 'check-if-letters-still-pending-virus-check',
'schedule': crontab(day_of_week='mon-fri', hour='9,15', minute=0),
'options': {'queue': QueueNames.PERIODIC}
},
'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers': {
'task': 'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers',
'schedule': crontab(day_of_week='mon-fri', hour=10, minute=30),
'options': {'queue': QueueNames.PERIODIC}
},
'raise-alert-if-letter-notifications-still-sending': {
'task': 'raise-alert-if-letter-notifications-still-sending',
'schedule': crontab(hour=15, minute=30),
'options': {'queue': QueueNames.PERIODIC}
},
# The collate-letter-pdfs-to-be-sent task assumes it is called at an hour where BST makes no
# difference to the truncated date, which translates to the filename to process
'collate-letter-pdfs-to-be-sent': {
'task': 'collate-letter-pdfs-to-be-sent',
'schedule': crontab(hour=17, minute=50),
'options': {'queue': QueueNames.PERIODIC}
},
'raise-alert-if-no-letter-ack-file': {
'task': 'raise-alert-if-no-letter-ack-file',
'schedule': crontab(hour=23, minute=00),
'options': {'queue': QueueNames.PERIODIC}
},
'trigger-link-tests': {
'task': 'trigger-link-tests',
'schedule': timedelta(minutes=15),
'options': {'queue': QueueNames.PERIODIC}
},
'auto-expire-broadcast-messages': {
'task': 'auto-expire-broadcast-messages',
'schedule': timedelta(minutes=5),
'options': {'queue': QueueNames.PERIODIC}
},
}
}
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Europe/London'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# on reporting worker, restart workers after each task is executed to help prevent memory leaks
CELERYD_MAX_TASKS_PER_CHILD = os.getenv('CELERYD_MAX_TASKS_PER_CHILD')
# we can set celeryd_prefetch_multiplier to be 1 for celery apps which handle only long running tasks
if os.getenv('CELERYD_PREFETCH_MULTIPLIER'):
CELERYD_PREFETCH_MULTIPLIER = os.getenv('CELERYD_PREFETCH_MULTIPLIER')
CELERY_IMPORTS = (
'app.celery.tasks',
'app.celery.scheduled_tasks',
'app.celery.reporting_tasks',
'app.celery.nightly_tasks',
)
CELERYBEAT_SCHEDULE = {
# app/celery/scheduled_tasks.py
'run-scheduled-jobs': {
'task': 'run-scheduled-jobs',
'schedule': crontab(minute='0,15,30,45'),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-verify-codes': {
'task': 'delete-verify-codes',
'schedule': timedelta(minutes=63),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-invitations': {
'task': 'delete-invitations',
'schedule': timedelta(minutes=66),
'options': {'queue': QueueNames.PERIODIC}
},
'switch-current-sms-provider-on-slow-delivery': {
'task': 'switch-current-sms-provider-on-slow-delivery',
'schedule': crontab(), # Every minute
'options': {'queue': QueueNames.PERIODIC}
},
'check-job-status': {
'task': 'check-job-status',
'schedule': crontab(),
'options': {'queue': QueueNames.PERIODIC}
},
'tend-providers-back-to-middle': {
'task': 'tend-providers-back-to-middle',
'schedule': crontab(minute='*/5'),
'options': {'queue': QueueNames.PERIODIC}
},
'check-for-missing-rows-in-completed-jobs': {
'task': 'check-for-missing-rows-in-completed-jobs',
'schedule': crontab(minute='*/10'),
'options': {'queue': QueueNames.PERIODIC}
},
'replay-created-notifications': {
'task': 'replay-created-notifications',
'schedule': crontab(minute='0, 15, 30, 45'),
'options': {'queue': QueueNames.PERIODIC}
},
# app/celery/nightly_tasks.py
'timeout-sending-notifications': {
'task': 'timeout-sending-notifications',
'schedule': crontab(hour=0, minute=5),
'options': {'queue': QueueNames.PERIODIC}
},
'create-nightly-billing': {
'task': 'create-nightly-billing',
'schedule': crontab(hour=0, minute=15),
'options': {'queue': QueueNames.REPORTING}
},
'create-nightly-notification-status': {
'task': 'create-nightly-notification-status',
'schedule': crontab(hour=0, minute=30), # after 'timeout-sending-notifications'
'options': {'queue': QueueNames.REPORTING}
},
'delete-notifications-older-than-retention': {
'task': 'delete-notifications-older-than-retention',
'schedule': crontab(hour=3, minute=0), # after 'create-nightly-notification-status'
'options': {'queue': QueueNames.PERIODIC}
},
'delete-inbound-sms': {
'task': 'delete-inbound-sms',
'schedule': crontab(hour=1, minute=40),
'options': {'queue': QueueNames.PERIODIC}
},
'save-daily-notification-processing-time': {
'task': 'save-daily-notification-processing-time',
'schedule': crontab(hour=2, minute=0),
'options': {'queue': QueueNames.PERIODIC}
},
'remove_sms_email_jobs': {
'task': 'remove_sms_email_jobs',
'schedule': crontab(hour=4, minute=0),
'options': {'queue': QueueNames.PERIODIC},
},
'remove_letter_jobs': {
'task': 'remove_letter_jobs',
'schedule': crontab(hour=4, minute=20),
# since we mark jobs as archived
'options': {'queue': QueueNames.PERIODIC},
},
'check-if-letters-still-in-created': {
'task': 'check-if-letters-still-in-created',
'schedule': crontab(day_of_week='mon-fri', hour=7, minute=0),
'options': {'queue': QueueNames.PERIODIC}
},
'check-if-letters-still-pending-virus-check': {
'task': 'check-if-letters-still-pending-virus-check',
'schedule': crontab(day_of_week='mon-fri', hour='9,15', minute=0),
'options': {'queue': QueueNames.PERIODIC}
},
'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers': {
'task': 'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers',
'schedule': crontab(day_of_week='mon-fri', hour=10, minute=30),
'options': {'queue': QueueNames.PERIODIC}
},
'raise-alert-if-letter-notifications-still-sending': {
'task': 'raise-alert-if-letter-notifications-still-sending',
'schedule': crontab(hour=15, minute=30),
'options': {'queue': QueueNames.PERIODIC}
},
# The collate-letter-pdf does assume it is called in an hour that BST does not make a
# difference to the truncate date which translates to the filename to process
'collate-letter-pdfs-to-be-sent': {
'task': 'collate-letter-pdfs-to-be-sent',
'schedule': crontab(hour=17, minute=50),
'options': {'queue': QueueNames.PERIODIC}
},
'raise-alert-if-no-letter-ack-file': {
'task': 'raise-alert-if-no-letter-ack-file',
'schedule': crontab(hour=23, minute=00),
'options': {'queue': QueueNames.PERIODIC}
},
'trigger-link-tests': {
'task': 'trigger-link-tests',
'schedule': timedelta(minutes=15),
'options': {'queue': QueueNames.PERIODIC}
},
'auto-expire-broadcast-messages': {
'task': 'auto-expire-broadcast-messages',
'schedule': timedelta(minutes=5),
'options': {'queue': QueueNames.PERIODIC}
},
}
CELERY_QUEUES = []
CELERY['worker_prefetch_multiplier'] = os.getenv('CELERYD_PREFETCH_MULTIPLIER')
# on reporting worker, restart workers after each task is executed to help prevent memory leaks
if os.getenv('CELERYD_MAX_TASKS_PER_CHILD'):
CELERY['worker_max_tasks_per_child'] = int(os.getenv('CELERYD_MAX_TASKS_PER_CHILD'))
FROM_NUMBER = 'development'
@@ -419,7 +423,6 @@ class Development(Config):
NOTIFY_ENVIRONMENT = 'development'
NOTIFY_LOG_PATH = 'application.log'
NOTIFICATION_QUEUE_PREFIX = 'development'
NOTIFY_EMAIL_DOMAIN = "notify.tools"
SQLALCHEMY_DATABASE_URI = 'postgresql://localhost/notification_api'
@@ -427,11 +430,6 @@ class Development(Config):
ANTIVIRUS_ENABLED = os.getenv('ANTIVIRUS_ENABLED') == '1'
for queue in QueueNames.all_queues():
Config.CELERY_QUEUES.append(
Queue(queue, Exchange('default'), routing_key=queue)
)
API_HOST_NAME = "http://localhost:6011"
API_RATE_LIMIT_ENABLED = True
DVLA_EMAIL_ADDRESSES = ['success@simulator.amazonses.com']
@@ -465,15 +463,13 @@ class Test(Development):
# this is overridden in jenkins and on cloudfoundry
SQLALCHEMY_DATABASE_URI = os.getenv('SQLALCHEMY_DATABASE_URI', 'postgresql://localhost/test_notification_api')
BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://'
CELERY = {
**Config.CELERY,
'broker_url': 'you-forgot-to-mock-celery-in-your-tests://'
}
ANTIVIRUS_ENABLED = True
for queue in QueueNames.all_queues():
Config.CELERY_QUEUES.append(
Queue(queue, Exchange('default'), routing_key=queue)
)
API_RATE_LIMIT_ENABLED = True
API_HOST_NAME = "http://localhost:6011"

View File

@@ -3,7 +3,7 @@ import functools
import uuid
from datetime import datetime
from boto.exception import SQSError
import botocore
from flask import abort, current_app, jsonify, request
from gds_metrics import Histogram
from notifications_utils.recipients import try_validate_and_format_phone_number
@@ -232,7 +232,7 @@ def process_sms_or_email_notification(
reply_to_text=reply_to_text
)
return resp
except SQSError:
except botocore.exceptions.ClientError:
# if SQS cannot put the task on the queue, it's probably because the notification body was too long and it
# went over SQS's 256kb message limit. If so, we
current_app.logger.info(

View File

@@ -2,7 +2,7 @@
# with package version changes made in requirements-app.txt
cffi==1.14.5
celery[sqs]==3.1.26.post2 # pyup: <4
celery[sqs]==5.1.2
docopt==0.6.2
Flask-Bcrypt==0.7.1
flask-marshmallow==0.14.0

View File

@@ -4,7 +4,7 @@
# with package version changes made in requirements-app.txt
cffi==1.14.5
celery[sqs]==3.1.26.post2 # pyup: <4
celery[sqs]==5.1.2
docopt==0.6.2
Flask-Bcrypt==0.7.1
flask-marshmallow==0.14.0
@@ -46,20 +46,22 @@ gds-metrics==0.2.4
## The following requirements were added by pip freeze:
alembic==1.7.4
amqp==1.4.9
anyjson==0.3.3
amqp==5.0.6
attrs==21.2.0
awscli==1.21.4
bcrypt==3.2.0
billiard==3.3.0.23
billiard==3.6.4.0
bleach==4.1.0
blinker==1.4
boto==2.49.0
boto3==1.19.4
botocore==1.22.4
cached-property==1.5.2
certifi==2021.10.8
charset-normalizer==2.0.7
click==8.0.3
click==7.1.2
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.2.0
colorama==0.4.3
dataclasses==0.8
dnspython==1.16.0
@@ -73,15 +75,17 @@ importlib-metadata==4.8.1
importlib-resources==5.3.0
Jinja2==3.0.2
jmespath==0.10.0
kombu==3.0.37
kombu==5.1.0
Mako==1.1.5
MarkupSafe==2.0.1
mistune==0.8.4
orderedset==2.0.3
packaging==21.0
phonenumbers==8.12.36
prompt-toolkit==3.0.21
pyasn1==0.4.8
pycparser==2.20
pycurl==7.43.0.5
pyparsing==3.0.1
PyPDF2==1.26.0
pyrsistent==0.18.0
@@ -100,5 +104,7 @@ soupsieve==2.2.1
statsd==3.3.0
typing-extensions==3.10.0.2
urllib3==1.26.7
vine==5.0.0
wcwidth==0.2.5
webencodings==0.5.1
zipp==3.6.0

View File

@@ -5,15 +5,15 @@ case $NOTIFY_APP_NAME in
exec scripts/run_app_paas.sh gunicorn -c /home/vcap/app/gunicorn_config.py application
;;
delivery-worker-retry-tasks)
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \
-Q retry-tasks 2> /dev/null
;;
delivery-worker-letters)
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \
-Q create-letters-pdf-tasks,letter-tasks 2> /dev/null
;;
delivery-worker-jobs)
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \
-Q database-tasks,job-tasks 2> /dev/null
;;
delivery-worker-research)
@@ -21,7 +21,7 @@ case $NOTIFY_APP_NAME in
-Q research-mode-tasks 2> /dev/null
;;
delivery-worker-sender)
exec scripts/run_multi_worker_app_paas.sh celery multi start 3 -c 10 -A run_celery.notify_celery --loglevel=INFO \
exec scripts/run_multi_worker_app_paas.sh celery multi start 3 -c 4 -A run_celery.notify_celery --loglevel=INFO \
--logfile=/dev/null --pidfile=/tmp/celery%N.pid -Q send-sms-tasks,send-email-tasks
;;
delivery-worker-periodic)
@@ -33,12 +33,12 @@ case $NOTIFY_APP_NAME in
-Q reporting-tasks 2> /dev/null
;;
delivery-worker-priority)
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=5 \
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \
-Q priority-tasks 2> /dev/null
;;
# Only consume the notify-internal-tasks queue on this app so that Notify messages are processed as a priority
delivery-worker-internal)
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \
-Q notify-internal-tasks 2> /dev/null
;;
delivery-worker-broadcasts)
@@ -46,15 +46,15 @@ case $NOTIFY_APP_NAME in
-Q broadcast-tasks 2> /dev/null
;;
delivery-worker-receipts)
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \
-Q ses-callbacks,sms-callbacks 2> /dev/null
;;
delivery-worker-service-callbacks)
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \
-Q service-callbacks,service-callbacks-retry 2> /dev/null
;;
delivery-worker-save-api-notifications)
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \
exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \
-Q save-api-email-tasks,save-api-sms-tasks 2> /dev/null
;;
delivery-celery-beat)

View File

@@ -2,8 +2,8 @@ import uuid
from unittest import mock
from unittest.mock import call
import botocore
import pytest
from boto.exception import SQSError
from flask import current_app, json
from app.dao import templates_dao
@@ -1074,7 +1074,7 @@ def test_post_notifications_saves_email_or_sms_normally_if_saving_to_queue_fails
):
save_task = mocker.patch(
f"app.celery.tasks.save_api_{notification_type}.apply_async",
side_effect=SQSError({'some': 'json'}, 'some opname')
side_effect=botocore.exceptions.ClientError({'some': 'json'}, 'some opname')
)
mock_send_task = mocker.patch(f'app.celery.provider_tasks.deliver_{notification_type}.apply_async')