Duplicate metrics to StatsD for Celery tasks

Previously we used a '@statsd' decorator to time and count Celery
tasks [1]. Using a decorator isn't ideal since we need to remember
to add it to every task we define. In addition, it's not possible
to use data like the task name and queue.

In order to avoid breaking existing stats, this duplicates them as
new StatsD metrics until we have sufficient data to update dashboards
using the old ones. Using the CeleryTask superclass to send metrics
avoids a future maintenance overhead, and means we can include more
useful data in the StatsD metric. Note that the new metrics will sit
in StatsD until we add a mapping for them [2].

StatsD automatically produces a 'count' stat for timing metrics, so
we don't need to increment a separate counter for successful tasks.

[1]: dea5828d0e/app/celery/tasks.py (L65)
[2]: https://github.com/alphagov/notifications-aws/blob/master/paas/statsd/statsd-mapping.yml
This commit is contained in:
Ben Thorner
2021-04-07 14:32:24 +01:00
parent 248f5a0708
commit ab8dd6d52c
2 changed files with 18 additions and 4 deletions

View File

@@ -18,7 +18,7 @@ def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
notify_celery._app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))
def make_task(app):
def make_task(app, statsd_client):
class NotifyTask(Task):
abstract = True
start = None
@@ -36,6 +36,13 @@ def make_task(app):
)
)
statsd_client.timing(
"celery.{queue_name}.{task_name}.success".format(
task_name=self.name,
queue_name=queue_name
), elapsed_time
)
def on_failure(self, exc, task_id, args, kwargs, einfo):
delivery_info = self.request.delivery_info or {}
queue_name = delivery_info.get('routing_key', 'none')
@@ -47,6 +54,13 @@ def make_task(app):
)
)
statsd_client.incr(
"celery.{queue_name}.{task_name}.failure".format(
task_name=self.name,
queue_name=queue_name
)
)
super().on_failure(exc, task_id, args, kwargs, einfo)
def __call__(self, *args, **kwargs):
@@ -75,11 +89,11 @@ def make_task(app):
class NotifyCelery(Celery):
def init_app(self, app):
def init_app(self, app, statsd_client):
super().__init__(
app.import_name,
broker=app.config['BROKER_URL'],
task_cls=make_task(app),
task_cls=make_task(app, statsd_client),
)
self.conf.update(app.config)