Rewrite config to fix deprecation warnings

The new settings format was introduced in Celery 4 [1], and the old
setting names are due for removal in Celery 6 [2], hence the warnings e.g.

    [2021-10-26 14:31:57,588: WARNING/MainProcess] /Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/utils.py:206: CDeprecationWarning:
        The 'CELERY_TIMEZONE' setting is deprecated and scheduled for removal in
        version 6.0.0. Use the timezone instead

      alternative=f'Use the {_TO_NEW_KEY[setting]} instead')

This rewrites the config to match our other apps [3][4]. Some of the
settings have been removed entirely:

- "CELERY_ENABLE_UTC = True" - UTC has been enabled by default since
  Celery 3 [5], so setting it explicitly is redundant.

- "CELERY_ACCEPT_CONTENT = ['json']", "CELERY_TASK_SERIALIZER = 'json'"
  - these are the default settings since Celery 4 [6][7].

Finally, this removes a redundant (and broken) bit of development config
- NOTIFICATION_QUEUE_PREFIX - that should be set in environment.sh [8].

[1]: https://docs.celeryproject.org/en/stable/history/whatsnew-4.0.html#lowercase-setting-names
[2]: https://docs.celeryproject.org/en/stable/history/whatsnew-5.0.html#step-2-update-your-configuration-with-the-new-setting-names
[3]: 252ad01d39/app/config.py (L27)
[4]: 03df0d9252/app/__init__.py (L33)
[5]: https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-enable_utc
[6]: https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_serializer
[7]: https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-accept_content
[8]: 2edbdec4ee/README.md (environmentsh)
This commit is contained in:
Ben Thorner
2021-10-26 16:36:25 +01:00
parent 19394ab9dd
commit 44b3b42aba
2 changed files with 156 additions and 153 deletions

View File

@@ -81,11 +81,11 @@ class NotifyCelery(Celery):
def init_app(self, app): def init_app(self, app):
super().__init__( super().__init__(
app.import_name, app.import_name,
broker=app.config['BROKER_URL'], broker=app.config['CELERY']['broker_url'],
task_cls=make_task(app), task_cls=make_task(app),
) )
self.conf.update(app.config) self.conf.update(app.config['CELERY'])
self._app = app self._app = app
def send_task(self, name, args=None, kwargs=None, **other_kwargs): def send_task(self, name, args=None, kwargs=None, **other_kwargs):

View File

@@ -187,29 +187,28 @@ class Config(object):
# we only need real email in Live environment (production) # we only need real email in Live environment (production)
DVLA_EMAIL_ADDRESSES = json.loads(os.environ.get('DVLA_EMAIL_ADDRESSES', '[]')) DVLA_EMAIL_ADDRESSES = json.loads(os.environ.get('DVLA_EMAIL_ADDRESSES', '[]'))
BROKER_URL = 'sqs://' CELERY = {
BROKER_TRANSPORT_OPTIONS = { 'broker_url': 'sqs://',
'broker_transport_options': {
'region': AWS_REGION, 'region': AWS_REGION,
'polling_interval': 1, # 1 second 'polling_interval': 1, # 1 second
'visibility_timeout': 310, 'visibility_timeout': 310,
'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX 'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX,
} },
CELERY_ENABLE_UTC = True 'timezone': 'Europe/London',
CELERY_TIMEZONE = 'Europe/London'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# on reporting worker, restart workers after each task is executed to help prevent memory leaks # on reporting worker, restart workers after each task is executed to help prevent memory leaks
CELERYD_MAX_TASKS_PER_CHILD = os.getenv('CELERYD_MAX_TASKS_PER_CHILD') 'worker_max_tasks_per_child': os.getenv('CELERYD_MAX_TASKS_PER_CHILD'),
# we can set celeryd_prefetch_multiplier to be 1 for celery apps which handle only long running tasks 'imports': [
if os.getenv('CELERYD_PREFETCH_MULTIPLIER'):
CELERYD_PREFETCH_MULTIPLIER = os.getenv('CELERYD_PREFETCH_MULTIPLIER')
CELERY_IMPORTS = (
'app.celery.tasks', 'app.celery.tasks',
'app.celery.scheduled_tasks', 'app.celery.scheduled_tasks',
'app.celery.reporting_tasks', 'app.celery.reporting_tasks',
'app.celery.nightly_tasks', 'app.celery.nightly_tasks',
) ],
CELERYBEAT_SCHEDULE = { # this is overriden by the -Q command, but locally, we should read from all queues
'task_queues': [
Queue(queue, Exchange('default'), routing_key=queue) for queue in QueueNames.all_queues()
],
'beat_schedule': {
# app/celery/scheduled_tasks.py # app/celery/scheduled_tasks.py
'run-scheduled-jobs': { 'run-scheduled-jobs': {
'task': 'run-scheduled-jobs', 'task': 'run-scheduled-jobs',
@@ -336,9 +335,11 @@ class Config(object):
'options': {'queue': QueueNames.PERIODIC} 'options': {'queue': QueueNames.PERIODIC}
}, },
} }
}
# this is overriden by the -Q command, but locally, we should read from all queues # we can set celeryd_prefetch_multiplier to be 1 for celery apps which handle only long running tasks
CELERY_QUEUES = [Queue(queue, Exchange('default'), routing_key=queue) for queue in QueueNames.all_queues()] if os.getenv('CELERYD_PREFETCH_MULTIPLIER'):
CELERY['worker_prefetch_multiplier'] = os.getenv('CELERYD_PREFETCH_MULTIPLIER')
FROM_NUMBER = 'development' FROM_NUMBER = 'development'
@@ -421,7 +422,6 @@ class Development(Config):
NOTIFY_ENVIRONMENT = 'development' NOTIFY_ENVIRONMENT = 'development'
NOTIFY_LOG_PATH = 'application.log' NOTIFY_LOG_PATH = 'application.log'
NOTIFICATION_QUEUE_PREFIX = 'development'
NOTIFY_EMAIL_DOMAIN = "notify.tools" NOTIFY_EMAIL_DOMAIN = "notify.tools"
SQLALCHEMY_DATABASE_URI = 'postgresql://localhost/notification_api' SQLALCHEMY_DATABASE_URI = 'postgresql://localhost/notification_api'
@@ -462,7 +462,10 @@ class Test(Development):
# this is overriden in jenkins and on cloudfoundry # this is overriden in jenkins and on cloudfoundry
SQLALCHEMY_DATABASE_URI = os.getenv('SQLALCHEMY_DATABASE_URI', 'postgresql://localhost/test_notification_api') SQLALCHEMY_DATABASE_URI = os.getenv('SQLALCHEMY_DATABASE_URI', 'postgresql://localhost/test_notification_api')
BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://' CELERY = {
**Config.CELERY,
'broker_url': 'you-forgot-to-mock-celery-in-your-tests://'
}
ANTIVIRUS_ENABLED = True ANTIVIRUS_ENABLED = True