Merge pull request #3366 from alphagov/celery-extend-request-id-180213914

Extend request tracing to cover Celery logs
Ben Thorner authored on 2021-11-12 11:10:38 +00:00; committed by GitHub.

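For context: Celery delivers custom headers passed at enqueue time as attributes of the task's request context, which is what the new `request_id` property in the diff below reads back via `self.request.get('notify_request_id')`. A minimal sketch of the sending side, assuming a hypothetical `enqueue_with_request_id` helper (not part of this commit):

    # Illustrative only: forward the current Flask request id to a Celery
    # task as a message header. Task-side, the commit below reads it back
    # with self.request.get('notify_request_id'), since each header becomes
    # a direct attribute of the task context.
    from flask import g


    def enqueue_with_request_id(task, *args, **kwargs):
        task.apply_async(
            args=args,
            kwargs=kwargs,
            headers={'notify_request_id': getattr(g, 'request_id', None)},
        )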

@@ -1,4 +1,5 @@
 import time
+from contextlib import contextmanager
 
 from celery import Celery, Task
 from celery.signals import worker_process_shutdown
@@ -24,56 +25,69 @@ def make_task(app):
         start = None
         typing = False
 
+        @property
+        def queue_name(self):
+            delivery_info = self.request.delivery_info or {}
+            return delivery_info.get('routing_key', 'none')
+
+        @property
+        def request_id(self):
+            # Note that each header is a direct attribute of the
+            # task context (aka "request").
+            return self.request.get('notify_request_id')
+
+        @contextmanager
+        def app_context(self):
+            with app.app_context():
+                # Add 'request_id' to 'g' so that it gets logged.
+                g.request_id = self.request_id
+                yield
+
         def on_success(self, retval, task_id, args, kwargs):
-            elapsed_time = time.monotonic() - self.start
-            delivery_info = self.request.delivery_info or {}
-            queue_name = delivery_info.get('routing_key', 'none')
+            # enables request id tracing for these logs
+            with self.app_context():
+                elapsed_time = time.monotonic() - self.start
 
-            app.logger.info(
-                "Celery task {task_name} (queue: {queue_name}) took {time}".format(
-                    task_name=self.name,
-                    queue_name=queue_name,
-                    time="{0:.4f}".format(elapsed_time)
+                app.logger.info(
+                    "Celery task {task_name} (queue: {queue_name}) took {time}".format(
+                        task_name=self.name,
+                        queue_name=self.queue_name,
+                        time="{0:.4f}".format(elapsed_time)
+                    )
                 )
-            )
 
-            app.statsd_client.timing(
-                "celery.{queue_name}.{task_name}.success".format(
-                    task_name=self.name,
-                    queue_name=queue_name
-                ), elapsed_time
-            )
+                app.statsd_client.timing(
+                    "celery.{queue_name}.{task_name}.success".format(
+                        task_name=self.name,
+                        queue_name=self.queue_name
+                    ), elapsed_time
+                )
 
         def on_failure(self, exc, task_id, args, kwargs, einfo):
-            delivery_info = self.request.delivery_info or {}
-            queue_name = delivery_info.get('routing_key', 'none')
-
-            app.logger.exception(
-                "Celery task {task_name} (queue: {queue_name}) failed".format(
-                    task_name=self.name,
-                    queue_name=queue_name,
+            # enables request id tracing for these logs
+            with self.app_context():
+                app.logger.exception(
+                    "Celery task {task_name} (queue: {queue_name}) failed".format(
+                        task_name=self.name,
+                        queue_name=self.queue_name,
+                    )
                 )
-            )
 
-            app.statsd_client.incr(
-                "celery.{queue_name}.{task_name}.failure".format(
-                    task_name=self.name,
-                    queue_name=queue_name
+                app.statsd_client.incr(
+                    "celery.{queue_name}.{task_name}.failure".format(
+                        task_name=self.name,
+                        queue_name=self.queue_name
+                    )
                 )
-            )
 
             super().on_failure(exc, task_id, args, kwargs, einfo)
 
         def __call__(self, *args, **kwargs):
             # ensure task has flask context to access config, logger, etc
-            with app.app_context():
+            with self.app_context():
                 self.start = time.monotonic()
 
                 # TEMPORARY: remove old piggyback values from kwargs
                 kwargs.pop('request_id', None)
 
-                # Add 'request_id' to 'g' so that it gets logged. Note
-                # that each header is a direct attribute of the task
-                # context (aka "request").
-                g.request_id = self.request.get('notify_request_id')
-
                 return super().__call__(*args, **kwargs)
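The change assumes the application's log output picks `request_id` up from `g`; the actual wiring lives in the app's shared logging setup, but a minimal sketch of such a filter might look like this (the `RequestIdFilter` name and the format string are illustrative, not part of this commit):

    # Illustrative only: stamp each log record with the request id stored
    # on flask.g, so that "g.request_id = ..." in the commit above is
    # enough to make the id appear in Celery task log lines.
    import logging

    from flask import g, has_app_context


    class RequestIdFilter(logging.Filter):
        def filter(self, record):
            # Outside an app context there is no g, so fall back to None.
            record.request_id = (
                getattr(g, 'request_id', None) if has_app_context() else None
            )
            return True  # always keep the record; we only annotate it


    handler = logging.StreamHandler()
    handler.addFilter(RequestIdFilter())
    handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(request_id)s %(message)s')
    )

Because `on_success` and `on_failure` now run inside `self.app_context()`, a filter like this sees the same `g.request_id` for the task's own logs and for the timing/failure messages emitted after it finishes.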