From 054205835b0b46d7cbc47b2ec3cb68ec5fdd3fbe Mon Sep 17 00:00:00 2001
From: Ben Thorner
Date: Wed, 7 Apr 2021 13:54:04 +0100
Subject: [PATCH 1/7] Remove unused metric for SQS apply duration

This was added as part of a wider performance investigation [1]. I
checked with Leo, who made the change, and while the other metrics are
still useful, there's no reason to keep this one.

[1]: https://github.com/alphagov/notifications-api/commit/6e32ca59963b53a8a404b9362a56fc689d6fd4c2#diff-76936416943346b5f691dac57a64acebc6a1227293820d1d9af4791087c9fb9eR23
---
 app/celery/celery.py | 10 +---------
 1 file changed, 1 insertion(+), 9 deletions(-)

diff --git a/app/celery/celery.py b/app/celery/celery.py
index 6018319c8..9729391a3 100644
--- a/app/celery/celery.py
+++ b/app/celery/celery.py
@@ -4,7 +4,6 @@ from celery import Celery, Task
 from celery.signals import worker_process_shutdown
 from flask import g, request
 from flask.ctx import has_app_context, has_request_context
-from gds_metrics.metrics import Histogram
 
 
 @worker_process_shutdown.connect
@@ -20,12 +19,6 @@ def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
 
 
 def make_task(app):
-    SQS_APPLY_ASYNC_DURATION_SECONDS = Histogram(
-        'sqs_apply_async_duration_seconds',
-        'Time taken to put task on queue',
-        ['task_name']
-    )
-
     class NotifyTask(Task):
         abstract = True
         start = None
@@ -60,8 +53,7 @@ def make_task(app):
             elif has_app_context() and 'request_id' in g:
                 kwargs['request_id'] = g.request_id
 
-            with SQS_APPLY_ASYNC_DURATION_SECONDS.labels(self.name).time():
-                return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
+            return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
 
     return NotifyTask
 

From 19be4faf4517f17100fb1c58ca318cbbb9fc6b65 Mon Sep 17 00:00:00 2001
From: Ben Thorner
Date: Wed, 7 Apr 2021 13:56:10 +0100
Subject: [PATCH 2/7] Switch to monotonic time for task logs

This matches the approach we take in utils [1]. Monotonic time is
better because it avoids weird negative results due to clock shift.

[1]: https://github.com/alphagov/notifications-utils/blob/5d18ebd79623e7d16bd48fbfe78ae632c0c0aaf1/notifications_utils/statsd_decorators.py#L14
---
 app/celery/celery.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/app/celery/celery.py b/app/celery/celery.py
index 9729391a3..7a1eceb45 100644
--- a/app/celery/celery.py
+++ b/app/celery/celery.py
@@ -24,7 +24,7 @@ def make_task(app):
         start = None
 
         def on_success(self, retval, task_id, args, kwargs):
-            elapsed_time = time.time() - self.start
+            elapsed_time = time.monotonic() - self.start
             app.logger.info(
                 "{task_name} took {time}".format(
                     task_name=self.name, time="{0:.4f}".format(elapsed_time)
@@ -39,7 +39,7 @@ def make_task(app):
         def __call__(self, *args, **kwargs):
             # ensure task has flask context to access config, logger, etc
             with app.app_context():
-                self.start = time.time()
+                self.start = time.monotonic()
                 # Remove 'request_id' from the kwargs (so the task doesn't get an unexpected kwarg), then add it to g
                 # so that it gets logged
                 g.request_id = kwargs.pop('request_id', None)

From 248f5a07082e8ca620062bd59b08334bbbe7b74b Mon Sep 17 00:00:00 2001
From: Ben Thorner
Date: Wed, 7 Apr 2021 14:10:39 +0100
Subject: [PATCH 3/7] Include queue name in Celery task logs

This is mainly so we can use it in the new metrics we send to StatsD
in the following commits, but it should also be useful in the logs.

I've taken the opportunity to make the log format consistent between
success / failure, and with our Template Preview app [1].

[1]: https://github.com/alphagov/notifications-template-preview/blob/f456433a5ae7b41661ee014b6589bb31cc2e2c66/app/celery/celery.py#L19
---
 app/celery/celery.py | 27 +++++++++++++++++++++------
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/app/celery/celery.py b/app/celery/celery.py
index 7a1eceb45..75d9fdc0d 100644
--- a/app/celery/celery.py
+++ b/app/celery/celery.py
@@ -25,29 +25,44 @@ def make_task(app):
 
         def on_success(self, retval, task_id, args, kwargs):
             elapsed_time = time.monotonic() - self.start
+            delivery_info = self.request.delivery_info or {}
+            queue_name = delivery_info.get('routing_key', 'none')
+
             app.logger.info(
-                "{task_name} took {time}".format(
-                    task_name=self.name, time="{0:.4f}".format(elapsed_time)
+                "Celery task {task_name} (queue: {queue_name}) took {time}".format(
+                    task_name=self.name,
+                    queue_name=queue_name,
+                    time="{0:.4f}".format(elapsed_time)
                 )
             )
 
         def on_failure(self, exc, task_id, args, kwargs, einfo):
-            # ensure task will log exceptions to correct handlers
-            app.logger.exception('Celery task: {} failed'.format(self.name))
+            delivery_info = self.request.delivery_info or {}
+            queue_name = delivery_info.get('routing_key', 'none')
+
+            app.logger.exception(
+                "Celery task {task_name} (queue: {queue_name}) failed".format(
+                    task_name=self.name,
+                    queue_name=queue_name,
+                )
+            )
+
             super().on_failure(exc, task_id, args, kwargs, einfo)
 
         def __call__(self, *args, **kwargs):
             # ensure task has flask context to access config, logger, etc
             with app.app_context():
                 self.start = time.monotonic()
-                # Remove 'request_id' from the kwargs (so the task doesn't get an unexpected kwarg), then add it to g
-                # so that it gets logged
+                # Remove piggyback values from kwargs
+                # Add 'request_id' to 'g' so that it gets logged
                 g.request_id = kwargs.pop('request_id', None)
+
                 return super().__call__(*args, **kwargs)
 
         def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None,
                         link_error=None, **options):
             kwargs = kwargs or {}
+
             if has_request_context() and hasattr(request, 'request_id'):
                 kwargs['request_id'] = request.request_id
             elif has_app_context() and 'request_id' in g:

From ab8dd6d52cc2f71d60dfb3f5fdbc883022d96c32 Mon Sep 17 00:00:00 2001
From: Ben Thorner
Date: Wed, 7 Apr 2021 14:32:24 +0100
Subject: [PATCH 4/7] Duplicate metrics to StatsD for Celery tasks

Previously we used a '@statsd' decorator to time and count Celery
tasks [1]. Using a decorator isn't ideal since we need to remember to
add it to every task we define. In addition, it's not possible to
include data like the task name and queue in the metrics.

In order to avoid breaking existing stats, this duplicates them as new
StatsD metrics until we have sufficient data to update dashboards
using the old ones. Using the NotifyTask superclass to send metrics
avoids a future maintenance overhead, and means we can include more
useful data in the StatsD metric.

Note that the new metrics will sit in StatsD until we add a mapping
for them [2]. StatsD automatically produces a 'count' stat for timing
metrics, so we don't need to increment a separate counter for
successful tasks.

[1]: https://github.com/alphagov/notifications-api/blob/dea5828d0e708bc69916a98e2bff989a56112204/app/celery/tasks.py#L65
[2]: https://github.com/alphagov/notifications-aws/blob/master/paas/statsd/statsd-mapping.yml
---
 app/__init__.py      |  2 +-
 app/celery/celery.py | 20 +++++++++++++++++---
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/app/__init__.py b/app/__init__.py
index 57cd1c2ef..3b0fd05af 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -110,7 +110,7 @@ def create_app(application):
     email_clients = [aws_ses_stub_client] if application.config['SES_STUB_URL'] else [aws_ses_client]
     notification_provider_clients.init_app(sms_clients=[firetext_client, mmg_client], email_clients=email_clients)
 
-    notify_celery.init_app(application)
+    notify_celery.init_app(application, statsd_client)
     encryption.init_app(application)
     redis_store.init_app(application)
     document_download_client.init_app(application)

diff --git a/app/celery/celery.py b/app/celery/celery.py
index 75d9fdc0d..ecc7486b8 100644
--- a/app/celery/celery.py
+++ b/app/celery/celery.py
@@ -18,7 +18,7 @@ def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
     notify_celery._app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))
 
 
-def make_task(app):
+def make_task(app, statsd_client):
     class NotifyTask(Task):
         abstract = True
         start = None
@@ -36,6 +36,13 @@ def make_task(app, statsd_client):
                 )
             )
 
+            statsd_client.timing(
+                "celery.{queue_name}.{task_name}.success".format(
+                    task_name=self.name,
+                    queue_name=queue_name
+                ), elapsed_time
+            )
+
         def on_failure(self, exc, task_id, args, kwargs, einfo):
             delivery_info = self.request.delivery_info or {}
             queue_name = delivery_info.get('routing_key', 'none')
@@ -47,6 +54,13 @@ def make_task(app, statsd_client):
                 )
             )
 
+            statsd_client.incr(
+                "celery.{queue_name}.{task_name}.failure".format(
+                    task_name=self.name,
+                    queue_name=queue_name
+                )
+            )
+
             super().on_failure(exc, task_id, args, kwargs, einfo)
 
         def __call__(self, *args, **kwargs):
@@ -75,11 +89,11 @@ def make_task(app, statsd_client):
 
 
 class NotifyCelery(Celery):
-    def init_app(self, app):
+    def init_app(self, app, statsd_client):
         super().__init__(
             app.import_name,
             broker=app.config['BROKER_URL'],
-            task_cls=make_task(app),
+            task_cls=make_task(app, statsd_client),
         )
 
         self.conf.update(app.config)

From 8954cec5a18c9ff57e02711ca4f4cc5f472d3fc7 Mon Sep 17 00:00:00 2001
From: Ben Thorner
Date: Thu, 8 Apr 2021 10:01:51 +0100
Subject: [PATCH 5/7] Add tests for celery task superclass

This requires upgrading freezegun, as time.monotonic wasn't frozen by
v1.0.

Note that we need to explicitly specify the base class for the task in
the test, the reason for which is quite subtle:

- Normally, by using the 'notify_api' fixture, the base class is set
  to NotifyTask automatically by running app.create_app [1].

- However, when run alongside other tests, the imports of files with
  other celery tasks cause the base class to be instantiated and
  cached as the default Celery one. This means none of our tests
  actually use our custom superclass when testing tasks.

Because we can't run 'apply_async' directly (since this would require
an actual Celery broker), we need to manually push/pop the request
context, which is normally done as part of sending a task.

Note also that we use a UUID as the name for the task, since task
names are global. We want to avoid the task polluting other tests in
future, as well as to make it clear the task is being reused.

[1]: https://github.com/alphagov/notifications-api/blob/dea5828d0e708bc69916a98e2bff989a56112204/app/__init__.py#L113
---
 requirements_for_test.txt       |  2 +-
 tests/app/celery/test_celery.py | 76 +++++++++++++++++++++++++++++++++
 2 files changed, 77 insertions(+), 1 deletion(-)
 create mode 100644 tests/app/celery/test_celery.py

diff --git a/requirements_for_test.txt b/requirements_for_test.txt
index 3b9beafba..c9862ea0a 100644
--- a/requirements_for_test.txt
+++ b/requirements_for_test.txt
@@ -8,7 +8,7 @@ pytest-env==0.6.2
 pytest-mock==3.3.1
 pytest-cov==2.10.1
 pytest-xdist==2.1.0
-freezegun==1.0.0
+freezegun==1.1.0
 requests-mock==1.8.0
 # used for creating manifest file locally
 jinja2-cli[yaml]==0.7.0

diff --git a/tests/app/celery/test_celery.py b/tests/app/celery/test_celery.py
new file mode 100644
index 000000000..bea1588e6
--- /dev/null
+++ b/tests/app/celery/test_celery.py
@@ -0,0 +1,76 @@
+import uuid
+
+import pytest
+from freezegun import freeze_time
+
+from app import notify_celery
+
+
+@pytest.fixture(scope='session')
+def celery_task():
+    @notify_celery.task(name=uuid.uuid4(), base=notify_celery.task_cls)
+    def test_task(delivery_info=None): pass
+    return test_task
+
+
+@pytest.fixture
+def async_task(celery_task):
+    celery_task.push_request(delivery_info={'routing_key': 'test-queue'})
+    yield celery_task
+    celery_task.pop_request()
+
+
+def test_success_should_log_and_call_statsd(mocker, notify_api, async_task):
+    statsd = mocker.patch.object(notify_api.statsd_client, 'timing')
+    logger = mocker.patch.object(notify_api.logger, 'info')
+
+    with freeze_time() as frozen:
+        async_task()
+        frozen.tick(5)
+
+        async_task.on_success(
+            retval=None, task_id=1234, args=[], kwargs={}
+        )
+
+    statsd.assert_called_once_with(f'celery.test-queue.{async_task.name}.success', 5.0)
+    logger.assert_called_once_with(f'Celery task {async_task.name} (queue: test-queue) took 5.0000')
+
+
+def test_success_queue_when_applied_synchronously(mocker, notify_api, celery_task):
+    statsd = mocker.patch.object(notify_api.statsd_client, 'timing')
+    logger = mocker.patch.object(notify_api.logger, 'info')
+
+    with freeze_time() as frozen:
+        celery_task()
+        frozen.tick(5)
+
+        celery_task.on_success(
+            retval=None, task_id=1234, args=[], kwargs={}
+        )
+
+    statsd.assert_called_once_with(f'celery.none.{celery_task.name}.success', 5.0)
+    logger.assert_called_once_with(f'Celery task {celery_task.name} (queue: none) took 5.0000')
+
+
+def test_failure_should_log_and_call_statsd(mocker, notify_api, async_task):
+    statsd = mocker.patch.object(notify_api.statsd_client, 'incr')
+    logger = mocker.patch.object(notify_api.logger, 'exception')
+
+    async_task.on_failure(
+        exc=Exception, task_id=1234, args=[], kwargs={}, einfo=None
+    )
+
+    statsd.assert_called_once_with(f'celery.test-queue.{async_task.name}.failure')
+    logger.assert_called_once_with(f'Celery task {async_task.name} (queue: test-queue) failed')
+
+
+def test_failure_queue_when_applied_synchronously(mocker, notify_api, celery_task):
+    statsd = mocker.patch.object(notify_api.statsd_client, 'incr')
+    logger = mocker.patch.object(notify_api.logger, 'exception')
+
+    celery_task.on_failure(
+        exc=Exception, task_id=1234, args=[], kwargs={}, einfo=None
+    )
+
+    statsd.assert_called_once_with(f'celery.none.{celery_task.name}.failure')
+    logger.assert_called_once_with(f'Celery task {celery_task.name} (queue: none) failed')

From df6e27d8fd4fcb65bd7ce0f17a10dc055f61146c Mon Sep 17 00:00:00 2001
From: Ben Thorner
Date: Fri, 9 Apr 2021 10:44:36 +0100
Subject: [PATCH 6/7] Add test for extracting request_id in __call__

Tasks will fail if we leave the kwarg in, so I think it's quite
important that we test that this works.

We don't cover this in any other test because we call the task
functions directly, so the request_id kwarg doesn't get injected
beforehand.
---
 tests/app/celery/test_celery.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/tests/app/celery/test_celery.py b/tests/app/celery/test_celery.py
index bea1588e6..b28eb088b 100644
--- a/tests/app/celery/test_celery.py
+++ b/tests/app/celery/test_celery.py
@@ -1,13 +1,15 @@
 import uuid
 
 import pytest
+from flask import g
 from freezegun import freeze_time
 
 from app import notify_celery
 
 
+# requiring notify_api ensures notify_celery.init_app has been called
 @pytest.fixture(scope='session')
-def celery_task():
+def celery_task(notify_api):
     @notify_celery.task(name=uuid.uuid4(), base=notify_celery.task_cls)
     def test_task(delivery_info=None): pass
     return test_task
@@ -74,3 +76,10 @@ def test_failure_queue_when_applied_synchronously(mocker, notify_api, celery_task):
 
     statsd.assert_called_once_with(f'celery.none.{celery_task.name}.failure')
     logger.assert_called_once_with(f'Celery task {celery_task.name} (queue: none) failed')
+
+
+def test_call_exports_request_id_from_kwargs(mocker, celery_task):
+    g = mocker.patch('app.celery.celery.g')
+    # this would fail if the kwarg was passed through unexpectedly
+    celery_task(request_id='1234')
+    assert g.request_id == '1234'

From 37f91e021473365c908c294671b92f6c9af73da2 Mon Sep 17 00:00:00 2001
From: Ben Thorner
Date: Fri, 9 Apr 2021 11:40:58 +0100
Subject: [PATCH 7/7] Add tests for apply_async injecting request_id

---
 tests/app/celery/test_celery.py | 33 +++++++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/tests/app/celery/test_celery.py b/tests/app/celery/test_celery.py
index b28eb088b..0acbd8d23 100644
--- a/tests/app/celery/test_celery.py
+++ b/tests/app/celery/test_celery.py
@@ -83,3 +83,36 @@ def test_call_exports_request_id_from_kwargs(mocker, celery_task):
     # this would fail if the kwarg was passed through unexpectedly
     celery_task(request_id='1234')
     assert g.request_id == '1234'
+
+
+def test_apply_async_injects_global_request_id_into_kwargs(mocker, celery_task):
+    super_apply = mocker.patch('celery.app.task.Task.apply_async')
+    g.request_id = '1234'
+    celery_task.apply_async()
+
+    super_apply.assert_called_with(
+        None,
+        {'request_id': '1234'},
+        None,
+        None,
+        None,
+        None
+    )
+
+
+def test_apply_async_injects_id_into_kwargs_from_request(mocker, notify_api, celery_task):
+    super_apply = mocker.patch('celery.app.task.Task.apply_async')
+    request_id_header = notify_api.config['NOTIFY_TRACE_ID_HEADER']
+    request_headers = {request_id_header: '1234'}
+
+    with notify_api.test_request_context(headers=request_headers):
+        celery_task.apply_async()
+
+    super_apply.assert_called_with(
+        None,
+        {'request_id': '1234'},
+        None,
+        None,
+        None,
+        None
+    )
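
Aside: the combined effect of patches 2-4 on metrics can be summarised
in one short sketch, for readers who want the behaviour in one place.
This is illustrative only: record_task_metrics is a hypothetical helper,
not code from the patches, and the only assumptions made about
statsd_client are the timing(name, seconds) and incr(name) calls that
already appear in patch 4.

    import time

    def record_task_metrics(statsd_client, task_name, queue_name, work):
        """Illustrative restatement of what NotifyTask does in patches 2-4."""
        # Patch 2: monotonic time avoids negative durations from clock shift
        start = time.monotonic()
        try:
            result = work()
        except Exception:
            # Patch 4 (on_failure): count failures per queue and task
            statsd_client.incr(f"celery.{queue_name}.{task_name}.failure")
            raise
        # Patch 4 (on_success): a timing metric; StatsD derives a 'count'
        # from timings, so no separate success counter is needed
        statsd_client.timing(
            f"celery.{queue_name}.{task_name}.success",
            time.monotonic() - start,
        )
        return result

When a task runs without a broker there is no delivery_info, so
queue_name falls back to 'none', which is exactly what the
"applied_synchronously" tests in patch 5 assert.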