From 58a89a3a802d5dedc897d9b4b3ff62b9d8893bed Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Tue, 30 Aug 2016 10:37:06 +0100 Subject: [PATCH 1/4] Only init the statsd client if enabled - allows us to switch it off if not internet access --- app/clients/statsd/statsd_client.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/app/clients/statsd/statsd_client.py b/app/clients/statsd/statsd_client.py index 6b127da93..19e89557e 100644 --- a/app/clients/statsd/statsd_client.py +++ b/app/clients/statsd/statsd_client.py @@ -3,15 +3,17 @@ from statsd import StatsClient class StatsdClient(StatsClient): def init_app(self, app, *args, **kwargs): - StatsClient.__init__( - self, - app.config.get('STATSD_HOST'), - app.config.get('STATSD_PORT'), - prefix=app.config.get('STATSD_PREFIX') - ) self.active = app.config.get('STATSD_ENABLED') self.namespace = app.config.get('NOTIFY_ENVIRONMENT') + ".notifications.api." + if self.active: + StatsClient.__init__( + self, + app.config.get('STATSD_HOST'), + app.config.get('STATSD_PORT'), + prefix=app.config.get('STATSD_PREFIX') + ) + def format_stat_name(self, stat): return self.namespace + stat From 486697d07cec545f418e505fc24af079106fb0ed Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Tue, 30 Aug 2016 10:42:24 +0100 Subject: [PATCH 2/4] New queues for the sms/email tasks Previously there were 4 queues for sending messages The was based on the fact that each notification has 2 actions - persist in the database and send to provider. Two queues supported the CSV upload - for the first of these tasks - bulk-email - build-sms And there were two more queues for the tasks that make the 3rd party client calls. - sms - email API Calls just used the latter two queues for both tasks Added four new queues - db-email - db-sms - send-sms - send-email So an API call puts a notification into the db-[type] queue first, which then puts the notification into the send-[type] queue Build queues stay as before. This will allow us to target processing of these tasks with separate workers to manage these differently. --- app/celery/tasks.py | 4 ++-- app/notifications/rest.py | 4 ++-- config.py | 4 ++++ tests/app/celery/test_tasks.py | 14 +++++++------- .../notifications/rest/test_send_notification.py | 10 +++++----- 5 files changed, 20 insertions(+), 16 deletions(-) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 9dc332469..ac6f037fe 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -147,7 +147,7 @@ def send_sms(self, try: _save_notification(created_at, notification, notification_id, service_id, SMS_TYPE, api_key_id, key_type) - send_sms_to_provider.apply_async((service_id, notification_id), queue='sms') + send_sms_to_provider.apply_async((service_id, notification_id), queue='send-sms') current_app.logger.info( "SMS {} created at {}".format(notification_id, created_at) @@ -182,7 +182,7 @@ def send_email(self, service_id, try: _save_notification(created_at, notification, notification_id, service_id, EMAIL_TYPE, api_key_id, key_type) - send_email_to_provider.apply_async((service_id, notification_id), queue='email') + send_email_to_provider.apply_async((service_id, notification_id), queue='send-email') current_app.logger.info("Email {} created at {}".format(notification_id, created_at)) except SQLAlchemyError as e: diff --git a/app/notifications/rest.py b/app/notifications/rest.py index 8e4af63b2..2d286c6ef 100644 --- a/app/notifications/rest.py +++ b/app/notifications/rest.py @@ -299,7 +299,7 @@ def send_notification(notification_type): 'api_key_id': str(api_user.id), 'key_type': api_user.key_type }, - queue='sms' + queue='db-sms' ) else: send_email.apply_async( @@ -313,7 +313,7 @@ def send_notification(notification_type): 'api_key_id': str(api_user.id), 'key_type': api_user.key_type }, - queue='email' + queue='db-email' ) return jsonify( diff --git a/config.py b/config.py index d8067dc53..99b51a2dd 100644 --- a/config.py +++ b/config.py @@ -79,7 +79,11 @@ class Config(object): CELERY_QUEUES = [ Queue('periodic', Exchange('default'), routing_key='periodic'), Queue('sms', Exchange('default'), routing_key='sms'), + Queue('db-sms', Exchange('default'), routing_key='sms'), + Queue('send-sms', Exchange('default'), routing_key='sms'), Queue('email', Exchange('default'), routing_key='email'), + Queue('db-email', Exchange('default'), routing_key='email'), + Queue('send-email', Exchange('default'), routing_key='email'), Queue('sms-code', Exchange('default'), routing_key='sms-code'), Queue('email-code', Exchange('default'), routing_key='email-code'), Queue('email-reset-password', Exchange('default'), routing_key='email-reset-password'), diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 547ee769e..d1c4ba18e 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -295,7 +295,7 @@ def test_should_send_template_to_correct_sms_task_and_persist(sample_template_wi provider_tasks.send_sms_to_provider.apply_async.assert_called_once_with( (sample_template_with_placeholders.service_id, notification_id), - queue="sms" + queue="send-sms" ) persisted_notification = Notification.query.filter_by(id=notification_id).one() @@ -333,7 +333,7 @@ def test_should_send_sms_if_restricted_service_and_valid_number(notify_db, notif provider_tasks.send_sms_to_provider.apply_async.assert_called_once_with( (service.id, notification_id), - queue="sms" + queue="send-sms" ) persisted_notification = Notification.query.filter_by(id=notification_id).one() @@ -410,7 +410,7 @@ def test_should_send_sms_template_to_and_persist_with_job_id(sample_job, sample_ provider_tasks.send_sms_to_provider.apply_async.assert_called_once_with( (sample_job.service.id, notification_id), - queue="sms" + queue="send-sms" ) persisted_notification = Notification.query.filter_by(id=notification_id).one() assert persisted_notification.id == notification_id @@ -452,7 +452,7 @@ def test_should_use_email_template_and_persist(sample_email_template_with_placeh persisted_notification = Notification.query.filter_by(id=notification_id).one() provider_tasks.send_email_to_provider.apply_async.assert_called_once_with( - (sample_email_template_with_placeholders.service_id, notification_id), queue='email') + (sample_email_template_with_placeholders.service_id, notification_id), queue='send-email') assert persisted_notification.id == notification_id assert persisted_notification.to == 'my_email@my_email.com' @@ -490,7 +490,7 @@ def test_send_email_should_use_template_version_from_job_not_latest(sample_email ) provider_tasks.send_email_to_provider.apply_async.assert_called_once_with((sample_email_template.service_id, - notification_id), queue='email') + notification_id), queue='send-email') persisted_notification = Notification.query.filter_by(id=notification_id).one() assert persisted_notification.id == notification_id @@ -518,7 +518,7 @@ def test_should_use_email_template_subject_placeholders(sample_email_template_wi now.strftime(DATETIME_FORMAT) ) provider_tasks.send_email_to_provider.apply_async.assert_called_once_with( - (sample_email_template_with_placeholders.service_id, notification_id, ), queue='email' + (sample_email_template_with_placeholders.service_id, notification_id, ), queue='send-email' ) persisted_notification = Notification.query.filter_by(id=notification_id).one() assert persisted_notification.id == notification_id @@ -545,7 +545,7 @@ def test_should_use_email_template_and_persist_without_personalisation(sample_em now.strftime(DATETIME_FORMAT) ) provider_tasks.send_email_to_provider.apply_async.assert_called_once_with((sample_email_template.service_id, - notification_id), queue='email') + notification_id), queue='send-email') persisted_notification = Notification.query.filter_by(id=notification_id).one() assert persisted_notification.id == notification_id diff --git a/tests/app/notifications/rest/test_send_notification.py b/tests/app/notifications/rest/test_send_notification.py index 47626ee85..a06a286b5 100644 --- a/tests/app/notifications/rest/test_send_notification.py +++ b/tests/app/notifications/rest/test_send_notification.py @@ -121,7 +121,7 @@ def test_send_notification_with_placeholders_replaced(notify_api, sample_email_t ANY, "2016-01-01T11:09:00.061258"), kwargs=ANY, - queue="email" + queue="db-email" ) assert response.status_code == 201 assert encryption.decrypt(app.celery.tasks.send_email.apply_async.call_args[0][0][2]) == data @@ -389,7 +389,7 @@ def test_should_allow_valid_sms_notification(notify_api, sample_template, mocker "something_encrypted", "2016-01-01T11:09:00.061258"), kwargs=ANY, - queue="sms" + queue="db-sms" ) assert response.status_code == 201 assert notification_id @@ -558,7 +558,7 @@ def test_should_allow_valid_email_notification(notify_api, sample_email_template "something_encrypted", "2016-01-01T11:09:00.061258"), kwargs=ANY, - queue="email" + queue="db-email" ) assert response.status_code == 201 @@ -760,7 +760,7 @@ def test_should_send_email_if_team_api_key_and_a_service_user(notify_api, sample 'api_key_id': str(api_key.id), 'key_type': api_key.key_type }, - queue='email') + queue='db-email') assert response.status_code == 201 @@ -790,5 +790,5 @@ def test_should_send_sms_if_team_api_key_and_a_service_user(notify_api, sample_t 'api_key_id': str(api_key.id), 'key_type': api_key.key_type }, - queue='sms') + queue='db-sms') assert response.status_code == 201 From 354efbdd2cffb89d365112dfd2566feb634445b5 Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Tue, 30 Aug 2016 14:25:09 +0100 Subject: [PATCH 3/4] Ensure routing keys correct --- config.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/config.py b/config.py index 99b51a2dd..f16d05dbf 100644 --- a/config.py +++ b/config.py @@ -79,11 +79,11 @@ class Config(object): CELERY_QUEUES = [ Queue('periodic', Exchange('default'), routing_key='periodic'), Queue('sms', Exchange('default'), routing_key='sms'), - Queue('db-sms', Exchange('default'), routing_key='sms'), - Queue('send-sms', Exchange('default'), routing_key='sms'), + Queue('db-sms', Exchange('default'), routing_key='db-sms'), + Queue('send-sms', Exchange('default'), routing_key='send-sms'), Queue('email', Exchange('default'), routing_key='email'), - Queue('db-email', Exchange('default'), routing_key='email'), - Queue('send-email', Exchange('default'), routing_key='email'), + Queue('db-email', Exchange('default'), routing_key='db-email'), + Queue('send-email', Exchange('default'), routing_key='send-email'), Queue('sms-code', Exchange('default'), routing_key='sms-code'), Queue('email-code', Exchange('default'), routing_key='email-code'), Queue('email-reset-password', Exchange('default'), routing_key='email-reset-password'), From eff7c044a28768b5ba4ca64220f6ee3d6120e705 Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Tue, 30 Aug 2016 14:35:18 +0100 Subject: [PATCH 4/4] Scheduled jobs every minute --- config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.py b/config.py index 14c422400..6541b8f65 100644 --- a/config.py +++ b/config.py @@ -52,7 +52,7 @@ class Config(object): CELERYBEAT_SCHEDULE = { 'run-scheduled-jobs': { 'task': 'run-scheduled-jobs', - 'schedule': crontab(), + 'schedule': crontab(minute=1), 'options': {'queue': 'periodic'} }, 'delete-verify-codes': {