Files
notifications-api/app/celery/celery.py
Ben Thorner 3ecbdbb260 Temporarily disable task argument checking
This was added in Celery 4 [1] and appears to be incompatible with
our approach of injecting "request_id" into task arguments (example
exception below). Although our other apps are on Celery 5, our logs
don't show any similar issues, probably because all their tasks are
invoked without request IDs. In the long term we should decide whether
we want to enable argument checking and fix the tracing approach, or
stop tracing request IDs in Celery tasks.

[1]: https://docs.celeryproject.org/en/stable/userguide/tasks.html#argument-checking

    2021-11-01T11:37:36 delivery delivery ERROR None "RETRY: Email notification f69a9305-686f-42eb-a2ee-61bc2ba1f5f3 failed" [in /Users/benthorner/Documents/Projects/api/app/celery/provider_tasks.py:68]
    Traceback (most recent call last):
      File "/Users/benthorner/Documents/Projects/api/app/celery/provider_tasks.py", line 53, in deliver_email
        raise TypeError("test retry")
    TypeError: test retry
    [2021-11-01 11:37:36,385: ERROR/ForkPoolWorker-1] RETRY: Email notification f69a9305-686f-42eb-a2ee-61bc2ba1f5f3 failed
    Traceback (most recent call last):
      File "/Users/benthorner/Documents/Projects/api/app/celery/provider_tasks.py", line 53, in deliver_email
        raise TypeError("test retry")
    TypeError: test retry
    [2021-11-01 11:37:36,394: WARNING/ForkPoolWorker-1] Task deliver_email[449cd221-173c-4e18-83ac-229e88c029a5] reject requeue=False: deliver_email() got an unexpected keyword argument 'request_id'
    Traceback (most recent call last):
      File "/Users/benthorner/Documents/Projects/api/app/celery/provider_tasks.py", line 53, in deliver_email
        raise TypeError("test retry")
    TypeError: test retry

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/task.py", line 731, in retry
        S.apply_async()
      File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/canvas.py", line 219, in apply_async
        return _apply(args, kwargs, **options)
      File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/task.py", line 537, in apply_async
        check_arguments(*(args or ()), **(kwargs or {}))
    TypeError: deliver_email() got an unexpected keyword argument 'request_id'

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/trace.py", line 450, in trace_task
        R = retval = fun(*args, **kwargs)
      File "/Users/benthorner/Documents/Projects/api/app/celery/celery.py", line 74, in __call__
        return super().__call__(*args, **kwargs)
      File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/trace.py", line 731, in __protected_call__
        return self.run(*args, **kwargs)
      File "/Users/benthorner/Documents/Projects/api/app/celery/provider_tasks.py", line 71, in deliver_email
        self.retry(queue=QueueNames.RETRY)
      File "/Users/benthorner/.pyenv/versions/notifications-api/lib/python3.6/site-packages/celery/app/task.py", line 733, in retry
        raise Reject(exc, requeue=False)
    celery.exceptions.Reject: (TypeError("deliver_email() got an unexpected keyword argument 'request_id'",), False)
2021-11-01 11:39:57 +00:00

101 lines
3.5 KiB
Python

import time
from celery import Celery, Task
from celery.signals import worker_process_shutdown
from flask import g, request
from flask.ctx import has_app_context, has_request_context
@worker_process_shutdown.connect
def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
    """Log the PID and exit code of a Celery worker process that is shutting down.

    Connected to Celery's ``worker_process_shutdown`` signal.
    """
    # Deferred import: importing ``app`` at module level would be circular.
    from app import notify_celery

    # After the worker has restarted at least once there is no app context
    # left, so ``current_app`` can't be used to build one. Push a fresh
    # context from the original Flask app kept on the Celery instance.
    flask_app = notify_celery._app
    with flask_app.app_context():
        message = 'worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)
        flask_app.logger.info(message)
def make_task(app):
    """Build the base Celery task class bound to the Flask application *app*.

    Tasks using the returned class run inside ``app``'s application context.
    Successful runs are logged and timed; failed runs are logged and counted.
    Both go through ``app.logger`` and ``app.statsd_client``.

    :param app: the Flask application; it must also expose ``statsd_client``.
    :return: a ``Task`` subclass suitable for use as Celery's ``task_cls``.
    """

    def _queue_name(request):
        # Routing key of the message being processed, or 'none' when Celery
        # supplies no delivery info for it.
        delivery_info = request.delivery_info or {}
        return delivery_info.get('routing_key', 'none')

    class NotifyTask(Task):
        abstract = True
        # Monotonic timestamp set in __call__ and read by on_success to work
        # out how long the task ran.
        start = None
        # Temporarily disable Celery's task argument checking (added in
        # Celery 4). When a task retries, its kwargs still carry the injected
        # 'request_id'. The task function does not declare that argument, so
        # the check would reject the retry with a TypeError.
        # TODO: either fix the request_id tracing approach and re-enable the
        # check, or stop tracing request IDs in Celery tasks.
        typing = False

        def on_success(self, retval, task_id, args, kwargs):
            """Log how long the task took and record it as a statsd timing."""
            elapsed_time = time.monotonic() - self.start
            queue_name = _queue_name(self.request)

            app.logger.info(
                "Celery task {task_name} (queue: {queue_name}) took {time}".format(
                    task_name=self.name,
                    queue_name=queue_name,
                    time="{0:.4f}".format(elapsed_time)
                )
            )
            app.statsd_client.timing(
                "celery.{queue_name}.{task_name}.success".format(
                    task_name=self.name,
                    queue_name=queue_name
                ), elapsed_time
            )

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            """Log the failure and increment a statsd counter, then defer to Celery."""
            queue_name = _queue_name(self.request)

            app.logger.exception(
                "Celery task {task_name} (queue: {queue_name}) failed".format(
                    task_name=self.name,
                    queue_name=queue_name,
                )
            )
            app.statsd_client.incr(
                "celery.{queue_name}.{task_name}.failure".format(
                    task_name=self.name,
                    queue_name=queue_name
                )
            )
            super().on_failure(exc, task_id, args, kwargs, einfo)

        def __call__(self, *args, **kwargs):
            # Run the task inside the Flask app context so it can use config,
            # the logger, etc.
            with app.app_context():
                self.start = time.monotonic()
                # Remove the piggybacked 'request_id' from the task's kwargs
                # and put it on 'g' so that it gets logged.
                g.request_id = kwargs.pop('request_id', None)
                return super().__call__(*args, **kwargs)

    return NotifyTask
class NotifyCelery(Celery):
    """Celery application configured from, and bound to, a Flask app."""

    def init_app(self, app):
        """(Re)initialise this Celery instance from *app*'s ``CELERY`` config.

        Tasks use the base class built by ``make_task(app)``. A reference to
        ``app`` is kept on ``self._app``.
        """
        super().__init__(
            app.import_name,
            broker=app.config['CELERY']['broker_url'],
            task_cls=make_task(app),
        )
        self.conf.update(app.config['CELERY'])
        self._app = app

    def send_task(self, name, args=None, kwargs=None, **other_kwargs):
        """Send task *name*, piggybacking the current request ID onto its kwargs.

        The request ID is taken from the active Flask request if it has one.
        Otherwise it comes from ``g`` in the current app context, if set
        there. The caller's ``kwargs`` mapping is never modified.
        """
        # Copy first: previously the caller's dict was mutated in place by
        # the injected 'request_id'.
        kwargs = dict(kwargs or {})
        if has_request_context() and hasattr(request, 'request_id'):
            kwargs['request_id'] = request.request_id
        elif has_app_context() and 'request_id' in g:
            kwargs['request_id'] = g.request_id
        return super().send_task(name, args, kwargs, **other_kwargs)