Files
notifications-api/app/celery/celery.py
Ben Thorner 44b3b42aba Rewrite config to fix deprecation warnings
The new (lowercase) format was introduced in Celery 4 [1] and the old
(uppercase) format is due for removal in Celery 6 [2], hence the warnings e.g.

    [2021-10-26 14:31:57,588: WARNING/MainProcess] /Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/utils.py:206: CDeprecationWarning:
        The 'CELERY_TIMEZONE' setting is deprecated and scheduled for removal in
        version 6.0.0. Use the timezone instead

      alternative=f'Use the {_TO_NEW_KEY[setting]} instead')

This rewrites the config to match our other apps [3][4]. Some of the
settings have been removed entirely:

- "CELERY_ENABLE_UTC = True" - this has been enabled by default since
  Celery 3 [5].

- "CELERY_ACCEPT_CONTENT = ['json']", "CELERY_TASK_SERIALIZER = 'json'"
  - these are the default settings since Celery 4 [6][7].

Finally, this removes a redundant (and broken) bit of development config
- NOTIFICATION_QUEUE_PREFIX - that should be set in environment.sh [8].

[1]: https://docs.celeryproject.org/en/stable/history/whatsnew-4.0.html#lowercase-setting-names
[2]: https://docs.celeryproject.org/en/stable/history/whatsnew-5.0.html#step-2-update-your-configuration-with-the-new-setting-names
[3]: 252ad01d39/app/config.py (L27)
[4]: 03df0d9252/app/__init__.py (L33)
[5]: https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-enable_utc
[6]: https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_serializer
[7]: https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-accept_content
[8]: 2edbdec4ee/README.md (environmentsh)
2021-11-01 09:54:05 +00:00

100 lines
3.4 KiB
Python

import time
from celery import Celery, Task
from celery.signals import worker_process_shutdown
from flask import g, request
from flask.ctx import has_app_context, has_request_context
@worker_process_shutdown.connect
def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
    """Log the PID and exit code of a worker process that is shutting down.

    Registered as a receiver of Celery's ``worker_process_shutdown`` signal.
    ``sender``, ``signal`` and any extra keyword arguments are accepted for
    the signal's calling convention but are not used.
    """
    # Deferred import: pulling this in at module level would be circular.
    from app import notify_celery

    flask_app = notify_celery._app

    # Once the worker has restarted at least once there is no ambient app
    # context any more, so ``current_app`` cannot be used to obtain one.
    # Push a fresh context from the original Flask app and log through that.
    with flask_app.app_context():
        message = 'worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)
        flask_app.logger.info(message)
def make_task(app):
    """Return a Celery ``Task`` base class bound to the Flask application ``app``.

    The returned class runs every task inside ``app``'s application context
    and reports the outcome of each task through ``app.logger`` and
    ``app.statsd_client``.
    """

    def _queue_name_for(task):
        # Routing key of the message being processed; 'none' when the
        # request carries no delivery info or no routing key.
        info = task.request.delivery_info or {}
        return info.get('routing_key', 'none')

    class NotifyTask(Task):
        abstract = True
        # time.monotonic() reading taken when the task body starts (set in __call__)
        start = None

        def on_success(self, retval, task_id, args, kwargs):
            """Log how long the task took and send that duration to statsd."""
            elapsed_time = time.monotonic() - self.start
            queue_name = _queue_name_for(self)

            duration_text = "{0:.4f}".format(elapsed_time)
            log_line = "Celery task {task_name} (queue: {queue_name}) took {time}".format(
                task_name=self.name,
                queue_name=queue_name,
                time=duration_text,
            )
            app.logger.info(log_line)

            metric = "celery.{queue_name}.{task_name}.success".format(
                task_name=self.name,
                queue_name=queue_name,
            )
            app.statsd_client.timing(metric, elapsed_time)

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            """Log the failure, bump the statsd failure counter, then defer to Celery."""
            queue_name = _queue_name_for(self)

            log_line = "Celery task {task_name} (queue: {queue_name}) failed".format(
                task_name=self.name,
                queue_name=queue_name,
            )
            app.logger.exception(log_line)

            metric = "celery.{queue_name}.{task_name}.failure".format(
                task_name=self.name,
                queue_name=queue_name,
            )
            app.statsd_client.incr(metric)

            super().on_failure(exc, task_id, args, kwargs, einfo)

        def __call__(self, *args, **kwargs):
            """Run the task inside the Flask app context so config, logger, etc. work."""
            with app.app_context():
                self.start = time.monotonic()
                # 'request_id' piggybacks on the task kwargs; strip it before
                # the task body sees it and expose it on 'g' so it gets logged.
                piggybacked_request_id = kwargs.pop('request_id', None)
                g.request_id = piggybacked_request_id
                return super().__call__(*args, **kwargs)

    return NotifyTask
class NotifyCelery(Celery):
    """Celery application that is configured from, and bound to, a Flask app."""

    def init_app(self, app):
        """Initialise this Celery app from the Flask application ``app``.

        Reads the ``CELERY`` dict from ``app.config``: its ``broker_url`` is
        passed to the Celery constructor and the whole dict is then merged
        into ``self.conf``. Tasks use the base class built by ``make_task``.
        """
        super().__init__(
            app.import_name,
            broker=app.config['CELERY']['broker_url'],
            task_cls=make_task(app),
        )
        self.conf.update(app.config['CELERY'])
        # Kept so code running outside any app context (e.g. the worker
        # shutdown signal handler) can still reach the original Flask app.
        self._app = app

    def send_task(self, name, args=None, kwargs=None, **other_kwargs):
        """Send a task, piggybacking the current request id on its kwargs.

        The id is taken from ``request.request_id`` when inside a request
        context that has one, otherwise from ``g.request_id`` when inside an
        app context that has one; with neither, the kwargs are sent as given.

        The caller's ``kwargs`` mapping is never modified: the id is added to
        a copy.
        """
        # Copy rather than alias: writing 'request_id' into the caller's own
        # dict would leak the id into any later reuse of that dict.
        kwargs = dict(kwargs) if kwargs else {}

        if has_request_context() and hasattr(request, 'request_id'):
            kwargs['request_id'] = request.request_id
        elif has_app_context() and 'request_id' in g:
            kwargs['request_id'] = g.request_id

        return super().send_task(name, args, kwargs, **other_kwargs)