From 486697d07cec545f418e505fc24af079106fb0ed Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Tue, 30 Aug 2016 10:42:24 +0100 Subject: [PATCH] New queues for the sms/email tasks Previously there were 4 queues for sending messages The was based on the fact that each notification has 2 actions - persist in the database and send to provider. Two queues supported the CSV upload - for the first of these tasks - bulk-email - build-sms And there were two more queues for the tasks that make the 3rd party client calls. - sms - email API Calls just used the latter two queues for both tasks Added four new queues - db-email - db-sms - send-sms - send-email So an API call puts a notification into the db-[type] queue first, which then puts the notification into the send-[type] queue Build queues stay as before. This will allow us to target processing of these tasks with separate workers to manage these differently. --- app/celery/tasks.py | 4 ++-- app/notifications/rest.py | 4 ++-- config.py | 4 ++++ tests/app/celery/test_tasks.py | 14 +++++++------- .../notifications/rest/test_send_notification.py | 10 +++++----- 5 files changed, 20 insertions(+), 16 deletions(-) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 9dc332469..ac6f037fe 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -147,7 +147,7 @@ def send_sms(self, try: _save_notification(created_at, notification, notification_id, service_id, SMS_TYPE, api_key_id, key_type) - send_sms_to_provider.apply_async((service_id, notification_id), queue='sms') + send_sms_to_provider.apply_async((service_id, notification_id), queue='send-sms') current_app.logger.info( "SMS {} created at {}".format(notification_id, created_at) @@ -182,7 +182,7 @@ def send_email(self, service_id, try: _save_notification(created_at, notification, notification_id, service_id, EMAIL_TYPE, api_key_id, key_type) - send_email_to_provider.apply_async((service_id, notification_id), queue='email') + send_email_to_provider.apply_async((service_id, notification_id), queue='send-email') current_app.logger.info("Email {} created at {}".format(notification_id, created_at)) except SQLAlchemyError as e: diff --git a/app/notifications/rest.py b/app/notifications/rest.py index 8e4af63b2..2d286c6ef 100644 --- a/app/notifications/rest.py +++ b/app/notifications/rest.py @@ -299,7 +299,7 @@ def send_notification(notification_type): 'api_key_id': str(api_user.id), 'key_type': api_user.key_type }, - queue='sms' + queue='db-sms' ) else: send_email.apply_async( @@ -313,7 +313,7 @@ def send_notification(notification_type): 'api_key_id': str(api_user.id), 'key_type': api_user.key_type }, - queue='email' + queue='db-email' ) return jsonify( diff --git a/config.py b/config.py index d8067dc53..99b51a2dd 100644 --- a/config.py +++ b/config.py @@ -79,7 +79,11 @@ class Config(object): CELERY_QUEUES = [ Queue('periodic', Exchange('default'), routing_key='periodic'), Queue('sms', Exchange('default'), routing_key='sms'), + Queue('db-sms', Exchange('default'), routing_key='sms'), + Queue('send-sms', Exchange('default'), routing_key='sms'), Queue('email', Exchange('default'), routing_key='email'), + Queue('db-email', Exchange('default'), routing_key='email'), + Queue('send-email', Exchange('default'), routing_key='email'), Queue('sms-code', Exchange('default'), routing_key='sms-code'), Queue('email-code', Exchange('default'), routing_key='email-code'), Queue('email-reset-password', Exchange('default'), routing_key='email-reset-password'), diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 547ee769e..d1c4ba18e 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -295,7 +295,7 @@ def test_should_send_template_to_correct_sms_task_and_persist(sample_template_wi provider_tasks.send_sms_to_provider.apply_async.assert_called_once_with( (sample_template_with_placeholders.service_id, notification_id), - queue="sms" + queue="send-sms" ) persisted_notification = Notification.query.filter_by(id=notification_id).one() @@ -333,7 +333,7 @@ def test_should_send_sms_if_restricted_service_and_valid_number(notify_db, notif provider_tasks.send_sms_to_provider.apply_async.assert_called_once_with( (service.id, notification_id), - queue="sms" + queue="send-sms" ) persisted_notification = Notification.query.filter_by(id=notification_id).one() @@ -410,7 +410,7 @@ def test_should_send_sms_template_to_and_persist_with_job_id(sample_job, sample_ provider_tasks.send_sms_to_provider.apply_async.assert_called_once_with( (sample_job.service.id, notification_id), - queue="sms" + queue="send-sms" ) persisted_notification = Notification.query.filter_by(id=notification_id).one() assert persisted_notification.id == notification_id @@ -452,7 +452,7 @@ def test_should_use_email_template_and_persist(sample_email_template_with_placeh persisted_notification = Notification.query.filter_by(id=notification_id).one() provider_tasks.send_email_to_provider.apply_async.assert_called_once_with( - (sample_email_template_with_placeholders.service_id, notification_id), queue='email') + (sample_email_template_with_placeholders.service_id, notification_id), queue='send-email') assert persisted_notification.id == notification_id assert persisted_notification.to == 'my_email@my_email.com' @@ -490,7 +490,7 @@ def test_send_email_should_use_template_version_from_job_not_latest(sample_email ) provider_tasks.send_email_to_provider.apply_async.assert_called_once_with((sample_email_template.service_id, - notification_id), queue='email') + notification_id), queue='send-email') persisted_notification = Notification.query.filter_by(id=notification_id).one() assert persisted_notification.id == notification_id @@ -518,7 +518,7 @@ def test_should_use_email_template_subject_placeholders(sample_email_template_wi now.strftime(DATETIME_FORMAT) ) provider_tasks.send_email_to_provider.apply_async.assert_called_once_with( - (sample_email_template_with_placeholders.service_id, notification_id, ), queue='email' + (sample_email_template_with_placeholders.service_id, notification_id, ), queue='send-email' ) persisted_notification = Notification.query.filter_by(id=notification_id).one() assert persisted_notification.id == notification_id @@ -545,7 +545,7 @@ def test_should_use_email_template_and_persist_without_personalisation(sample_em now.strftime(DATETIME_FORMAT) ) provider_tasks.send_email_to_provider.apply_async.assert_called_once_with((sample_email_template.service_id, - notification_id), queue='email') + notification_id), queue='send-email') persisted_notification = Notification.query.filter_by(id=notification_id).one() assert persisted_notification.id == notification_id diff --git a/tests/app/notifications/rest/test_send_notification.py b/tests/app/notifications/rest/test_send_notification.py index 47626ee85..a06a286b5 100644 --- a/tests/app/notifications/rest/test_send_notification.py +++ b/tests/app/notifications/rest/test_send_notification.py @@ -121,7 +121,7 @@ def test_send_notification_with_placeholders_replaced(notify_api, sample_email_t ANY, "2016-01-01T11:09:00.061258"), kwargs=ANY, - queue="email" + queue="db-email" ) assert response.status_code == 201 assert encryption.decrypt(app.celery.tasks.send_email.apply_async.call_args[0][0][2]) == data @@ -389,7 +389,7 @@ def test_should_allow_valid_sms_notification(notify_api, sample_template, mocker "something_encrypted", "2016-01-01T11:09:00.061258"), kwargs=ANY, - queue="sms" + queue="db-sms" ) assert response.status_code == 201 assert notification_id @@ -558,7 +558,7 @@ def test_should_allow_valid_email_notification(notify_api, sample_email_template "something_encrypted", "2016-01-01T11:09:00.061258"), kwargs=ANY, - queue="email" + queue="db-email" ) assert response.status_code == 201 @@ -760,7 +760,7 @@ def test_should_send_email_if_team_api_key_and_a_service_user(notify_api, sample 'api_key_id': str(api_key.id), 'key_type': api_key.key_type }, - queue='email') + queue='db-email') assert response.status_code == 201 @@ -790,5 +790,5 @@ def test_should_send_sms_if_team_api_key_and_a_service_user(notify_api, sample_t 'api_key_id': str(api_key.id), 'key_type': api_key.key_type }, - queue='sms') + queue='db-sms') assert response.status_code == 201