From ab8dd6d52cc2f71d60dfb3f5fdbc883022d96c32 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Wed, 7 Apr 2021 14:32:24 +0100 Subject: [PATCH] Duplicate metrics to StatsD for Celery tasks Previously we used a '@statsd' decorator to time and count Celery tasks [1]. Using a decorator isn't ideal since we need to remember to add it to every task we define. In addition, it's not possible to use data like the task name and queue. In order to avoid breaking existing stats, this duplicates them as new StatsD metrics until we have sufficient data to update dashboards using the old ones. Using the CeleryTask superclass to send metrics avoids a future maintenance overhead, and means we can include more useful data in the StatsD metric. Note that the new metrics will sit in StatsD until we add a mapping for them [2]. StatsD automatically produces a 'count' stat for timing metrics, so we don't need to increment a separate counter for successful tasks. [1]: https://github.com/alphagov/notifications-api/blob/dea5828d0e708bc69916a98e2bff989a56112204/app/celery/tasks.py#L65 [2]: https://github.com/alphagov/notifications-aws/blob/master/paas/statsd/statsd-mapping.yml --- app/__init__.py | 2 +- app/celery/celery.py | 20 +++++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 57cd1c2ef..3b0fd05af 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -110,7 +110,7 @@ def create_app(application): email_clients = [aws_ses_stub_client] if application.config['SES_STUB_URL'] else [aws_ses_client] notification_provider_clients.init_app(sms_clients=[firetext_client, mmg_client], email_clients=email_clients) - notify_celery.init_app(application) + notify_celery.init_app(application, statsd_client) encryption.init_app(application) redis_store.init_app(application) document_download_client.init_app(application) diff --git a/app/celery/celery.py b/app/celery/celery.py index 75d9fdc0d..ecc7486b8 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -18,7 +18,7 @@ def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs): notify_celery._app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)) -def make_task(app): +def make_task(app, statsd_client): class NotifyTask(Task): abstract = True start = None @@ -36,6 +36,13 @@ def make_task(app): ) ) + statsd_client.timing( + "celery.{queue_name}.{task_name}.success".format( + task_name=self.name, + queue_name=queue_name + ), elapsed_time + ) + def on_failure(self, exc, task_id, args, kwargs, einfo): delivery_info = self.request.delivery_info or {} queue_name = delivery_info.get('routing_key', 'none') @@ -47,6 +54,13 @@ def make_task(app): ) ) + statsd_client.incr( + "celery.{queue_name}.{task_name}.failure".format( + task_name=self.name, + queue_name=queue_name + ) + ) + super().on_failure(exc, task_id, args, kwargs, einfo) def __call__(self, *args, **kwargs): @@ -75,11 +89,11 @@ def make_task(app): class NotifyCelery(Celery): - def init_app(self, app): + def init_app(self, app, statsd_client): super().__init__( app.import_name, broker=app.config['BROKER_URL'], - task_cls=make_task(app), + task_cls=make_task(app, statsd_client), ) self.conf.update(app.config)