mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-23 17:01:35 -05:00
Celery's apply_async function accepts 'kwargs' as (get ready to be
confused) either a positional argument, or a keyword argument:
Positional: apply_async(['args'], {'kw': 'args'})
Keyword: apply_async(args=['args'], kwargs={'kw': 'args'})
We rely on the positional form in at least one place [1]. This fixes
the overload of apply_async to cope with both forms, and continue to
pass through any other (confusion time again) keyword args to super(),
such as queue="queue".
Note that we've also decided to stop accepting other positional args,
since this is unnecessarily confusing, and we don't currently rely on
it in our code. This stops it creeping in in future.
[1]: fde927e00e/app/job/rest.py (L186)
118 lines
4.2 KiB
Python
118 lines
4.2 KiB
Python
import uuid
|
|
|
|
import pytest
|
|
from flask import g
|
|
from freezegun import freeze_time
|
|
|
|
from app import notify_celery
|
|
|
|
|
|
# requiring notify_api ensures notify_celery.init_app has been called
|
|
@pytest.fixture(scope='session')
|
|
def celery_task(notify_api):
|
|
@notify_celery.task(name=uuid.uuid4(), base=notify_celery.task_cls)
|
|
def test_task(delivery_info=None): pass
|
|
return test_task
|
|
|
|
|
|
@pytest.fixture
|
|
def async_task(celery_task):
|
|
celery_task.push_request(delivery_info={'routing_key': 'test-queue'})
|
|
yield celery_task
|
|
celery_task.pop_request()
|
|
|
|
|
|
def test_success_should_log_and_call_statsd(mocker, notify_api, async_task):
|
|
statsd = mocker.patch.object(notify_api.statsd_client, 'timing')
|
|
logger = mocker.patch.object(notify_api.logger, 'info')
|
|
|
|
with freeze_time() as frozen:
|
|
async_task()
|
|
frozen.tick(5)
|
|
|
|
async_task.on_success(
|
|
retval=None, task_id=1234, args=[], kwargs={}
|
|
)
|
|
|
|
statsd.assert_called_once_with(f'celery.test-queue.{async_task.name}.success', 5.0)
|
|
logger.assert_called_once_with(f'Celery task {async_task.name} (queue: test-queue) took 5.0000')
|
|
|
|
|
|
def test_success_queue_when_applied_synchronously(mocker, notify_api, celery_task):
|
|
statsd = mocker.patch.object(notify_api.statsd_client, 'timing')
|
|
logger = mocker.patch.object(notify_api.logger, 'info')
|
|
|
|
with freeze_time() as frozen:
|
|
celery_task()
|
|
frozen.tick(5)
|
|
|
|
celery_task.on_success(
|
|
retval=None, task_id=1234, args=[], kwargs={}
|
|
)
|
|
|
|
statsd.assert_called_once_with(f'celery.none.{celery_task.name}.success', 5.0)
|
|
logger.assert_called_once_with(f'Celery task {celery_task.name} (queue: none) took 5.0000')
|
|
|
|
|
|
def test_failure_should_log_and_call_statsd(mocker, notify_api, async_task):
|
|
statsd = mocker.patch.object(notify_api.statsd_client, 'incr')
|
|
logger = mocker.patch.object(notify_api.logger, 'exception')
|
|
|
|
async_task.on_failure(
|
|
exc=Exception, task_id=1234, args=[], kwargs={}, einfo=None
|
|
)
|
|
|
|
statsd.assert_called_once_with(f'celery.test-queue.{async_task.name}.failure')
|
|
logger.assert_called_once_with(f'Celery task {async_task.name} (queue: test-queue) failed')
|
|
|
|
|
|
def test_failure_queue_when_applied_synchronously(mocker, notify_api, celery_task):
|
|
statsd = mocker.patch.object(notify_api.statsd_client, 'incr')
|
|
logger = mocker.patch.object(notify_api.logger, 'exception')
|
|
|
|
celery_task.on_failure(
|
|
exc=Exception, task_id=1234, args=[], kwargs={}, einfo=None
|
|
)
|
|
|
|
statsd.assert_called_once_with(f'celery.none.{celery_task.name}.failure')
|
|
logger.assert_called_once_with(f'Celery task {celery_task.name} (queue: none) failed')
|
|
|
|
|
|
def test_call_exports_request_id_from_kwargs(mocker, celery_task):
|
|
g = mocker.patch('app.celery.celery.g')
|
|
# this would fail if the kwarg was passed through unexpectedly
|
|
celery_task(request_id='1234')
|
|
assert g.request_id == '1234'
|
|
|
|
|
|
def test_apply_async_injects_global_request_id_into_kwargs(mocker, celery_task):
|
|
super_apply = mocker.patch('celery.app.task.Task.apply_async')
|
|
g.request_id = '1234'
|
|
celery_task.apply_async()
|
|
super_apply.assert_called_with(None, {'request_id': '1234'})
|
|
|
|
|
|
def test_apply_async_inject_request_id_with_other_kwargs(mocker, celery_task):
|
|
super_apply = mocker.patch('celery.app.task.Task.apply_async')
|
|
g.request_id = '1234'
|
|
celery_task.apply_async(kwargs={'something': 'else'})
|
|
super_apply.assert_called_with(None, {'request_id': '1234', 'something': 'else'})
|
|
|
|
|
|
def test_apply_async_inject_request_id_with_positional_args(mocker, celery_task):
|
|
super_apply = mocker.patch('celery.app.task.Task.apply_async')
|
|
g.request_id = '1234'
|
|
celery_task.apply_async(['args'], {'something': 'else'})
|
|
super_apply.assert_called_with(['args'], {'request_id': '1234', 'something': 'else'})
|
|
|
|
|
|
def test_apply_async_injects_id_into_kwargs_from_request(mocker, notify_api, celery_task):
|
|
super_apply = mocker.patch('celery.app.task.Task.apply_async')
|
|
request_id_header = notify_api.config['NOTIFY_TRACE_ID_HEADER']
|
|
request_headers = {request_id_header: '1234'}
|
|
|
|
with notify_api.test_request_context(headers=request_headers):
|
|
celery_task.apply_async()
|
|
|
|
super_apply.assert_called_with(None, {'request_id': '1234'})
|