New queues for the sms/email tasks

Previously there were 4 queues for sending messages

The was based on the fact that each notification has 2 actions - persist in the database and send to provider.

Two queues supported the CSV upload - for the first of these tasks
- bulk-email
- build-sms

And there were two more queues for the tasks that make the 3rd party client calls.
- sms
- email

API Calls just used the latter two queues for both tasks

Added four new queues
- db-email
- db-sms
- send-sms
- send-email

So an API call puts a notification into the db-[type] queue first, which then puts the notification into the send-[type] queue

Build queues stay as before.

This will allow us to target processing of these tasks with separate workers to manage these differently.
This commit is contained in:
Martyn Inglis
2016-08-30 10:42:24 +01:00
parent 58a89a3a80
commit 486697d07c
5 changed files with 20 additions and 16 deletions

View File

@@ -147,7 +147,7 @@ def send_sms(self,
try:
_save_notification(created_at, notification, notification_id, service_id, SMS_TYPE, api_key_id, key_type)
send_sms_to_provider.apply_async((service_id, notification_id), queue='sms')
send_sms_to_provider.apply_async((service_id, notification_id), queue='send-sms')
current_app.logger.info(
"SMS {} created at {}".format(notification_id, created_at)
@@ -182,7 +182,7 @@ def send_email(self, service_id,
try:
_save_notification(created_at, notification, notification_id, service_id, EMAIL_TYPE, api_key_id, key_type)
send_email_to_provider.apply_async((service_id, notification_id), queue='email')
send_email_to_provider.apply_async((service_id, notification_id), queue='send-email')
current_app.logger.info("Email {} created at {}".format(notification_id, created_at))
except SQLAlchemyError as e:

View File

@@ -299,7 +299,7 @@ def send_notification(notification_type):
'api_key_id': str(api_user.id),
'key_type': api_user.key_type
},
queue='sms'
queue='db-sms'
)
else:
send_email.apply_async(
@@ -313,7 +313,7 @@ def send_notification(notification_type):
'api_key_id': str(api_user.id),
'key_type': api_user.key_type
},
queue='email'
queue='db-email'
)
return jsonify(

View File

@@ -79,7 +79,11 @@ class Config(object):
CELERY_QUEUES = [
Queue('periodic', Exchange('default'), routing_key='periodic'),
Queue('sms', Exchange('default'), routing_key='sms'),
Queue('db-sms', Exchange('default'), routing_key='sms'),
Queue('send-sms', Exchange('default'), routing_key='sms'),
Queue('email', Exchange('default'), routing_key='email'),
Queue('db-email', Exchange('default'), routing_key='email'),
Queue('send-email', Exchange('default'), routing_key='email'),
Queue('sms-code', Exchange('default'), routing_key='sms-code'),
Queue('email-code', Exchange('default'), routing_key='email-code'),
Queue('email-reset-password', Exchange('default'), routing_key='email-reset-password'),

View File

@@ -295,7 +295,7 @@ def test_should_send_template_to_correct_sms_task_and_persist(sample_template_wi
provider_tasks.send_sms_to_provider.apply_async.assert_called_once_with(
(sample_template_with_placeholders.service_id,
notification_id),
queue="sms"
queue="send-sms"
)
persisted_notification = Notification.query.filter_by(id=notification_id).one()
@@ -333,7 +333,7 @@ def test_should_send_sms_if_restricted_service_and_valid_number(notify_db, notif
provider_tasks.send_sms_to_provider.apply_async.assert_called_once_with(
(service.id,
notification_id),
queue="sms"
queue="send-sms"
)
persisted_notification = Notification.query.filter_by(id=notification_id).one()
@@ -410,7 +410,7 @@ def test_should_send_sms_template_to_and_persist_with_job_id(sample_job, sample_
provider_tasks.send_sms_to_provider.apply_async.assert_called_once_with(
(sample_job.service.id,
notification_id),
queue="sms"
queue="send-sms"
)
persisted_notification = Notification.query.filter_by(id=notification_id).one()
assert persisted_notification.id == notification_id
@@ -452,7 +452,7 @@ def test_should_use_email_template_and_persist(sample_email_template_with_placeh
persisted_notification = Notification.query.filter_by(id=notification_id).one()
provider_tasks.send_email_to_provider.apply_async.assert_called_once_with(
(sample_email_template_with_placeholders.service_id, notification_id), queue='email')
(sample_email_template_with_placeholders.service_id, notification_id), queue='send-email')
assert persisted_notification.id == notification_id
assert persisted_notification.to == 'my_email@my_email.com'
@@ -490,7 +490,7 @@ def test_send_email_should_use_template_version_from_job_not_latest(sample_email
)
provider_tasks.send_email_to_provider.apply_async.assert_called_once_with((sample_email_template.service_id,
notification_id), queue='email')
notification_id), queue='send-email')
persisted_notification = Notification.query.filter_by(id=notification_id).one()
assert persisted_notification.id == notification_id
@@ -518,7 +518,7 @@ def test_should_use_email_template_subject_placeholders(sample_email_template_wi
now.strftime(DATETIME_FORMAT)
)
provider_tasks.send_email_to_provider.apply_async.assert_called_once_with(
(sample_email_template_with_placeholders.service_id, notification_id, ), queue='email'
(sample_email_template_with_placeholders.service_id, notification_id, ), queue='send-email'
)
persisted_notification = Notification.query.filter_by(id=notification_id).one()
assert persisted_notification.id == notification_id
@@ -545,7 +545,7 @@ def test_should_use_email_template_and_persist_without_personalisation(sample_em
now.strftime(DATETIME_FORMAT)
)
provider_tasks.send_email_to_provider.apply_async.assert_called_once_with((sample_email_template.service_id,
notification_id), queue='email')
notification_id), queue='send-email')
persisted_notification = Notification.query.filter_by(id=notification_id).one()
assert persisted_notification.id == notification_id

View File

@@ -121,7 +121,7 @@ def test_send_notification_with_placeholders_replaced(notify_api, sample_email_t
ANY,
"2016-01-01T11:09:00.061258"),
kwargs=ANY,
queue="email"
queue="db-email"
)
assert response.status_code == 201
assert encryption.decrypt(app.celery.tasks.send_email.apply_async.call_args[0][0][2]) == data
@@ -389,7 +389,7 @@ def test_should_allow_valid_sms_notification(notify_api, sample_template, mocker
"something_encrypted",
"2016-01-01T11:09:00.061258"),
kwargs=ANY,
queue="sms"
queue="db-sms"
)
assert response.status_code == 201
assert notification_id
@@ -558,7 +558,7 @@ def test_should_allow_valid_email_notification(notify_api, sample_email_template
"something_encrypted",
"2016-01-01T11:09:00.061258"),
kwargs=ANY,
queue="email"
queue="db-email"
)
assert response.status_code == 201
@@ -760,7 +760,7 @@ def test_should_send_email_if_team_api_key_and_a_service_user(notify_api, sample
'api_key_id': str(api_key.id),
'key_type': api_key.key_type
},
queue='email')
queue='db-email')
assert response.status_code == 201
@@ -790,5 +790,5 @@ def test_should_send_sms_if_team_api_key_and_a_service_user(notify_api, sample_t
'api_key_id': str(api_key.id),
'key_type': api_key.key_type
},
queue='sms')
queue='db-sms')
assert response.status_code == 201