diff --git a/app/__init__.py b/app/__init__.py index 57cd1c2ef..3b0fd05af 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -110,7 +110,7 @@ def create_app(application): email_clients = [aws_ses_stub_client] if application.config['SES_STUB_URL'] else [aws_ses_client] notification_provider_clients.init_app(sms_clients=[firetext_client, mmg_client], email_clients=email_clients) - notify_celery.init_app(application) + notify_celery.init_app(application, statsd_client) encryption.init_app(application) redis_store.init_app(application) document_download_client.init_app(application) diff --git a/app/celery/celery.py b/app/celery/celery.py index 75d9fdc0d..ecc7486b8 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -18,7 +18,7 @@ def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs): notify_celery._app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)) -def make_task(app): +def make_task(app, statsd_client): class NotifyTask(Task): abstract = True start = None @@ -36,6 +36,13 @@ def make_task(app): ) ) + statsd_client.timing( + "celery.{queue_name}.{task_name}.success".format( + task_name=self.name, + queue_name=queue_name + ), elapsed_time + ) + def on_failure(self, exc, task_id, args, kwargs, einfo): delivery_info = self.request.delivery_info or {} queue_name = delivery_info.get('routing_key', 'none') @@ -47,6 +54,13 @@ def make_task(app): ) ) + statsd_client.incr( + "celery.{queue_name}.{task_name}.failure".format( + task_name=self.name, + queue_name=queue_name + ) + ) + super().on_failure(exc, task_id, args, kwargs, einfo) def __call__(self, *args, **kwargs): @@ -75,11 +89,11 @@ def make_task(app): class NotifyCelery(Celery): - def init_app(self, app): + def init_app(self, app, statsd_client): super().__init__( app.import_name, broker=app.config['BROKER_URL'], - task_cls=make_task(app), + task_cls=make_task(app, statsd_client), ) self.conf.update(app.config)