mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-02 09:26:08 -05:00
Merge pull request #3205 from alphagov/celery-consistency-tweaks
Small refactor for new Celery / StatsD code
This commit is contained in:
@@ -110,7 +110,7 @@ def create_app(application):
|
|||||||
email_clients = [aws_ses_stub_client] if application.config['SES_STUB_URL'] else [aws_ses_client]
|
email_clients = [aws_ses_stub_client] if application.config['SES_STUB_URL'] else [aws_ses_client]
|
||||||
notification_provider_clients.init_app(sms_clients=[firetext_client, mmg_client], email_clients=email_clients)
|
notification_provider_clients.init_app(sms_clients=[firetext_client, mmg_client], email_clients=email_clients)
|
||||||
|
|
||||||
notify_celery.init_app(application, statsd_client)
|
notify_celery.init_app(application)
|
||||||
encryption.init_app(application)
|
encryption.init_app(application)
|
||||||
redis_store.init_app(application)
|
redis_store.init_app(application)
|
||||||
document_download_client.init_app(application)
|
document_download_client.init_app(application)
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
|
|||||||
notify_celery._app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))
|
notify_celery._app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))
|
||||||
|
|
||||||
|
|
||||||
def make_task(app, statsd_client):
|
def make_task(app):
|
||||||
class NotifyTask(Task):
|
class NotifyTask(Task):
|
||||||
abstract = True
|
abstract = True
|
||||||
start = None
|
start = None
|
||||||
@@ -36,7 +36,7 @@ def make_task(app, statsd_client):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
statsd_client.timing(
|
app.statsd_client.timing(
|
||||||
"celery.{queue_name}.{task_name}.success".format(
|
"celery.{queue_name}.{task_name}.success".format(
|
||||||
task_name=self.name,
|
task_name=self.name,
|
||||||
queue_name=queue_name
|
queue_name=queue_name
|
||||||
@@ -54,7 +54,7 @@ def make_task(app, statsd_client):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
statsd_client.incr(
|
app.statsd_client.incr(
|
||||||
"celery.{queue_name}.{task_name}.failure".format(
|
"celery.{queue_name}.{task_name}.failure".format(
|
||||||
task_name=self.name,
|
task_name=self.name,
|
||||||
queue_name=queue_name
|
queue_name=queue_name
|
||||||
@@ -73,27 +73,26 @@ def make_task(app, statsd_client):
|
|||||||
|
|
||||||
return super().__call__(*args, **kwargs)
|
return super().__call__(*args, **kwargs)
|
||||||
|
|
||||||
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
|
def apply_async(self, *args, **kwargs):
|
||||||
link=None, link_error=None, **options):
|
kwargs['kwargs'] = kwargs.get('kwargs', {})
|
||||||
kwargs = kwargs or {}
|
|
||||||
|
|
||||||
if has_request_context() and hasattr(request, 'request_id'):
|
if has_request_context() and hasattr(request, 'request_id'):
|
||||||
kwargs['request_id'] = request.request_id
|
kwargs['kwargs']['request_id'] = request.request_id
|
||||||
elif has_app_context() and 'request_id' in g:
|
elif has_app_context() and 'request_id' in g:
|
||||||
kwargs['request_id'] = g.request_id
|
kwargs['kwargs']['request_id'] = g.request_id
|
||||||
|
|
||||||
return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
|
return super().apply_async(*args, **kwargs)
|
||||||
|
|
||||||
return NotifyTask
|
return NotifyTask
|
||||||
|
|
||||||
|
|
||||||
class NotifyCelery(Celery):
|
class NotifyCelery(Celery):
|
||||||
|
|
||||||
def init_app(self, app, statsd_client):
|
def init_app(self, app):
|
||||||
super().__init__(
|
super().__init__(
|
||||||
app.import_name,
|
app.import_name,
|
||||||
broker=app.config['BROKER_URL'],
|
broker=app.config['BROKER_URL'],
|
||||||
task_cls=make_task(app, statsd_client),
|
task_cls=make_task(app),
|
||||||
)
|
)
|
||||||
|
|
||||||
self.conf.update(app.config)
|
self.conf.update(app.config)
|
||||||
|
|||||||
@@ -89,15 +89,7 @@ def test_apply_async_injects_global_request_id_into_kwargs(mocker, celery_task):
|
|||||||
super_apply = mocker.patch('celery.app.task.Task.apply_async')
|
super_apply = mocker.patch('celery.app.task.Task.apply_async')
|
||||||
g.request_id = '1234'
|
g.request_id = '1234'
|
||||||
celery_task.apply_async()
|
celery_task.apply_async()
|
||||||
|
super_apply.assert_called_with(kwargs={'request_id': '1234'})
|
||||||
super_apply.assert_called_with(
|
|
||||||
None,
|
|
||||||
{'request_id': '1234'},
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
None
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_apply_async_injects_id_into_kwargs_from_request(mocker, notify_api, celery_task):
|
def test_apply_async_injects_id_into_kwargs_from_request(mocker, notify_api, celery_task):
|
||||||
@@ -108,11 +100,4 @@ def test_apply_async_injects_id_into_kwargs_from_request(mocker, notify_api, cel
|
|||||||
with notify_api.test_request_context(headers=request_headers):
|
with notify_api.test_request_context(headers=request_headers):
|
||||||
celery_task.apply_async()
|
celery_task.apply_async()
|
||||||
|
|
||||||
super_apply.assert_called_with(
|
super_apply.assert_called_with(kwargs={'request_id': '1234'})
|
||||||
None,
|
|
||||||
{'request_id': '1234'},
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
None
|
|
||||||
)
|
|
||||||
|
|||||||
Reference in New Issue
Block a user