diff --git a/app/celery/celery.py b/app/celery/celery.py index 1173f93a0..d60f1903f 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -1,4 +1,5 @@ import time +from contextlib import contextmanager from celery import Celery, Task from celery.signals import worker_process_shutdown @@ -35,49 +36,58 @@ def make_task(app): # task context (aka "request"). return self.request.get('notify_request_id') + @contextmanager + def app_context(self): + with app.app_context(): + # Add 'request_id' to 'g' so that it gets logged. + g.request_id = self.request_id + yield + def on_success(self, retval, task_id, args, kwargs): - elapsed_time = time.monotonic() - self.start + # enables request id tracing for these logs + with self.app_context(): + elapsed_time = time.monotonic() - self.start - app.logger.info( - "Celery task {task_name} (queue: {queue_name}) took {time}".format( - task_name=self.name, - queue_name=self.queue_name, - time="{0:.4f}".format(elapsed_time) + app.logger.info( + "Celery task {task_name} (queue: {queue_name}) took {time}".format( + task_name=self.name, + queue_name=self.queue_name, + time="{0:.4f}".format(elapsed_time) + ) ) - ) - app.statsd_client.timing( - "celery.{queue_name}.{task_name}.success".format( - task_name=self.name, - queue_name=self.queue_name - ), elapsed_time - ) + app.statsd_client.timing( + "celery.{queue_name}.{task_name}.success".format( + task_name=self.name, + queue_name=self.queue_name + ), elapsed_time + ) def on_failure(self, exc, task_id, args, kwargs, einfo): - app.logger.exception( - "Celery task {task_name} (queue: {queue_name}) failed".format( - task_name=self.name, - queue_name=self.queue_name, + # enables request id tracing for these logs + with self.app_context(): + app.logger.exception( + "Celery task {task_name} (queue: {queue_name}) failed".format( + task_name=self.name, + queue_name=self.queue_name, + ) ) - ) - app.statsd_client.incr( - "celery.{queue_name}.{task_name}.failure".format( - task_name=self.name, - queue_name=self.queue_name + app.statsd_client.incr( + "celery.{queue_name}.{task_name}.failure".format( + task_name=self.name, + queue_name=self.queue_name + ) ) - ) - super().on_failure(exc, task_id, args, kwargs, einfo) + super().on_failure(exc, task_id, args, kwargs, einfo) def __call__(self, *args, **kwargs): # ensure task has flask context to access config, logger, etc - with app.app_context(): + with self.app_context(): self.start = time.monotonic() # TEMPORARY: remove old piggyback values from kwargs kwargs.pop('request_id', None) - # Add 'request_id' to 'g' so that it gets logged. Note - g.request_id = self.request_id return super().__call__(*args, **kwargs)