From 812f4d20dd6af12b7ea684a7bf9d863356c33292 Mon Sep 17 00:00:00 2001 From: Pea Tyczynska Date: Wed, 18 Jul 2018 17:03:16 +0100 Subject: [PATCH] Send complaints on to service callback APIs using an async task --- app/celery/scheduled_tasks.py | 4 +- app/celery/service_callback_tasks.py | 98 ++++++++++++++----- app/dao/service_callback_api_dao.py | 9 +- .../notifications_ses_callback.py | 28 ++++-- app/notifications/process_client_response.py | 4 +- tests/app/celery/test_scheduled_tasks.py | 4 +- .../app/celery/test_service_callback_tasks.py | 76 +++++++++++--- .../test_notifications_ses_callback.py | 31 +++++- .../test_process_client_response.py | 4 +- 9 files changed, 199 insertions(+), 59 deletions(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 53b57aff5..e949c9326 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -16,7 +16,7 @@ from app import performance_platform_client, zendesk_client from app.aws import s3 from app.celery.service_callback_tasks import ( send_delivery_status_to_service, - create_encrypted_callback_data, + create_delivery_status_callback_data, ) from app.celery.tasks import process_job from app.config import QueueNames, TaskNames @@ -212,7 +212,7 @@ def timeout_notifications(): # queue callback task only if the service_callback_api exists service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) if service_callback_api: - encrypted_notification = create_encrypted_callback_data(notification, service_callback_api) + encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api) send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification], queue=QueueNames.CALLBACKS) diff --git a/app/celery/service_callback_tasks.py b/app/celery/service_callback_tasks.py index ebf71d1f7..bf436b883 100644 --- a/app/celery/service_callback_tasks.py +++ b/app/celery/service_callback_tasks.py @@ -17,44 +17,78 @@ from app.config import QueueNames @notify_celery.task(bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") -def send_delivery_status_to_service(self, notification_id, - encrypted_status_update - ): +def send_delivery_status_to_service( + self, notification_id, encrypted_status_update +): + status_update = encryption.decrypt(encrypted_status_update) + + data = { + "notification_id": str(notification_id), + "reference": status_update['notification_client_reference'], + "to": status_update['notification_to'], + "status": status_update['notification_status'], + "created_at": status_update['notification_created_at'], + "completed_at": status_update['notification_updated_at'], + "sent_at": status_update['notification_sent_at'], + "notification_type": status_update['notification_type'] + } + _send_data_to_service_callback_api( + self, + data, + status_update['service_callback_api_url'], + status_update['service_callback_api_bearer_token'], + 'send_delivery_status_to_service' + ) + + +@notify_celery.task(bind=True, name="send-complaint", max_retries=5, default_retry_delay=300) +@statsd(namespace="tasks") +def send_complaint_to_service(self, complaint_data): + complaint = encryption.decrypt(complaint_data) + + data = { + "notification_id": complaint['notification_id'], + "complaint_id": complaint['complaint_id'], + "reference": complaint['reference'], + "to": complaint['to'], + "complaint_date": complaint['complaint_date'] + } + + _send_data_to_service_callback_api( + self, + data, + complaint['service_callback_api_url'], + complaint['service_callback_api_bearer_token'], + 'send_complaint_to_service' + ) + + +def _send_data_to_service_callback_api(self, data, service_callback_url, token, function_name): + notification_id = data["notification_id"] try: - status_update = encryption.decrypt(encrypted_status_update) - - data = { - "id": str(notification_id), - "reference": status_update['notification_client_reference'], - "to": status_update['notification_to'], - "status": status_update['notification_status'], - "created_at": status_update['notification_created_at'], - "completed_at": status_update['notification_updated_at'], - "sent_at": status_update['notification_sent_at'], - "notification_type": status_update['notification_type'] - } - response = request( method="POST", - url=status_update['service_callback_api_url'], + url=service_callback_url, data=json.dumps(data), headers={ 'Content-Type': 'application/json', - 'Authorization': 'Bearer {}'.format(status_update['service_callback_api_bearer_token']) + 'Authorization': 'Bearer {}'.format(token) }, timeout=60 ) - current_app.logger.info('send_delivery_status_to_service sending {} to {}, response {}'.format( + current_app.logger.info('{} sending {} to {}, response {}'.format( + function_name, notification_id, - status_update['service_callback_api_url'], + service_callback_url, response.status_code )) response.raise_for_status() except RequestException as e: current_app.logger.warning( - "send_delivery_status_to_service request failed for notification_id: {} and url: {}. exc: {}".format( + "{} request failed for notification_id: {} and url: {}. exc: {}".format( + function_name, notification_id, - status_update['service_callback_api_url'], + service_callback_url, e ) ) @@ -63,12 +97,12 @@ def send_delivery_status_to_service(self, notification_id, self.retry(queue=QueueNames.RETRY) except self.MaxRetriesExceededError: current_app.logger.exception( - """Retry: send_delivery_status_to_service has retried the max num of times - for notification: {}""".format(notification_id) + """Retry: {} has retried the max num of times + for notification: {}""".format(function_name, notification_id) ) -def create_encrypted_callback_data(notification, service_callback_api): +def create_delivery_status_callback_data(notification, service_callback_api): from app import DATETIME_FORMAT, encryption data = { "notification_id": str(notification.id), @@ -84,3 +118,17 @@ def create_encrypted_callback_data(notification, service_callback_api): "service_callback_api_bearer_token": service_callback_api.bearer_token, } return encryption.encrypt(data) + + +def create_complaint_callback_data(complaint, notification, service_callback_api, recipient): + from app import DATETIME_FORMAT, encryption + data = { + "complaint_id": str(complaint.id), + "notification_id": str(notification.id), + "reference": notification.client_reference, + "to": recipient, + "complaint_date": complaint.complaint_date.strftime(DATETIME_FORMAT), + "service_callback_api_url": service_callback_api.url, + "service_callback_api_bearer_token": service_callback_api.bearer_token, + } + return encryption.encrypt(data) diff --git a/app/dao/service_callback_api_dao.py b/app/dao/service_callback_api_dao.py index 26a0478a3..ea23d6306 100644 --- a/app/dao/service_callback_api_dao.py +++ b/app/dao/service_callback_api_dao.py @@ -4,7 +4,7 @@ from app import db, create_uuid from app.dao.dao_utils import transactional, version_class from app.models import ServiceCallbackApi -from app.models import DELIVERY_STATUS_CALLBACK_TYPE +from app.models import DELIVERY_STATUS_CALLBACK_TYPE, COMPLAINT_CALLBACK_TYPE @transactional @@ -39,6 +39,13 @@ def get_service_delivery_status_callback_api_for_service(service_id): ).first() +def get_service_complaint_callback_api_for_service(service_id): + return ServiceCallbackApi.query.filter_by( + service_id=service_id, + callback_type=COMPLAINT_CALLBACK_TYPE + ).first() + + @transactional def delete_service_callback_api(service_callback_api): db.session.delete(service_callback_api) diff --git a/app/notifications/notifications_ses_callback.py b/app/notifications/notifications_ses_callback.py index acd0d605e..e90cfeced 100644 --- a/app/notifications/notifications_ses_callback.py +++ b/app/notifications/notifications_ses_callback.py @@ -12,12 +12,16 @@ from app.dao import ( ) from app.dao.complaint_dao import save_complaint from app.dao.notifications_dao import dao_get_notification_history_by_reference -from app.dao.service_callback_api_dao import get_service_delivery_status_callback_api_for_service +from app.dao.service_callback_api_dao import ( + get_service_delivery_status_callback_api_for_service, get_service_complaint_callback_api_for_service +) from app.models import Complaint from app.notifications.process_client_response import validate_callback_data from app.celery.service_callback_tasks import ( send_delivery_status_to_service, - create_encrypted_callback_data, + send_complaint_to_service, + create_delivery_status_callback_data, + create_complaint_callback_data ) from app.config import QueueNames @@ -38,7 +42,8 @@ def process_ses_response(ses_request): if notification_type == 'Bounce': notification_type = determine_notification_bounce_type(notification_type, ses_message) elif notification_type == 'Complaint': - handle_complaint(ses_message) + complaint, notification, recipient = handle_complaint(ses_message) + _check_and_queue_complaint_callback_task(complaint, notification, recipient) return try: @@ -105,7 +110,7 @@ def determine_notification_bounce_type(notification_type, ses_message): def handle_complaint(ses_message): - remove_emails_from_complaint(ses_message) + recipient_email = remove_emails_from_complaint(ses_message)[0] current_app.logger.info("Complaint from SES: \n{}".format(ses_message)) try: reference = ses_message['mail']['messageId'] @@ -123,6 +128,7 @@ def handle_complaint(ses_message): complaint_date=ses_complaint.get('timestamp', None) if ses_complaint else None ) save_complaint(complaint) + return complaint, notification, recipient_email def remove_mail_headers(dict_to_edit): @@ -141,13 +147,21 @@ def remove_emails_from_bounce(bounce_dict): def remove_emails_from_complaint(complaint_dict): remove_mail_headers(complaint_dict) complaint_dict['complaint'].pop('complainedRecipients') - complaint_dict['mail'].pop('destination') + return complaint_dict['mail'].pop('destination') def _check_and_queue_callback_task(notification): # queue callback task only if the service_callback_api exists service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) if service_callback_api: - encrypted_notification = create_encrypted_callback_data(notification, service_callback_api) - send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification], + notification_data = create_delivery_status_callback_data(notification, service_callback_api) + send_delivery_status_to_service.apply_async([str(notification.id), notification_data], queue=QueueNames.CALLBACKS) + + +def _check_and_queue_complaint_callback_task(complaint, notification, recipient): + # queue callback task only if the service_callback_api exists + service_callback_api = get_service_complaint_callback_api_for_service(service_id=notification.service_id) + if service_callback_api: + complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient) + send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS) diff --git a/app/notifications/process_client_response.py b/app/notifications/process_client_response.py index 2cfe861a6..ce2f023e5 100644 --- a/app/notifications/process_client_response.py +++ b/app/notifications/process_client_response.py @@ -10,7 +10,7 @@ from app.clients.sms.firetext import get_firetext_responses from app.clients.sms.mmg import get_mmg_responses from app.celery.service_callback_tasks import ( send_delivery_status_to_service, - create_encrypted_callback_data, + create_delivery_status_callback_data, ) from app.config import QueueNames from app.dao.notifications_dao import dao_update_notification @@ -98,7 +98,7 @@ def _process_for_status(notification_status, client_name, provider_reference): service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) if service_callback_api: - encrypted_notification = create_encrypted_callback_data(notification, service_callback_api) + encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api) send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification], queue=QueueNames.CALLBACKS) diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 573aac282..0b192972d 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -60,7 +60,7 @@ from app.models import ( SMS_TYPE ) from app.utils import get_london_midnight_in_utc -from app.celery.service_callback_tasks import create_encrypted_callback_data +from app.celery.service_callback_tasks import create_delivery_status_callback_data from app.v2.errors import JobIncompleteError from tests.app.db import ( create_notification, create_service, create_template, create_job, create_rate, @@ -224,7 +224,7 @@ def test_timeout_notifications_sends_status_update_to_service(client, sample_tem seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') + 10)) timeout_notifications() - encrypted_data = create_encrypted_callback_data(notification, callback_api) + encrypted_data = create_delivery_status_callback_data(notification, callback_api) mocked.assert_called_once_with([str(notification.id), encrypted_data], queue=QueueNames.CALLBACKS) diff --git a/tests/app/celery/test_service_callback_tasks.py b/tests/app/celery/test_service_callback_tasks.py index 4fbc858b1..543c90cf2 100644 --- a/tests/app/celery/test_service_callback_tasks.py +++ b/tests/app/celery/test_service_callback_tasks.py @@ -3,10 +3,12 @@ from datetime import datetime import pytest import requests_mock +from freezegun import freeze_time from app import (DATETIME_FORMAT, encryption) -from app.celery.service_callback_tasks import send_delivery_status_to_service +from app.celery.service_callback_tasks import send_delivery_status_to_service, send_complaint_to_service from tests.app.db import ( + create_complaint, create_notification, create_service_callback_api, create_service, @@ -19,7 +21,7 @@ from tests.app.db import ( def test_send_delivery_status_to_service_post_https_request_to_service_with_encrypted_data( notify_db_session, notification_type): - callback_api, template = _set_up_test_data(notification_type) + callback_api, template = _set_up_test_data(notification_type, "delivery_status") datestr = datetime(2017, 6, 20) notification = create_notification(template=template, @@ -28,7 +30,7 @@ def test_send_delivery_status_to_service_post_https_request_to_service_with_encr sent_at=datestr, status='sent' ) - encrypted_status_update = _set_up_encrypted_data(callback_api, notification) + encrypted_status_update = _set_up_data_for_status_update(callback_api, notification) with requests_mock.Mocker() as request_mock: request_mock.post(callback_api.url, json={}, @@ -36,7 +38,7 @@ def test_send_delivery_status_to_service_post_https_request_to_service_with_encr send_delivery_status_to_service(notification.id, encrypted_status_update=encrypted_status_update) mock_data = { - "id": str(notification.id), + "notification_id": str(notification.id), "reference": notification.client_reference, "to": notification.to, "status": notification.status, @@ -54,12 +56,42 @@ def test_send_delivery_status_to_service_post_https_request_to_service_with_encr assert request_mock.request_history[0].headers["Authorization"] == "Bearer {}".format(callback_api.bearer_token) +def test_send_complaint_to_service_posts_https_request_to_service_with_encrypted_data(notify_db_session): + with freeze_time('2001-01-01T12:00:00'): + callback_api, template = _set_up_test_data('email', "complaint") + + notification = create_notification(template=template) + complaint = create_complaint(service=template.service, notification=notification) + complaint_data = _set_up_data_for_complaint(callback_api, complaint, notification) + with requests_mock.Mocker() as request_mock: + request_mock.post(callback_api.url, + json={}, + status_code=200) + send_complaint_to_service(complaint_data) + + mock_data = { + "notification_id": str(notification.id), + "complaint_id": str(complaint.id), + "reference": notification.client_reference, + "to": notification.to, + "complaint_date": datetime.utcnow().strftime( + DATETIME_FORMAT), + } + + assert request_mock.call_count == 1 + assert request_mock.request_history[0].url == callback_api.url + assert request_mock.request_history[0].method == 'POST' + assert request_mock.request_history[0].text == json.dumps(mock_data) + assert request_mock.request_history[0].headers["Content-type"] == "application/json" + assert request_mock.request_history[0].headers["Authorization"] == "Bearer {}".format(callback_api.bearer_token) + + @pytest.mark.parametrize("notification_type", ["email", "letter", "sms"]) -def test_send_delivery_status_to_service_retries_if_request_returns_500_with_encrypted_data( +def test__send_data_to_service_callback_api_retries_if_request_returns_500_with_encrypted_data( notify_db_session, mocker, notification_type ): - callback_api, template = _set_up_test_data(notification_type) + callback_api, template = _set_up_test_data(notification_type, "delivery_status") datestr = datetime(2017, 6, 20) notification = create_notification(template=template, created_at=datestr, @@ -67,7 +99,7 @@ def test_send_delivery_status_to_service_retries_if_request_returns_500_with_enc sent_at=datestr, status='sent' ) - encrypted_data = _set_up_encrypted_data(callback_api, notification) + encrypted_data = _set_up_data_for_status_update(callback_api, notification) mocked = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry') with requests_mock.Mocker() as request_mock: request_mock.post(callback_api.url, @@ -81,12 +113,12 @@ def test_send_delivery_status_to_service_retries_if_request_returns_500_with_enc @pytest.mark.parametrize("notification_type", ["email", "letter", "sms"]) -def test_send_delivery_status_to_service_does_not_retries_if_request_returns_404_with_encrypted_data( +def test__send_data_to_service_callback_api_does_not_retry_if_request_returns_404_with_encrypted_data( notify_db_session, mocker, notification_type ): - callback_api, template = _set_up_test_data(notification_type) + callback_api, template = _set_up_test_data(notification_type, "delivery_status") datestr = datetime(2017, 6, 20) notification = create_notification(template=template, created_at=datestr, @@ -94,7 +126,7 @@ def test_send_delivery_status_to_service_does_not_retries_if_request_returns_404 sent_at=datestr, status='sent' ) - encrypted_data = _set_up_encrypted_data(callback_api, notification) + encrypted_data = _set_up_data_for_status_update(callback_api, notification) mocked = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry') with requests_mock.Mocker() as request_mock: request_mock.post(callback_api.url, @@ -109,7 +141,7 @@ def test_send_delivery_status_to_service_succeeds_if_sent_at_is_none( notify_db_session, mocker ): - callback_api, template = _set_up_test_data('email') + callback_api, template = _set_up_test_data('email', "delivery_status") datestr = datetime(2017, 6, 20) notification = create_notification(template=template, created_at=datestr, @@ -117,7 +149,7 @@ def test_send_delivery_status_to_service_succeeds_if_sent_at_is_none( sent_at=None, status='technical-failure' ) - encrypted_data = _set_up_encrypted_data(callback_api, notification) + encrypted_data = _set_up_data_for_status_update(callback_api, notification) mocked = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry') with requests_mock.Mocker() as request_mock: request_mock.post(callback_api.url, @@ -128,15 +160,15 @@ def test_send_delivery_status_to_service_succeeds_if_sent_at_is_none( assert mocked.call_count == 0 -def _set_up_test_data(notification_type): +def _set_up_test_data(notification_type, callback_type): service = create_service(restricted=True) template = create_template(service=service, template_type=notification_type, subject='Hello') callback_api = create_service_callback_api(service=service, url="https://some.service.gov.uk/", - bearer_token="something_unique") + bearer_token="something_unique", callback_type=callback_type) return callback_api, template -def _set_up_encrypted_data(callback_api, notification): +def _set_up_data_for_status_update(callback_api, notification): data = { "notification_id": str(notification.id), "notification_client_reference": notification.client_reference, @@ -152,3 +184,17 @@ def _set_up_encrypted_data(callback_api, notification): } encrypted_status_update = encryption.encrypt(data) return encrypted_status_update + + +def _set_up_data_for_complaint(callback_api, complaint, notification): + data = { + "complaint_id": str(complaint.id), + "notification_id": str(notification.id), + "reference": notification.client_reference, + "to": notification.to, + "complaint_date": complaint.complaint_date.strftime(DATETIME_FORMAT), + "service_callback_api_url": callback_api.url, + "service_callback_api_bearer_token": callback_api.bearer_token, + } + obscured_status_update = encryption.encrypt(data) + return obscured_status_update diff --git a/tests/app/notifications/test_notifications_ses_callback.py b/tests/app/notifications/test_notifications_ses_callback.py index 701c1a21c..72166b9e8 100644 --- a/tests/app/notifications/test_notifications_ses_callback.py +++ b/tests/app/notifications/test_notifications_ses_callback.py @@ -5,7 +5,7 @@ from flask import json from freezegun import freeze_time from sqlalchemy.exc import SQLAlchemyError -from app import statsd_client +from app import statsd_client, encryption from app.dao.notifications_dao import get_notification_by_id from app.models import Notification, Complaint from app.notifications.notifications_ses_callback import ( @@ -13,7 +13,7 @@ from app.notifications.notifications_ses_callback import ( handle_complaint ) from app.celery.research_mode_tasks import ses_hard_bounce_callback, ses_soft_bounce_callback, ses_notification_callback -from app.celery.service_callback_tasks import create_encrypted_callback_data +from app.celery.service_callback_tasks import create_delivery_status_callback_data from tests.app.conftest import sample_notification as create_sample_notification from tests.app.db import ( @@ -55,7 +55,7 @@ def test_ses_callback_should_update_notification_status( ) statsd_client.incr.assert_any_call("callback.ses.delivered") updated_notification = Notification.query.get(notification.id) - encrypted_data = create_encrypted_callback_data(updated_notification, callback_api) + encrypted_data = create_delivery_status_callback_data(updated_notification, callback_api) send_mock.assert_called_once_with([str(notification.id), encrypted_data], queue="service-callbacks") @@ -220,3 +220,28 @@ def test_process_ses_results_in_complaint_save_complaint_with_null_complaint_typ assert len(complaints) == 1 assert complaints[0].notification_id == notification.id assert not complaints[0].complaint_type + + +def test_ses_callback_should_send_on_complaint_to_user_callback_api(sample_email_template, mocker): + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_complaint_to_service.apply_async' + ) + create_service_callback_api( + service=sample_email_template.service, url="https://original_url.com", callback_type="complaint" + ) + + notification = create_notification(template=sample_email_template, reference='ref1') + response = ses_complaint_callback() + errors = process_ses_response(response) + assert errors is None + + assert send_mock.call_count == 1 + assert encryption.decrypt(send_mock.call_args[0][0][0]) == { + 'complaint_date': '2018-06-05T13:59:58.000000Z', + 'complaint_id': str(Complaint.query.one().id), + 'notification_id': str(notification.id), + 'reference': None, + 'service_callback_api_bearer_token': 'some_super_secret', + 'service_callback_api_url': 'https://original_url.com', + 'to': 'recipient1@example.com' + } diff --git a/tests/app/notifications/test_process_client_response.py b/tests/app/notifications/test_process_client_response.py index 1ea18cb69..6a22a2b03 100644 --- a/tests/app/notifications/test_process_client_response.py +++ b/tests/app/notifications/test_process_client_response.py @@ -7,7 +7,7 @@ from app.notifications.process_client_response import ( validate_callback_data, process_sms_client_response ) -from app.celery.service_callback_tasks import create_encrypted_callback_data +from app.celery.service_callback_tasks import create_delivery_status_callback_data from tests.app.db import create_service_callback_api @@ -64,7 +64,7 @@ def test_outcome_statistics_called_for_successful_callback(sample_notification, success, error = process_sms_client_response(status='3', provider_reference=reference, client_name='MMG') assert success == "MMG callback succeeded. reference {} updated".format(str(reference)) assert error is None - encrypted_data = create_encrypted_callback_data(sample_notification, callback_api) + encrypted_data = create_delivery_status_callback_data(sample_notification, callback_api) send_mock.assert_called_once_with([str(sample_notification.id), encrypted_data], queue="service-callbacks")