From 3e49de533044acf214e4cacc65660cf82d3c2452 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 21 Oct 2021 14:33:31 +0100 Subject: [PATCH 01/13] Upgrade to Celery 5.1.2 There are several other changes we need to make in order to install the new version. For more context, see: - https://github.com/celery/celery/commit/208e90e40f4aa3bfd5bc75600af9d1ed4e1efa28 - https://github.com/celery/celery/commit/e3d1993a58d3a55d59b6ad95e871f3ed43b21e0b - https://github.com/celery/kombu/commit/7e93611fcea2af953830218184d21e05b3b67c5a In the next commits we'll look at tidying up the config and other dependencies so the change is deployable. --- requirements-app.txt | 2 +- requirements.txt | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/requirements-app.txt b/requirements-app.txt index 32efa05d5..45e5a6197 100644 --- a/requirements-app.txt +++ b/requirements-app.txt @@ -2,7 +2,7 @@ # with package version changes made in requirements-app.txt cffi==1.14.5 -celery[sqs]==3.1.26.post2 # pyup: <4 +celery[sqs]==5.1.2 docopt==0.6.2 Flask-Bcrypt==0.7.1 flask-marshmallow==0.14.0 diff --git a/requirements.txt b/requirements.txt index 7514b7f7e..316748e23 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ # with package version changes made in requirements-app.txt cffi==1.14.5 -celery[sqs]==3.1.26.post2 # pyup: <4 +celery[sqs]==5.1.2 docopt==0.6.2 Flask-Bcrypt==0.7.1 flask-marshmallow==0.14.0 @@ -46,12 +46,12 @@ gds-metrics==0.2.4 ## The following requirements were added by pip freeze: alembic==1.7.4 -amqp==1.4.9 +amqp==5.0.6 anyjson==0.3.3 attrs==21.2.0 awscli==1.21.4 bcrypt==3.2.0 -billiard==3.3.0.23 +billiard==3.6.4.0 bleach==4.1.0 blinker==1.4 boto==2.49.0 @@ -59,7 +59,7 @@ boto3==1.19.4 botocore==1.22.4 certifi==2021.10.8 charset-normalizer==2.0.7 -click==8.0.3 +click==7.1.2 colorama==0.4.3 dataclasses==0.8 dnspython==1.16.0 @@ -73,7 +73,7 @@ importlib-metadata==4.8.1 importlib-resources==5.3.0 Jinja2==3.0.2 jmespath==0.10.0 -kombu==3.0.37 +kombu==5.1.0 Mako==1.1.5 MarkupSafe==2.0.1 mistune==0.8.4 From c2fe1b04bb40cf268a50cc1a982044cdfa24be98 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 21 Oct 2021 15:02:39 +0100 Subject: [PATCH 02/13] Fix test checking for nested exception Previously this type of exception was raised at the top level and the task did not retry [1]. Since Celery 4+ the behaviour changed so that a Retry exception will be raised unless we explicitly say we want to raise the original one [2]. It's unclear if we actually want to retry this task for any type of exception, but it's out-of-scope for this PR to decide on this, so here we just reraise the exception to make it compatible with the new version of Celery and the existing test. [1]: https://github.com/alphagov/notifications-api/pull/2832/files#diff-926badba91648d56a973e16bd92da3345b23bc60dc89360119b1df08de52723fL77 [2]: https://github.com/celery/celery/commit/32b52ca875509b84d786e33ce2d39f62ab7ea050#diff-db604dd7cb51e386710260ff2eba378aac19ba11eec97904bbf097b68caeada6L625 --- app/celery/letters_pdf_tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/celery/letters_pdf_tasks.py b/app/celery/letters_pdf_tasks.py index 3d4b335aa..8616c9fe5 100644 --- a/app/celery/letters_pdf_tasks.py +++ b/app/celery/letters_pdf_tasks.py @@ -85,12 +85,12 @@ def get_pdf_for_templated_letter(self, notification_id): args=(encrypted_data,), queue=QueueNames.SANITISE_LETTERS ) - except Exception: + except Exception as e: try: current_app.logger.exception( f"RETRY: calling create-letter-pdf task for notification {notification_id} failed" ) - self.retry(queue=QueueNames.RETRY) + self.retry(exc=e, queue=QueueNames.RETRY) except self.MaxRetriesExceededError: message = f"RETRY FAILED: Max retries reached. " \ f"The task create-letter-pdf failed for notification id {notification_id}. " \ From 19394ab9ddebb9027d11add62020577a09eba65f Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Thu, 28 Dec 2017 12:04:19 +0000 Subject: [PATCH 03/13] construct celery queues once in the base config previously, we were confusing things by appending to CELERY_QUEUES in both dev and test configs - these are executed at import time, so the list contained all queues twice, regardless of what config you're actually using. Fortunately, the -Q command that we supply the workers with overrides this config option, so other environments weren't affected. Given that, we can tidy up this code by just declaring it in the base config every time --- app/config.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/app/config.py b/app/config.py index 0d29a02d1..697982ac4 100644 --- a/app/config.py +++ b/app/config.py @@ -336,7 +336,9 @@ class Config(object): 'options': {'queue': QueueNames.PERIODIC} }, } - CELERY_QUEUES = [] + + # this is overriden by the -Q command, but locally, we should read from all queues + CELERY_QUEUES = [Queue(queue, Exchange('default'), routing_key=queue) for queue in QueueNames.all_queues()] FROM_NUMBER = 'development' @@ -427,11 +429,6 @@ class Development(Config): ANTIVIRUS_ENABLED = os.getenv('ANTIVIRUS_ENABLED') == '1' - for queue in QueueNames.all_queues(): - Config.CELERY_QUEUES.append( - Queue(queue, Exchange('default'), routing_key=queue) - ) - API_HOST_NAME = "http://localhost:6011" API_RATE_LIMIT_ENABLED = True DVLA_EMAIL_ADDRESSES = ['success@simulator.amazonses.com'] @@ -469,11 +466,6 @@ class Test(Development): ANTIVIRUS_ENABLED = True - for queue in QueueNames.all_queues(): - Config.CELERY_QUEUES.append( - Queue(queue, Exchange('default'), routing_key=queue) - ) - API_RATE_LIMIT_ENABLED = True API_HOST_NAME = "http://localhost:6011" From 44b3b42abace79798e467194097fee51b5d766a4 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 26 Oct 2021 16:36:25 +0100 Subject: [PATCH 04/13] Rewrite config to fix deprecation warnings The new format was introduced in Celery 4 [1] and is due for removal in Celery 6 [2], hence the warnings e.g. [2021-10-26 14:31:57,588: WARNING/MainProcess] /Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/utils.py:206: CDeprecationWarning: The 'CELERY_TIMEZONE' setting is deprecated and scheduled for removal in version 6.0.0. Use the timezone instead alternative=f'Use the {_TO_NEW_KEY[setting]} instead') This rewrites the config to match our other apps [3][4]. Some of the settings have been removed entirely: - "CELERY_ENABLE_UTC = True" - this has been enabled by default since Celery 3 [5]. - "CELERY_ACCEPT_CONTENT = ['json']", "CELERY_TASK_SERIALIZER = 'json'" - these are the default settings since Celery 4 [6][7]. Finally, this removes a redundant (and broken) bit of development config - NOTIFICATION_QUEUE_PREFIX - that should be set in environment.sh [8]. [1]: https://docs.celeryproject.org/en/stable/history/whatsnew-4.0.html#lowercase-setting-names [2]: https://docs.celeryproject.org/en/stable/history/whatsnew-5.0.html#step-2-update-your-configuration-with-the-new-setting-names [3]: https://github.com/alphagov/notifications-govuk-alerts/blob/252ad01d3934e5d75aabbee92badbf38a009046a/app/config.py#L27 [4]: https://github.com/alphagov/notifications-template-preview/blob/03df0d92522f13091b081f3fe04c188e85d2ade6/app/__init__.py#L33 [5]: https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-enable_utc [6]: https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_serializer [7]: https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-accept_content [8]: https://github.com/alphagov/notifications-api/blob/2edbdec4eeaee4a937ece1a98000bd439624c0e0/README.md#environmentsh --- app/celery/celery.py | 4 +- app/config.py | 305 ++++++++++++++++++++++--------------------- 2 files changed, 156 insertions(+), 153 deletions(-) diff --git a/app/celery/celery.py b/app/celery/celery.py index 16c50da23..e76af8c55 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -81,11 +81,11 @@ class NotifyCelery(Celery): def init_app(self, app): super().__init__( app.import_name, - broker=app.config['BROKER_URL'], + broker=app.config['CELERY']['broker_url'], task_cls=make_task(app), ) - self.conf.update(app.config) + self.conf.update(app.config['CELERY']) self._app = app def send_task(self, name, args=None, kwargs=None, **other_kwargs): diff --git a/app/config.py b/app/config.py index 697982ac4..1a8cd3cf9 100644 --- a/app/config.py +++ b/app/config.py @@ -187,158 +187,159 @@ class Config(object): # we only need real email in Live environment (production) DVLA_EMAIL_ADDRESSES = json.loads(os.environ.get('DVLA_EMAIL_ADDRESSES', '[]')) - BROKER_URL = 'sqs://' - BROKER_TRANSPORT_OPTIONS = { - 'region': AWS_REGION, - 'polling_interval': 1, # 1 second - 'visibility_timeout': 310, - 'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX - } - CELERY_ENABLE_UTC = True - CELERY_TIMEZONE = 'Europe/London' - CELERY_ACCEPT_CONTENT = ['json'] - CELERY_TASK_SERIALIZER = 'json' - # on reporting worker, restart workers after each task is executed to help prevent memory leaks - CELERYD_MAX_TASKS_PER_CHILD = os.getenv('CELERYD_MAX_TASKS_PER_CHILD') - # we can set celeryd_prefetch_multiplier to be 1 for celery apps which handle only long running tasks - if os.getenv('CELERYD_PREFETCH_MULTIPLIER'): - CELERYD_PREFETCH_MULTIPLIER = os.getenv('CELERYD_PREFETCH_MULTIPLIER') - CELERY_IMPORTS = ( - 'app.celery.tasks', - 'app.celery.scheduled_tasks', - 'app.celery.reporting_tasks', - 'app.celery.nightly_tasks', - ) - CELERYBEAT_SCHEDULE = { - # app/celery/scheduled_tasks.py - 'run-scheduled-jobs': { - 'task': 'run-scheduled-jobs', - 'schedule': crontab(minute='0,15,30,45'), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-verify-codes': { - 'task': 'delete-verify-codes', - 'schedule': timedelta(minutes=63), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-invitations': { - 'task': 'delete-invitations', - 'schedule': timedelta(minutes=66), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'switch-current-sms-provider-on-slow-delivery': { - 'task': 'switch-current-sms-provider-on-slow-delivery', - 'schedule': crontab(), # Every minute - 'options': {'queue': QueueNames.PERIODIC} - }, - 'check-job-status': { - 'task': 'check-job-status', - 'schedule': crontab(), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'tend-providers-back-to-middle': { - 'task': 'tend-providers-back-to-middle', - 'schedule': crontab(minute='*/5'), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'check-for-missing-rows-in-completed-jobs': { - 'task': 'check-for-missing-rows-in-completed-jobs', - 'schedule': crontab(minute='*/10'), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'replay-created-notifications': { - 'task': 'replay-created-notifications', - 'schedule': crontab(minute='0, 15, 30, 45'), - 'options': {'queue': QueueNames.PERIODIC} - }, - # app/celery/nightly_tasks.py - 'timeout-sending-notifications': { - 'task': 'timeout-sending-notifications', - 'schedule': crontab(hour=0, minute=5), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'create-nightly-billing': { - 'task': 'create-nightly-billing', - 'schedule': crontab(hour=0, minute=15), - 'options': {'queue': QueueNames.REPORTING} - }, - 'create-nightly-notification-status': { - 'task': 'create-nightly-notification-status', - 'schedule': crontab(hour=0, minute=30), # after 'timeout-sending-notifications' - 'options': {'queue': QueueNames.REPORTING} - }, - 'delete-notifications-older-than-retention': { - 'task': 'delete-notifications-older-than-retention', - 'schedule': crontab(hour=3, minute=0), # after 'create-nightly-notification-status' - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-inbound-sms': { - 'task': 'delete-inbound-sms', - 'schedule': crontab(hour=1, minute=40), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'save-daily-notification-processing-time': { - 'task': 'save-daily-notification-processing-time', - 'schedule': crontab(hour=2, minute=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'remove_sms_email_jobs': { - 'task': 'remove_sms_email_jobs', - 'schedule': crontab(hour=4, minute=0), - 'options': {'queue': QueueNames.PERIODIC}, - }, - 'remove_letter_jobs': { - 'task': 'remove_letter_jobs', - 'schedule': crontab(hour=4, minute=20), - # since we mark jobs as archived - 'options': {'queue': QueueNames.PERIODIC}, - }, - 'check-if-letters-still-in-created': { - 'task': 'check-if-letters-still-in-created', - 'schedule': crontab(day_of_week='mon-fri', hour=7, minute=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'check-if-letters-still-pending-virus-check': { - 'task': 'check-if-letters-still-pending-virus-check', - 'schedule': crontab(day_of_week='mon-fri', hour='9,15', minute=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers': { - 'task': 'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers', - 'schedule': crontab(day_of_week='mon-fri', hour=10, minute=30), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'raise-alert-if-letter-notifications-still-sending': { - 'task': 'raise-alert-if-letter-notifications-still-sending', - 'schedule': crontab(hour=15, minute=30), - 'options': {'queue': QueueNames.PERIODIC} - }, - # The collate-letter-pdf does assume it is called in an hour that BST does not make a - # difference to the truncate date which translates to the filename to process - 'collate-letter-pdfs-to-be-sent': { - 'task': 'collate-letter-pdfs-to-be-sent', - 'schedule': crontab(hour=17, minute=50), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'raise-alert-if-no-letter-ack-file': { - 'task': 'raise-alert-if-no-letter-ack-file', - 'schedule': crontab(hour=23, minute=00), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'trigger-link-tests': { - 'task': 'trigger-link-tests', - 'schedule': timedelta(minutes=15), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'auto-expire-broadcast-messages': { - 'task': 'auto-expire-broadcast-messages', - 'schedule': timedelta(minutes=5), - 'options': {'queue': QueueNames.PERIODIC} + CELERY = { + 'broker_url': 'sqs://', + 'broker_transport_options': { + 'region': AWS_REGION, + 'polling_interval': 1, # 1 second + 'visibility_timeout': 310, + 'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX, }, + 'timezone': 'Europe/London', + # on reporting worker, restart workers after each task is executed to help prevent memory leaks + 'worker_max_tasks_per_child': os.getenv('CELERYD_MAX_TASKS_PER_CHILD'), + 'imports': [ + 'app.celery.tasks', + 'app.celery.scheduled_tasks', + 'app.celery.reporting_tasks', + 'app.celery.nightly_tasks', + ], + # this is overriden by the -Q command, but locally, we should read from all queues + 'task_queues': [ + Queue(queue, Exchange('default'), routing_key=queue) for queue in QueueNames.all_queues() + ], + 'beat_schedule': { + # app/celery/scheduled_tasks.py + 'run-scheduled-jobs': { + 'task': 'run-scheduled-jobs', + 'schedule': crontab(minute='0,15,30,45'), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-verify-codes': { + 'task': 'delete-verify-codes', + 'schedule': timedelta(minutes=63), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-invitations': { + 'task': 'delete-invitations', + 'schedule': timedelta(minutes=66), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'switch-current-sms-provider-on-slow-delivery': { + 'task': 'switch-current-sms-provider-on-slow-delivery', + 'schedule': crontab(), # Every minute + 'options': {'queue': QueueNames.PERIODIC} + }, + 'check-job-status': { + 'task': 'check-job-status', + 'schedule': crontab(), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'tend-providers-back-to-middle': { + 'task': 'tend-providers-back-to-middle', + 'schedule': crontab(minute='*/5'), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'check-for-missing-rows-in-completed-jobs': { + 'task': 'check-for-missing-rows-in-completed-jobs', + 'schedule': crontab(minute='*/10'), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'replay-created-notifications': { + 'task': 'replay-created-notifications', + 'schedule': crontab(minute='0, 15, 30, 45'), + 'options': {'queue': QueueNames.PERIODIC} + }, + # app/celery/nightly_tasks.py + 'timeout-sending-notifications': { + 'task': 'timeout-sending-notifications', + 'schedule': crontab(hour=0, minute=5), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'create-nightly-billing': { + 'task': 'create-nightly-billing', + 'schedule': crontab(hour=0, minute=15), + 'options': {'queue': QueueNames.REPORTING} + }, + 'create-nightly-notification-status': { + 'task': 'create-nightly-notification-status', + 'schedule': crontab(hour=0, minute=30), # after 'timeout-sending-notifications' + 'options': {'queue': QueueNames.REPORTING} + }, + 'delete-notifications-older-than-retention': { + 'task': 'delete-notifications-older-than-retention', + 'schedule': crontab(hour=3, minute=0), # after 'create-nightly-notification-status' + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-inbound-sms': { + 'task': 'delete-inbound-sms', + 'schedule': crontab(hour=1, minute=40), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'save-daily-notification-processing-time': { + 'task': 'save-daily-notification-processing-time', + 'schedule': crontab(hour=2, minute=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'remove_sms_email_jobs': { + 'task': 'remove_sms_email_jobs', + 'schedule': crontab(hour=4, minute=0), + 'options': {'queue': QueueNames.PERIODIC}, + }, + 'remove_letter_jobs': { + 'task': 'remove_letter_jobs', + 'schedule': crontab(hour=4, minute=20), + # since we mark jobs as archived + 'options': {'queue': QueueNames.PERIODIC}, + }, + 'check-if-letters-still-in-created': { + 'task': 'check-if-letters-still-in-created', + 'schedule': crontab(day_of_week='mon-fri', hour=7, minute=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'check-if-letters-still-pending-virus-check': { + 'task': 'check-if-letters-still-pending-virus-check', + 'schedule': crontab(day_of_week='mon-fri', hour='9,15', minute=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers': { + 'task': 'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers', + 'schedule': crontab(day_of_week='mon-fri', hour=10, minute=30), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'raise-alert-if-letter-notifications-still-sending': { + 'task': 'raise-alert-if-letter-notifications-still-sending', + 'schedule': crontab(hour=15, minute=30), + 'options': {'queue': QueueNames.PERIODIC} + }, + # The collate-letter-pdf does assume it is called in an hour that BST does not make a + # difference to the truncate date which translates to the filename to process + 'collate-letter-pdfs-to-be-sent': { + 'task': 'collate-letter-pdfs-to-be-sent', + 'schedule': crontab(hour=17, minute=50), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'raise-alert-if-no-letter-ack-file': { + 'task': 'raise-alert-if-no-letter-ack-file', + 'schedule': crontab(hour=23, minute=00), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'trigger-link-tests': { + 'task': 'trigger-link-tests', + 'schedule': timedelta(minutes=15), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'auto-expire-broadcast-messages': { + 'task': 'auto-expire-broadcast-messages', + 'schedule': timedelta(minutes=5), + 'options': {'queue': QueueNames.PERIODIC} + }, + } } - # this is overriden by the -Q command, but locally, we should read from all queues - CELERY_QUEUES = [Queue(queue, Exchange('default'), routing_key=queue) for queue in QueueNames.all_queues()] + # we can set celeryd_prefetch_multiplier to be 1 for celery apps which handle only long running tasks + if os.getenv('CELERYD_PREFETCH_MULTIPLIER'): + CELERY['worker_prefetch_multiplier'] = os.getenv('CELERYD_PREFETCH_MULTIPLIER') FROM_NUMBER = 'development' @@ -421,7 +422,6 @@ class Development(Config): NOTIFY_ENVIRONMENT = 'development' NOTIFY_LOG_PATH = 'application.log' - NOTIFICATION_QUEUE_PREFIX = 'development' NOTIFY_EMAIL_DOMAIN = "notify.tools" SQLALCHEMY_DATABASE_URI = 'postgresql://localhost/notification_api' @@ -462,7 +462,10 @@ class Test(Development): # this is overriden in jenkins and on cloudfoundry SQLALCHEMY_DATABASE_URI = os.getenv('SQLALCHEMY_DATABASE_URI', 'postgresql://localhost/test_notification_api') - BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://' + CELERY = { + **Config.CELERY, + 'broker_url': 'you-forgot-to-mock-celery-in-your-tests://' + } ANTIVIRUS_ENABLED = True From 60799399ab4281eade51fba3a3eaada2bcf02306 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Wed, 27 Oct 2021 16:57:06 +0100 Subject: [PATCH 05/13] Remove anyjson package This is no longer required by Celery [1] and now causes an error when deploying with the new versions of other packages: use_2to3 is invalid [1]: https://docs.celeryproject.org/en/stable/history/whatsnew-4.0.html#requirements --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 316748e23..c4d4cc891 100644 --- a/requirements.txt +++ b/requirements.txt @@ -47,7 +47,6 @@ gds-metrics==0.2.4 ## The following requirements were added by pip freeze: alembic==1.7.4 amqp==5.0.6 -anyjson==0.3.3 attrs==21.2.0 awscli==1.21.4 bcrypt==3.2.0 From d0550533a7b317a7c80fe0703d2ee0d8241dac17 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Wed, 27 Oct 2021 16:59:34 +0100 Subject: [PATCH 06/13] Remove redundant polling_interval setting This appeared without explanation in [1], but it's the same as the default value [2] so we don't need to specify it - doing so gives the impression we made a decision, but that's not clear here. [1]: https://github.com/alphagov/notifications-api/pull/2142/files#diff-84f1a9419471e289c6b6e2b0209b329e20df6cef81d1f7f0a193ddc2fc6ad69dR153 [2]: https://docs.celeryproject.org/en/stable/getting-started/backends-and-brokers/sqs.html#polling-interval --- app/config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/app/config.py b/app/config.py index 1a8cd3cf9..317dccd80 100644 --- a/app/config.py +++ b/app/config.py @@ -191,7 +191,6 @@ class Config(object): 'broker_url': 'sqs://', 'broker_transport_options': { 'region': AWS_REGION, - 'polling_interval': 1, # 1 second 'visibility_timeout': 310, 'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX, }, From 89e390a3fc6ecf441d2e0e932f08e1a293f40e0f Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 28 Oct 2021 11:43:58 +0100 Subject: [PATCH 07/13] Run "make freeze-requirements" Most of these are due to dependency changes in celery / kombu: -boto==2.49.0 https://github.com/celery/celery/commit/9b2a1720781930f8eed87bce2c3396e40a99529e +cached-property==1.5.2 https://github.com/celery/kombu/commit/560518287ab672fff3ca98107fb6bded456f6a01 +click-didyoumean==0.3.0 +click-plugins==1.1.1 +click-repl==0.2.0 https://github.com/celery/celery/blame/f462a437e3371acb867e94b52c2595b6d0a742d8/requirements/default.txt +pycurl==7.43.0.5 https://github.com/celery/celery/blame/59d88326b8caa84083c01efb3a3983b3332853e9/requirements/extras/sqs.txt +vine==5.0.0 https://github.com/celery/celery/commit/f6c3b3313f8d43b0919a09dae9107a8c0a09aa6d I'm not sure about the following, but neither are critical so I don't think it's worth tracking down where they came from. +prompt-toolkit==3.0.21 +wcwidth==0.2.5 --- requirements.txt | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index c4d4cc891..e14f0d527 100644 --- a/requirements.txt +++ b/requirements.txt @@ -53,12 +53,15 @@ bcrypt==3.2.0 billiard==3.6.4.0 bleach==4.1.0 blinker==1.4 -boto==2.49.0 boto3==1.19.4 botocore==1.22.4 +cached-property==1.5.2 certifi==2021.10.8 charset-normalizer==2.0.7 click==7.1.2 +click-didyoumean==0.3.0 +click-plugins==1.1.1 +click-repl==0.2.0 colorama==0.4.3 dataclasses==0.8 dnspython==1.16.0 @@ -79,8 +82,10 @@ mistune==0.8.4 orderedset==2.0.3 packaging==21.0 phonenumbers==8.12.36 +prompt-toolkit==3.0.21 pyasn1==0.4.8 pycparser==2.20 +pycurl==7.43.0.5 pyparsing==3.0.1 PyPDF2==1.26.0 pyrsistent==0.18.0 @@ -99,5 +104,7 @@ soupsieve==2.2.1 statsd==3.3.0 typing-extensions==3.10.0.2 urllib3==1.26.7 +vine==5.0.0 +wcwidth==0.2.5 webencodings==0.5.1 zipp==3.6.0 From efe4c6f06ed05ebf2ef99801bb989f1f527ee6c5 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 28 Oct 2021 14:32:29 +0100 Subject: [PATCH 08/13] Fix notify-api crashing in PaaS This is purely by elimination: I couldn't see anything in the logs to indicate the cause of the crashes, just that the processes were exiting. The crash seemed to happen immediately after the AWS logs part of the wrapper script, which was a small indicator it might be something AWS-related. Since this package is no longer included by other dependencies, we need to include it explicitly. --- requirements-app.txt | 1 + requirements.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/requirements-app.txt b/requirements-app.txt index 45e5a6197..ea0d82fcb 100644 --- a/requirements-app.txt +++ b/requirements-app.txt @@ -1,6 +1,7 @@ # Run `make freeze-requirements` to update requirements.txt # with package version changes made in requirements-app.txt +boto==2.49.0 # notify-api crashes in PaaS without this cffi==1.14.5 celery[sqs]==5.1.2 docopt==0.6.2 diff --git a/requirements.txt b/requirements.txt index e14f0d527..304eda7be 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ # Run `make freeze-requirements` to update requirements.txt # with package version changes made in requirements-app.txt +boto==2.49.0 # notify-api crashes in PaaS without this cffi==1.14.5 celery[sqs]==5.1.2 docopt==0.6.2 From 29c92a9e54fe406f9490f6c89bd047832f75ab5f Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Fri, 29 Oct 2021 10:51:22 +0100 Subject: [PATCH 09/13] Try removing boto package again --- app/v2/notifications/post_notifications.py | 4 ++-- requirements-app.txt | 1 - requirements.txt | 1 - tests/app/v2/notifications/test_post_notifications.py | 4 ++-- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/app/v2/notifications/post_notifications.py b/app/v2/notifications/post_notifications.py index 40cee1678..01cf21a56 100644 --- a/app/v2/notifications/post_notifications.py +++ b/app/v2/notifications/post_notifications.py @@ -3,7 +3,7 @@ import functools import uuid from datetime import datetime -from boto.exception import SQSError +import botocore from flask import abort, current_app, jsonify, request from gds_metrics import Histogram from notifications_utils.recipients import try_validate_and_format_phone_number @@ -232,7 +232,7 @@ def process_sms_or_email_notification( reply_to_text=reply_to_text ) return resp - except SQSError: + except botocore.exceptions.ClientError: # if SQS cannot put the task on the queue, it's probably because the notification body was too long and it # went over SQS's 256kb message limit. If so, we current_app.logger.info( diff --git a/requirements-app.txt b/requirements-app.txt index ea0d82fcb..45e5a6197 100644 --- a/requirements-app.txt +++ b/requirements-app.txt @@ -1,7 +1,6 @@ # Run `make freeze-requirements` to update requirements.txt # with package version changes made in requirements-app.txt -boto==2.49.0 # notify-api crashes in PaaS without this cffi==1.14.5 celery[sqs]==5.1.2 docopt==0.6.2 diff --git a/requirements.txt b/requirements.txt index 304eda7be..e14f0d527 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ # Run `make freeze-requirements` to update requirements.txt # with package version changes made in requirements-app.txt -boto==2.49.0 # notify-api crashes in PaaS without this cffi==1.14.5 celery[sqs]==5.1.2 docopt==0.6.2 diff --git a/tests/app/v2/notifications/test_post_notifications.py b/tests/app/v2/notifications/test_post_notifications.py index dde524dea..7d500d84a 100644 --- a/tests/app/v2/notifications/test_post_notifications.py +++ b/tests/app/v2/notifications/test_post_notifications.py @@ -2,8 +2,8 @@ import uuid from unittest import mock from unittest.mock import call +import botocore import pytest -from boto.exception import SQSError from flask import current_app, json from app.dao import templates_dao @@ -1074,7 +1074,7 @@ def test_post_notifications_saves_email_or_sms_normally_if_saving_to_queue_fails ): save_task = mocker.patch( f"app.celery.tasks.save_api_{notification_type}.apply_async", - side_effect=SQSError({'some': 'json'}, 'some opname') + side_effect=botocore.exceptions.ClientError({'some': 'json'}, 'some opname') ) mock_send_task = mocker.patch(f'app.celery.provider_tasks.deliver_{notification_type}.apply_async') From 3ecbdbb2609a9edcca5be1892053d8379bb6b580 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Mon, 1 Nov 2021 11:39:57 +0000 Subject: [PATCH 10/13] Temporarily disable task argument checking This was added in Celery 4 [1]. and appears to be incompatible with our approach of injecting "request_id" into task arguments (example exception below). Although our other apps are on Celery 5 our logs don't show any similar issues, probably because all their tasks are invoked without request IDs. In the longterm we should decide if we want to enable argument checking and fix the tracing approach, or stop tracing request IDs in Celery tasks. [1]: https://docs.celeryproject.org/en/stable/userguide/tasks.html#argument-checking 2021-11-01T11:37:36 delivery delivery ERROR None "RETRY: Email notification f69a9305-686f-42eb-a2ee-61bc2ba1f5f3 failed" [in /Users/benthorner/Documents/Projects/api/app/celery/provider_tasks.py:68] Traceback (most recent call last): File "/Users/benthorner/Documents/Projects/api/app/celery/provider_tasks.py", line 53, in deliver_email raise TypeError("test retry") TypeError: test retry [2021-11-01 11:37:36,385: ERROR/ForkPoolWorker-1] RETRY: Email notification f69a9305-686f-42eb-a2ee-61bc2ba1f5f3 failed Traceback (most recent call last): File "/Users/benthorner/Documents/Projects/api/app/celery/provider_tasks.py", line 53, in deliver_email raise TypeError("test retry") TypeError: test retry [2021-11-01 11:37:36,394: WARNING/ForkPoolWorker-1] Task deliver_email[449cd221-173c-4e18-83ac-229e88c029a5] reject requeue=False: deliver_email() got an unexpected keyword argument 'request_id' Traceback (most recent call last): File "/Users/benthorner/Documents/Projects/api/app/celery/provider_tasks.py", line 53, in deliver_email raise TypeError("test retry") TypeError: test retry During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/task.py", line 731, in retry S.apply_async() File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/canvas.py", line 219, in apply_async return _apply(args, kwargs, **options) File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/task.py", line 537, in apply_async check_arguments(*(args or ()), **(kwargs or {})) TypeError: deliver_email() got an unexpected keyword argument 'request_id' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/trace.py", line 450, in trace_task R = retval = fun(*args, **kwargs) File "/Users/benthorner/Documents/Projects/api/app/celery/celery.py", line 74, in __call__ return super().__call__(*args, **kwargs) File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/trace.py", line 731, in __protected_call__ return self.run(*args, **kwargs) File "/Users/benthorner/Documents/Projects/api/app/celery/provider_tasks.py", line 71, in deliver_email self.retry(queue=QueueNames.RETRY) File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/task.py", line 733, in retry raise Reject(exc, requeue=False) celery.exceptions.Reject: (TypeError("deliver_email() got an unexpected keyword argument 'request_id'",), False) --- app/celery/celery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/app/celery/celery.py b/app/celery/celery.py index e76af8c55..98ca6812f 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -22,6 +22,7 @@ def make_task(app): class NotifyTask(Task): abstract = True start = None + typing = False def on_success(self, retval, task_id, args, kwargs): elapsed_time = time.monotonic() - self.start From 92086e20903feb35df9f18f2a59dc19e7b46e19c Mon Sep 17 00:00:00 2001 From: sakisv Date: Thu, 4 Nov 2021 11:24:44 +0200 Subject: [PATCH 11/13] Reduce concurrency on high volume workers We noticed that having high concurrency led to significant memory usage. The hypothesis is that because of long polling, there are many connections being held open which seems to impact the memory usage. Initially the high concurrency was put in place as a way to get around the lack of long polling: We were spawning multiple processes and each one was doing many requests to SQS to check for and receive new tasks. Now with long polling enabled and reduced concurrency, the workers are much more efficient at their job (the tasks are being picked up so fast that the queues are practically empty) and much lighter on resource requirements. (This last bit will allow us to reduce the memory requirement for heavy workers like the sender and reduce our costs) The concurrency number was chosen semi-arbitrarily: Usually this is set to the number of CPUs available to the system. Because we're running on PaaS and that number is both abstracted and may be claimed for by other processes, we went for a conservative one to also reduce the competion for CPU among the processes of the same worker instance. --- scripts/paas_app_wrapper.sh | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/scripts/paas_app_wrapper.sh b/scripts/paas_app_wrapper.sh index 7bb89d759..b48627bf6 100755 --- a/scripts/paas_app_wrapper.sh +++ b/scripts/paas_app_wrapper.sh @@ -21,7 +21,7 @@ case $NOTIFY_APP_NAME in -Q research-mode-tasks 2> /dev/null ;; delivery-worker-sender) - exec scripts/run_multi_worker_app_paas.sh celery multi start 3 -c 10 -A run_celery.notify_celery --loglevel=INFO \ + exec scripts/run_multi_worker_app_paas.sh celery multi start 3 -c 4 -A run_celery.notify_celery --loglevel=INFO \ --logfile=/dev/null --pidfile=/tmp/celery%N.pid -Q send-sms-tasks,send-email-tasks ;; delivery-worker-periodic) @@ -33,7 +33,7 @@ case $NOTIFY_APP_NAME in -Q reporting-tasks 2> /dev/null ;; delivery-worker-priority) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=5 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q priority-tasks 2> /dev/null ;; # Only consume the notify-internal-tasks queue on this app so that Notify messages are processed as a priority @@ -46,15 +46,15 @@ case $NOTIFY_APP_NAME in -Q broadcast-tasks 2> /dev/null ;; delivery-worker-receipts) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q ses-callbacks,sms-callbacks 2> /dev/null ;; delivery-worker-service-callbacks) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q service-callbacks,service-callbacks-retry 2> /dev/null ;; delivery-worker-save-api-notifications) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q save-api-email-tasks,save-api-sms-tasks 2> /dev/null ;; delivery-celery-beat) From 9e9091e980c6c2a10a47cc7c443ecadbcaf29512 Mon Sep 17 00:00:00 2001 From: sakisv Date: Thu, 4 Nov 2021 16:31:22 +0200 Subject: [PATCH 12/13] Reduce concurrency for other workers too for consistency Any worker that had `--concurrency` > 4 is now set to 4 for consistency with the how volume workers. See previous commit (Reduce concurrency on high volume workers) for details --- scripts/paas_app_wrapper.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/paas_app_wrapper.sh b/scripts/paas_app_wrapper.sh index b48627bf6..ce192555c 100755 --- a/scripts/paas_app_wrapper.sh +++ b/scripts/paas_app_wrapper.sh @@ -5,15 +5,15 @@ case $NOTIFY_APP_NAME in exec scripts/run_app_paas.sh gunicorn -c /home/vcap/app/gunicorn_config.py application ;; delivery-worker-retry-tasks) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q retry-tasks 2> /dev/null ;; delivery-worker-letters) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q create-letters-pdf-tasks,letter-tasks 2> /dev/null ;; delivery-worker-jobs) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q database-tasks,job-tasks 2> /dev/null ;; delivery-worker-research) @@ -38,7 +38,7 @@ case $NOTIFY_APP_NAME in ;; # Only consume the notify-internal-tasks queue on this app so that Notify messages are processed as a priority delivery-worker-internal) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q notify-internal-tasks 2> /dev/null ;; delivery-worker-broadcasts) From e10f45b3a7cd68151adba3852823a401d89882c8 Mon Sep 17 00:00:00 2001 From: Richard Baker Date: Fri, 5 Nov 2021 10:35:17 +0000 Subject: [PATCH 13/13] Cast Celery worker_max_tasks_per_child to int or None We use this config option when running workers that process non-memory-safe tasks to restart the worker after n tasks. Celery 5 requires this to be passed as an int or None. Signed-off-by: Richard Baker --- app/config.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/app/config.py b/app/config.py index 317dccd80..fa62f2b98 100644 --- a/app/config.py +++ b/app/config.py @@ -195,8 +195,6 @@ class Config(object): 'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX, }, 'timezone': 'Europe/London', - # on reporting worker, restart workers after each task is executed to help prevent memory leaks - 'worker_max_tasks_per_child': os.getenv('CELERYD_MAX_TASKS_PER_CHILD'), 'imports': [ 'app.celery.tasks', 'app.celery.scheduled_tasks', @@ -340,6 +338,10 @@ class Config(object): if os.getenv('CELERYD_PREFETCH_MULTIPLIER'): CELERY['worker_prefetch_multiplier'] = os.getenv('CELERYD_PREFETCH_MULTIPLIER') + # on reporting worker, restart workers after each task is executed to help prevent memory leaks + if os.getenv('CELERYD_MAX_TASKS_PER_CHILD'): + CELERY['worker_max_tasks_per_child'] = int(os.getenv('CELERYD_MAX_TASKS_PER_CHILD')) + FROM_NUMBER = 'development' STATSD_HOST = os.getenv('STATSD_HOST')