Files
notifications-api/app/celery/celery.py
Ben Thorner ab8dd6d52c Duplicate metrics to StatsD for Celery tasks
Previously we used a '@statsd' decorator to time and count Celery
tasks [1]. Using a decorator isn't ideal since we need to remember
to add it to every task we define. In addition, it's not possible
to use data like the task name and queue.

In order to avoid breaking existing stats, this duplicates them as
new StatsD metrics until we have sufficient data to update dashboards
using the old ones. Using the CeleryTask superclass to send metrics
avoids a future maintenance overhead, and means we can include more
useful data in the StatsD metric. Note that the new metrics will sit
in StatsD until we add a mapping for them [2].

StatsD automatically produces a 'count' stat for timing metrics, so
we don't need to increment a separate counter for successful tasks.

[1]: dea5828d0e/app/celery/tasks.py (L65)
[2]: https://github.com/alphagov/notifications-aws/blob/master/paas/statsd/statsd-mapping.yml
2021-04-08 18:02:53 +01:00

101 lines
3.6 KiB
Python

import time
from celery import Celery, Task
from celery.signals import worker_process_shutdown
from flask import g, request
from flask.ctx import has_app_context, has_request_context
@worker_process_shutdown.connect
def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
    """Log the PID and exit code of a Celery worker process that is shutting down.

    Connected to the ``worker_process_shutdown`` signal. Only ``pid`` and
    ``exitcode`` are used; the remaining signal arguments are accepted and
    ignored.
    """
    # Deferred import: pulling in `app` at module load would be circular.
    from app import notify_celery

    flask_app = notify_celery._app

    # Once the worker has restarted at least once there is no ambient app
    # context any more and `current_app` will not work, so build a fresh
    # context from the original Flask app and log through that.
    with flask_app.app_context():
        message = 'worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)
        flask_app.logger.info(message)
def make_task(app, statsd_client):
    """Build the base Celery task class bound to a Flask app and a StatsD client.

    Args:
        app: Flask application. Provides the app context each task runs in
            and the logger used for task success/failure messages.
        statsd_client: object exposing ``timing(name, value)`` and
            ``incr(name)``; receives per-queue, per-task metrics.

    Returns:
        The ``NotifyTask`` class (an abstract ``Task`` subclass).
    """

    def _queue_name(task):
        # Routing key of the message that delivered the current task, or
        # 'none' when the request carries no delivery info.
        delivery_info = task.request.delivery_info or {}
        return delivery_info.get('routing_key', 'none')

    class NotifyTask(Task):
        abstract = True
        # time.monotonic() reading taken in __call__, read back in on_success.
        start = None

        def on_success(self, retval, task_id, args, kwargs):
            """Log how long the task took and send it as a StatsD timing."""
            # Seconds elapsed since __call__ recorded the start time.
            elapsed_time = time.monotonic() - self.start
            queue_name = _queue_name(self)

            app.logger.info(
                "Celery task {task_name} (queue: {queue_name}) took {time}".format(
                    task_name=self.name,
                    queue_name=queue_name,
                    time="{0:.4f}".format(elapsed_time)
                )
            )

            statsd_client.timing(
                "celery.{queue_name}.{task_name}.success".format(
                    task_name=self.name,
                    queue_name=queue_name
                ), elapsed_time
            )

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            """Log the failure, bump the StatsD failure counter, then defer to Task."""
            queue_name = _queue_name(self)

            app.logger.exception(
                "Celery task {task_name} (queue: {queue_name}) failed".format(
                    task_name=self.name,
                    queue_name=queue_name,
                )
            )

            statsd_client.incr(
                "celery.{queue_name}.{task_name}.failure".format(
                    task_name=self.name,
                    queue_name=queue_name
                )
            )

            super().on_failure(exc, task_id, args, kwargs, einfo)

        def __call__(self, *args, **kwargs):
            """Run the task inside a Flask app context and record its start time."""
            # ensure task has flask context to access config, logger, etc
            with app.app_context():
                self.start = time.monotonic()
                # 'request_id' is a piggyback value added by apply_async, not a
                # real task argument: strip it from kwargs and put it on 'g'
                # so that it gets logged.
                g.request_id = kwargs.pop('request_id', None)
                return super().__call__(*args, **kwargs)

        def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                        link=None, link_error=None, **options):
            """Queue the task, piggybacking the current request id in its kwargs.

            The request id is taken from the Flask request when one is active
            and has a ``request_id`` attribute, otherwise from ``g`` when an
            app context is active and ``g`` holds one. If neither applies the
            kwargs are passed through unchanged.
            """
            # Work on a copy so the caller's dict is never mutated when the
            # piggyback 'request_id' key is added below.
            kwargs = dict(kwargs) if kwargs else {}

            if has_request_context() and hasattr(request, 'request_id'):
                kwargs['request_id'] = request.request_id
            elif has_app_context() and 'request_id' in g:
                kwargs['request_id'] = g.request_id

            return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)

    return NotifyTask
class NotifyCelery(Celery):
    """Celery application that is configured from a Flask app via ``init_app``."""

    def init_app(self, app, statsd_client):
        """Initialise this Celery app from ``app`` and remember the Flask app.

        Args:
            app: Flask application; supplies the import name, the
                ``BROKER_URL`` setting and the rest of the Celery config.
            statsd_client: passed through to ``make_task`` for task metrics.
        """
        main_name = app.import_name
        broker_url = app.config['BROKER_URL']
        base_task_cls = make_task(app, statsd_client)

        super().__init__(main_name, broker=broker_url, task_cls=base_task_cls)

        # Copy the whole Flask config into the Celery configuration.
        self.conf.update(app.config)
        # Keep the Flask app reachable for code that needs to open an app context.
        self._app = app