if message too big to put on high volume queue then save to queue

SQS fails if the message body is over 256kb. Normally our messages are
quite small, but if we're using the new save-api-email task with an
email that has a large body, we can get over that limit. If so, handle
the exception and fall back to the existing code path (saving to the
database and sending a deliver-email task, which only has a notification
id.
This commit is contained in:
Leo Hemsted
2020-03-28 09:39:12 +00:00
parent c81388d004
commit 0b3d711652
2 changed files with 58 additions and 12 deletions

View File

@@ -3,6 +3,7 @@ import functools
import uuid import uuid
from datetime import datetime from datetime import datetime
from boto.exception import SQSError
from flask import request, jsonify, current_app, abort from flask import request, jsonify, current_app, abort
from notifications_utils.recipients import try_validate_and_format_phone_number from notifications_utils.recipients import try_validate_and_format_phone_number
@@ -187,6 +188,7 @@ def post_notification(notification_type):
def process_sms_or_email_notification(*, form, notification_type, api_key, template, service, reply_to_text=None): def process_sms_or_email_notification(*, form, notification_type, api_key, template, service, reply_to_text=None):
notification_id = None
form_send_to = form['email_address'] if notification_type == EMAIL_TYPE else form['phone_number'] form_send_to = form['email_address'] if notification_type == EMAIL_TYPE else form['phone_number']
send_to = validate_and_format_recipient(send_to=form_send_to, send_to = validate_and_format_recipient(send_to=form_send_to,
@@ -209,19 +211,29 @@ def process_sms_or_email_notification(*, form, notification_type, api_key, templ
# To take the pressure off the db for API requests put the notification for our high volume service onto a queue # To take the pressure off the db for API requests put the notification for our high volume service onto a queue
# the task will then save the notification, then call send_notification_to_queue. # the task will then save the notification, then call send_notification_to_queue.
# We know that this team does not use the GET request, but relies on callbacks to get the status updates. # We know that this team does not use the GET request, but relies on callbacks to get the status updates.
notification = save_email_to_queue( try:
form=form, notification_id = uuid.uuid4()
notification_type=notification_type, notification = save_email_to_queue(
api_key=api_key, form=form,
template=template, notification_id=str(notification_id),
service_id=service.id, notification_type=notification_type,
personalisation=personalisation, api_key=api_key,
document_download_count=document_download_count, template=template,
reply_to_text=reply_to_text service_id=service.id,
) personalisation=personalisation,
return notification document_download_count=document_download_count,
reply_to_text=reply_to_text
)
return notification
except SQSError:
# if SQS cannot put the task on the queue, it's probably because the notification body was too long and it
# went over SQS's 256kb message limit. If so, we
current_app.logger.info(
f'Notification {notification_id} failed to save to high volume queue. Using normal flow instead'
)
notification = persist_notification( notification = persist_notification(
notification_id=notification_id,
template_id=template.id, template_id=template.id,
template_version=template.version, template_version=template.version,
recipient=form_send_to, recipient=form_send_to,
@@ -255,6 +267,7 @@ def process_sms_or_email_notification(*, form, notification_type, api_key, templ
def save_email_to_queue( def save_email_to_queue(
*, *,
notification_id,
form, form,
notification_type, notification_type,
api_key, api_key,
@@ -265,7 +278,7 @@ def save_email_to_queue(
reply_to_text=None reply_to_text=None
): ):
data = { data = {
"id": str(uuid.uuid4()), "id": notification_id,
"template_id": str(template.id), "template_id": str(template.id),
"template_version": template.version, "template_version": template.version,
"to": form['email_address'], "to": form['email_address'],

View File

@@ -4,6 +4,7 @@ from unittest.mock import call
import pytest import pytest
from freezegun import freeze_time from freezegun import freeze_time
from boto.exception import SQSError
from app.dao.service_sms_sender_dao import dao_update_service_sms_sender from app.dao.service_sms_sender_dao import dao_update_service_sms_sender
from app.models import ( from app.models import (
@@ -981,6 +982,38 @@ def test_post_notifications_saves_email_to_queue(client, notify_db_session, mock
assert len(Notification.query.all()) == 0 assert len(Notification.query.all()) == 0
def test_post_notifications_saves_email_normally_if_save_email_to_queue_fails(client, notify_db_session, mocker):
save_email_task = mocker.patch(
"app.celery.tasks.save_api_email.apply_async",
side_effect=SQSError({'some': 'json'}, 'some opname')
)
mock_send_task = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
service = create_service(service_id='941b6f9a-50d7-4742-8d50-f365ca74bf27', service_name='high volume service')
template = create_template(service=service, content='((message))', template_type=EMAIL_TYPE)
data = {
"email_address": "joe.citizen@example.com",
"template_id": template.id,
"personalisation": {"message": "Dear citizen, have a nice day"}
}
response = client.post(
path='/v2/notifications/email',
data=json.dumps(data),
headers=[('Content-Type', 'application/json'), create_authorization_header(service_id=service.id)]
)
json_resp = response.get_json()
assert response.status_code == 201
assert json_resp['id']
assert json_resp['content']['body'] == "Dear citizen, have a nice day"
assert json_resp['template']['id'] == str(template.id)
# save email
save_email_task.assert_called_once_with([mock.ANY], queue='save-api-email-tasks')
mock_send_task.assert_called_once_with([json_resp['id']], queue='send-email-tasks')
assert Notification.query.count() == 1
def test_post_notifications_doesnt_save_email_to_queue_for_test_emails(client, notify_db_session, mocker): def test_post_notifications_doesnt_save_email_to_queue_for_test_emails(client, notify_db_session, mocker):
save_email_task = mocker.patch("app.celery.tasks.save_api_email.apply_async") save_email_task = mocker.patch("app.celery.tasks.save_api_email.apply_async")
mock_send_task = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') mock_send_task = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')