Files
notifications-api/tests/app/celery/test_celery.py
Ben Thorner 8954cec5a1 Add tests for celery task superclass
This requires upgrading freezegun, as time.monotonic wasn't frozen
by v1.0. Note that we need to explicitly specify the base class for
the task in the test, the reason for which is quite subtle:

- Normally, by using the 'notify_api' fixture, the base class is set
to NotifyTask automatically by running app.create_app [1].

- However, when run alongside other tests, the imports of files with
other celery tasks cause the base class to be instantiated and cached
as the default Celery one. This means none of our tests actually use
our custom superclass when testing tasks.

Because we can't run 'apply_async' directly (since this would require
an actual Celery broker), we need to manually push/pop the request
context, which is normally done as part of sending a task.

Note also that we use a UUID as the name for a task, since task names
are global. We want to avoid the task polluting other tests in future,
as well as make it clear the task is being reused.

[1]: dea5828d0e/app/__init__.py (L113)
2021-04-12 14:50:02 +01:00

77 lines
2.5 KiB
Python

import uuid
import pytest
from freezegun import freeze_time
from app import notify_celery
@pytest.fixture(scope='session')
def celery_task():
    """Register and return a no-op Celery task for exercising the task base class.

    The task is registered on ``notify_celery`` under a random UUID-based
    name: task names are global, so a unique name keeps this task from
    colliding with tasks registered elsewhere. The fixture is session
    scoped, so the same task object is reused by every test.

    ``base`` is passed explicitly rather than relying on the app default,
    because importing other task modules alongside these tests can leave
    the stock Celery base class cached as the default.
    """
    # Task names are strings; convert the UUID rather than handing Celery
    # a ``uuid.UUID`` object. String formatting of the name is unchanged.
    task_name = str(uuid.uuid4())

    @notify_celery.task(name=task_name, base=notify_celery.task_cls)
    def test_task(delivery_info=None):
        pass

    return test_task
@pytest.fixture
def async_task(celery_task):
    """Yield ``celery_task`` with a request context for queue 'test-queue'.

    A request carrying ``delivery_info`` is pushed before the test and
    popped afterwards. This is done by hand because calling
    ``apply_async`` would require an actual Celery broker.
    """
    fake_delivery_info = {'routing_key': 'test-queue'}
    celery_task.push_request(delivery_info=fake_delivery_info)

    yield celery_task

    # Teardown: drop the request pushed above.
    celery_task.pop_request()
def test_success_should_log_and_call_statsd(mocker, notify_api, async_task):
    """on_success reports a timing metric and an info log naming the queue.

    The task is run, the frozen clock is advanced by 5 seconds, and
    ``on_success`` is invoked; both the statsd timing call and the log
    line are expected to carry the 'test-queue' name and the 5 seconds.
    """
    timing_mock = mocker.patch.object(notify_api.statsd_client, 'timing')
    info_mock = mocker.patch.object(notify_api.logger, 'info')
    success_args = dict(retval=None, task_id=1234, args=[], kwargs={})

    with freeze_time() as clock:
        async_task()
        clock.tick(5)
        async_task.on_success(**success_args)

    expected_metric = f'celery.test-queue.{async_task.name}.success'
    expected_message = f'Celery task {async_task.name} (queue: test-queue) took 5.0000'
    timing_mock.assert_called_once_with(expected_metric, 5.0)
    info_mock.assert_called_once_with(expected_message)
def test_success_queue_when_applied_synchronously(mocker, notify_api, celery_task):
    """Without a pushed request, on_success reports the queue as 'none'.

    Uses the bare ``celery_task`` (no request context), runs it, advances
    the frozen clock by 5 seconds and checks the metric and log message.
    """
    timing_mock = mocker.patch.object(notify_api.statsd_client, 'timing')
    info_mock = mocker.patch.object(notify_api.logger, 'info')
    success_args = dict(retval=None, task_id=1234, args=[], kwargs={})

    with freeze_time() as clock:
        celery_task()
        clock.tick(5)
        celery_task.on_success(**success_args)

    expected_metric = f'celery.none.{celery_task.name}.success'
    expected_message = f'Celery task {celery_task.name} (queue: none) took 5.0000'
    timing_mock.assert_called_once_with(expected_metric, 5.0)
    info_mock.assert_called_once_with(expected_message)
def test_failure_should_log_and_call_statsd(mocker, notify_api, async_task):
    """on_failure increments a failure counter and logs an exception.

    With the 'test-queue' request context in place, both the statsd
    ``incr`` call and the logged message are expected to name that queue.
    """
    incr_mock = mocker.patch.object(notify_api.statsd_client, 'incr')
    exception_mock = mocker.patch.object(notify_api.logger, 'exception')
    failure_args = dict(exc=Exception, task_id=1234, args=[], kwargs={}, einfo=None)

    async_task.on_failure(**failure_args)

    expected_metric = f'celery.test-queue.{async_task.name}.failure'
    expected_message = f'Celery task {async_task.name} (queue: test-queue) failed'
    incr_mock.assert_called_once_with(expected_metric)
    exception_mock.assert_called_once_with(expected_message)
def test_failure_queue_when_applied_synchronously(mocker, notify_api, celery_task):
    """Without a pushed request, on_failure reports the queue as 'none'.

    Uses the bare ``celery_task`` (no request context) and checks the
    statsd ``incr`` call and the logged exception message.
    """
    incr_mock = mocker.patch.object(notify_api.statsd_client, 'incr')
    exception_mock = mocker.patch.object(notify_api.logger, 'exception')
    failure_args = dict(exc=Exception, task_id=1234, args=[], kwargs={}, einfo=None)

    celery_task.on_failure(**failure_args)

    expected_metric = f'celery.none.{celery_task.name}.failure'
    expected_message = f'Celery task {celery_task.name} (queue: none) failed'
    incr_mock.assert_called_once_with(expected_metric)
    exception_mock.assert_called_once_with(expected_message)