From 00b17b5ad7e49c55ee8762212e25015676d056b5 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Thu, 8 Mar 2018 16:03:16 +0000 Subject: [PATCH 1/3] When we sent the service the status callback for a notification, we have all the information we need. Which means we can remove the need to request the data from the database. In order for the PR to be backwards compatible I have added an optional parameter "encrypted_status_update". If this is not None then the new code is called. The next PR will send the encrypted data to this task. A final PR will remove the code that uses the database to get the notification and service callback api. --- app/celery/service_callback_tasks.py | 62 +++++- .../app/celery/test_service_callback_tasks.py | 186 +++++++++++++----- 2 files changed, 191 insertions(+), 57 deletions(-) diff --git a/app/celery/service_callback_tasks.py b/app/celery/service_callback_tasks.py index eaa63dc00..79f85e651 100644 --- a/app/celery/service_callback_tasks.py +++ b/app/celery/service_callback_tasks.py @@ -6,6 +6,7 @@ from app import ( db, DATETIME_FORMAT, notify_celery, + encryption ) from app.dao.notifications_dao import ( get_notification_by_id, @@ -23,10 +24,61 @@ from app.config import QueueNames @notify_celery.task(bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") -def send_delivery_status_to_service(self, notification_id): +def send_delivery_status_to_service(self, notification_id, + encrypted_status_update=None + ): + if not encrypted_status_update: + process_update_with_notification_id(self, notification_id=notification_id) + else: + try: + status_update = encryption.decrypt(encrypted_status_update) + + data = { + "id": str(notification_id), + "reference": status_update['notification_client_reference'], + "to": status_update['notification_to'], + "status": status_update['notification_status'], + "created_at": status_update['notification_created_at'], + "completed_at": status_update['notification_updated_at'], + "sent_at": status_update['notification_sent_at'], + "notification_type": status_update['notification_type'] + } + + response = request( + method="POST", + url=status_update['service_callback_api_url'], + data=json.dumps(data), + headers={ + 'Content-Type': 'application/json', + 'Authorization': 'Bearer {}'.format(status_update['service_callback_api_bearer_token']) + }, + timeout=60 + ) + current_app.logger.info('send_delivery_status_to_service sending {} to {}, response {}'.format( + notification_id, + status_update['service_callback_api_url'], + response.status_code + )) + response.raise_for_status() + except RequestException as e: + current_app.logger.warning( + "send_delivery_status_to_service request failed for service_id: {} and url: {}. exc: {}".format( + notification_id, + status_update['service_callback_api_url'], + e + ) + ) + if not isinstance(e, HTTPError) or e.response.status_code >= 500: + try: + self.retry(queue=QueueNames.RETRY) + except self.MaxRetriesExceededError: + current_app.logger.exception( + 'Retry: send_delivery_status_to_service has retried the max num of times') + + +def process_update_with_notification_id(self, notification_id): retry = False try: - # TODO: do we need to do rate limit this? notification = get_notification_by_id(notification_id) service_callback_api = get_service_callback_api_for_service(service_id=notification.service_id) if not service_callback_api: @@ -41,9 +93,9 @@ def send_delivery_status_to_service(self, notification_id): "reference": str(notification.client_reference), "to": notification.to, "status": notification.status, - "created_at": notification.created_at.strftime(DATETIME_FORMAT), # the time service sent the request - "completed_at": notification.updated_at.strftime(DATETIME_FORMAT), # the last time the status was updated - "sent_at": notification.sent_at.strftime(DATETIME_FORMAT), # the time the email was sent + "created_at": notification.created_at.strftime(DATETIME_FORMAT), + "completed_at": notification.updated_at.strftime(DATETIME_FORMAT), + "sent_at": notification.sent_at.strftime(DATETIME_FORMAT), "notification_type": notification.notification_type } diff --git a/tests/app/celery/test_service_callback_tasks.py b/tests/app/celery/test_service_callback_tasks.py index c387fdb44..11abf5f0f 100644 --- a/tests/app/celery/test_service_callback_tasks.py +++ b/tests/app/celery/test_service_callback_tasks.py @@ -7,16 +7,13 @@ import pytest import requests_mock from sqlalchemy.exc import SQLAlchemyError -from app import (DATETIME_FORMAT) +from app import (DATETIME_FORMAT, encryption) -from tests.app.conftest import ( - sample_service as create_sample_service, - sample_template as create_sample_template, -) from tests.app.db import ( create_notification, - create_user, - create_service_callback_api + create_service_callback_api, + create_service, + create_template ) from app.celery.service_callback_tasks import send_delivery_status_to_service from app.config import QueueNames @@ -24,18 +21,127 @@ from app.config import QueueNames @pytest.mark.parametrize("notification_type", ["email", "letter", "sms"]) -def test_send_delivery_status_to_service_post_https_request_to_service(notify_db, - notify_db_session, - notification_type): - user = create_user() - service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True) +def test_send_delivery_status_to_service_post_https_request_to_service_with_encrypted_data( + notify_db_session, notification_type): + callback_api, template = _set_up_test_data(notification_type) + datestr = datetime(2017, 6, 20) + + notification = create_notification(template=template, + created_at=datestr, + updated_at=datestr, + sent_at=datestr, + status='sent' + ) + encrypted_status_update = _set_up_encrypted_data(callback_api, notification) + with requests_mock.Mocker() as request_mock: + request_mock.post(callback_api.url, + json={}, + status_code=200) + send_delivery_status_to_service(notification.id, encrypted_status_update=encrypted_status_update) + + mock_data = { + "id": str(notification.id), + "reference": notification.client_reference, + "to": notification.to, + "status": notification.status, + "created_at": datestr.strftime(DATETIME_FORMAT), + "completed_at": datestr.strftime(DATETIME_FORMAT), + "sent_at": datestr.strftime(DATETIME_FORMAT), + "notification_type": notification_type + } + + assert request_mock.call_count == 1 + assert request_mock.request_history[0].url == callback_api.url + assert request_mock.request_history[0].method == 'POST' + assert request_mock.request_history[0].text == json.dumps(mock_data) + assert request_mock.request_history[0].headers["Content-type"] == "application/json" + assert request_mock.request_history[0].headers["Authorization"] == "Bearer {}".format(callback_api.bearer_token) + + +@pytest.mark.parametrize("notification_type", + ["email", "letter", "sms"]) +def test_send_delivery_status_to_service_retries_if_request_returns_500_with_encrypted_data( + notify_db_session, mocker, notification_type +): + callback_api, template = _set_up_test_data(notification_type) + datestr = datetime(2017, 6, 20) + notification = create_notification(template=template, + created_at=datestr, + updated_at=datestr, + sent_at=datestr, + status='sent' + ) + encrypted_data = _set_up_encrypted_data(callback_api, notification) + mocked = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry') + with requests_mock.Mocker() as request_mock: + request_mock.post(callback_api.url, + json={}, + status_code=500) + send_delivery_status_to_service(notification.id, encrypted_status_update=encrypted_data) + + assert mocked.call_count == 1 + assert mocked.call_args[1]['queue'] == 'retry-tasks' + + +@pytest.mark.parametrize("notification_type", + ["email", "letter", "sms"]) +def test_send_delivery_status_to_service_does_not_retries_if_request_returns_404_with_encrypted_data( + notify_db_session, + mocker, + notification_type +): + callback_api, template = _set_up_test_data(notification_type) + datestr = datetime(2017, 6, 20) + notification = create_notification(template=template, + created_at=datestr, + updated_at=datestr, + sent_at=datestr, + status='sent' + ) + encrypted_data = _set_up_encrypted_data(callback_api, notification) + mocked = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry') + with requests_mock.Mocker() as request_mock: + request_mock.post(callback_api.url, + json={}, + status_code=404) + send_delivery_status_to_service(notification.id, encrypted_status_update=encrypted_data) + + assert mocked.call_count == 0 + + +def _set_up_test_data(notification_type): + service = create_service(restricted=True) + template = create_template(service=service, template_type=notification_type, subject='Hello') callback_api = create_service_callback_api(service=service, url="https://some.service.gov.uk/", bearer_token="something_unique") - template = create_sample_template( - notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello' - ) + return callback_api, template + +def _set_up_encrypted_data(callback_api, notification): + data = { + "notification_id": str(notification.id), + "notification_client_reference": notification.client_reference, + "notification_to": notification.to, + "notification_status": notification.status, + "notification_created_at": notification.created_at.strftime(DATETIME_FORMAT), + "notification_updated_at": notification.updated_at.strftime(DATETIME_FORMAT), + "notification_sent_at": notification.sent_at.strftime(DATETIME_FORMAT), + "notification_type": notification.notification_type, + "service_callback_api_url": callback_api.url, + "service_callback_api_bearer_token": callback_api.bearer_token, + } + encrypted_status_update = encryption.encrypt(data) + return encrypted_status_update + + +# We are updating the task to take everything it needs so that there are no db calls. +# The following tests will be deleted once that is complete. +@pytest.mark.parametrize("notification_type", + ["email", "letter", "sms"]) +def test_send_delivery_status_to_service_post_https_request_to_service( + notify_db_session, notification_type): + callback_api, template = _set_up_test_data(notification_type) datestr = datetime(2017, 6, 20) notification = create_notification(template=template, @@ -73,12 +179,9 @@ def test_send_delivery_status_to_service_post_https_request_to_service(notify_db @pytest.mark.parametrize("notification_type", ["email", "letter", "sms"]) def test_send_delivery_status_to_service_does_not_sent_request_when_service_callback_api_does_not_exist( - notify_db, notify_db_session, mocker, notification_type): - service = create_sample_service(notify_db, notify_db_session, restricted=True) - - template = create_sample_template( - notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello' - ) + notify_db_session, mocker, notification_type): + service = create_service(restricted=True) + template = create_template(service=service, template_type=notification_type, subject='Hello') datestr = datetime(2017, 6, 20) notification = create_notification(template=template, @@ -95,18 +198,10 @@ def test_send_delivery_status_to_service_does_not_sent_request_when_service_call @pytest.mark.parametrize("notification_type", ["email", "letter", "sms"]) -def test_send_delivery_status_to_service_retries_if_request_returns_500(notify_db, - notify_db_session, +def test_send_delivery_status_to_service_retries_if_request_returns_500(notify_db_session, mocker, notification_type): - user = create_user() - service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True) - - template = create_sample_template( - notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello' - ) - callback_api = create_service_callback_api(service=service, url="https://some.service.gov.uk/", - bearer_token="something_unique") + callback_api, template = _set_up_test_data(notification_type) datestr = datetime(2017, 6, 20) notification = create_notification(template=template, created_at=datestr, @@ -127,18 +222,11 @@ def test_send_delivery_status_to_service_retries_if_request_returns_500(notify_d @pytest.mark.parametrize("notification_type", ["email", "letter", "sms"]) -def test_send_delivery_status_to_service_retries_if_request_throws_unknown(notify_db, - notify_db_session, +def test_send_delivery_status_to_service_retries_if_request_throws_unknown(notify_db_session, mocker, notification_type): - user = create_user() - service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True) - template = create_sample_template( - notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello' - ) - create_service_callback_api(service=service, url="https://some.service.gov.uk/", - bearer_token="something_unique") + callback_api, template = _set_up_test_data(notification_type) datestr = datetime(2017, 6, 20) notification = create_notification(template=template, created_at=datestr, @@ -158,18 +246,12 @@ def test_send_delivery_status_to_service_retries_if_request_throws_unknown(notif @pytest.mark.parametrize("notification_type", ["email", "letter", "sms"]) -def test_send_delivery_status_to_service_does_not_retries_if_request_returns_404(notify_db, - notify_db_session, - mocker, - notification_type): - user = create_user() - service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True) - - template = create_sample_template( - notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello' - ) - callback_api = create_service_callback_api(service=service, url="https://some.service.gov.uk/", - bearer_token="something_unique") +def test_send_delivery_status_to_service_does_not_retries_if_request_returns_404( + notify_db_session, + mocker, + notification_type +): + callback_api, template = _set_up_test_data(notification_type) datestr = datetime(2017, 6, 20) notification = create_notification(template=template, created_at=datestr, From e95740a6b517b4da7141087c19dbeb26e1c38df7 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Fri, 9 Mar 2018 11:06:47 +0000 Subject: [PATCH 2/3] There was a problem with the worker that was sending the service updates for the notification. The problem has been resolved but we need to replay the messages that are missing. We have been sent a file containing client_references for all the notificaitons that the service would needs updates for. --- app/commands.py | 70 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 62 insertions(+), 8 deletions(-) diff --git a/app/commands.py b/app/commands.py index 8c807f1e0..1098863ae 100644 --- a/app/commands.py +++ b/app/commands.py @@ -1,29 +1,33 @@ +import functools import uuid from datetime import datetime, timedelta from decimal import Decimal -import functools -import flask -from flask import current_app import click +import flask from click_datetime import Datetime as click_dt +from flask import current_app +from sqlalchemy.orm.exc import NoResultFound -from app import db +from app import db, DATETIME_FORMAT, encryption +from app.celery.scheduled_tasks import send_total_sent_notifications_to_performance_platform +from app.celery.service_callback_tasks import send_delivery_status_to_service +from app.config import QueueNames from app.dao.monthly_billing_dao import ( create_or_update_monthly_billing, get_monthly_billing_by_notification_type, get_service_ids_that_need_billing_populated ) -from app.models import PROVIDERS, User, SMS_TYPE, EMAIL_TYPE +from app.dao.provider_rates_dao import create_provider_rates as dao_create_provider_rates +from app.dao.service_callback_api_dao import get_service_callback_api_for_service from app.dao.services_dao import ( delete_service_and_all_associated_db_objects, dao_fetch_all_services_by_user ) -from app.dao.provider_rates_dao import create_provider_rates as dao_create_provider_rates from app.dao.users_dao import (delete_model_user, delete_user_verify_codes) -from app.utils import get_midnight_for_day_before, get_london_midnight_in_utc +from app.models import PROVIDERS, User, SMS_TYPE, EMAIL_TYPE, Notification from app.performance_platform.processing_time import (send_processing_time_for_start_and_end) -from app.celery.scheduled_tasks import send_total_sent_notifications_to_performance_platform +from app.utils import get_midnight_for_day_before, get_london_midnight_in_utc @click.group(name='command', help='Additional commands') @@ -311,5 +315,55 @@ def insert_inbound_numbers_from_file(file_name): file.close() +@notify_command(name='replay-service-callbacks') +@click.option('-f', '--file_name', required=True, + help="""Full path of the file to upload, file is a contains client references of + notifications that need the status to be sent to the service.""") +@click.option('-s', '--service_id', required=True, + help="""The service that the callbacks are for""") +def replay_service_callbacks(file_name, service_id): + print("Start send service callbacks for service: ", service_id) + callback_api = get_service_callback_api_for_service(service_id=service_id) + if not callback_api: + print("Callback api was not found for service: {}".format(service_id)) + return + + errors = [] + notifications = [] + file = open(file_name) + + for ref in file: + try: + notification = Notification.query.filter_by(client_reference=ref.strip()).one() + notifications.append(notification) + except NoResultFound as e: + errors.append("Reference: {} was not found in notifications.".format(ref)) + + for e in errors: + print(e) + if errors: + raise Exception("Some notifications for the given references were not found") + + for n in notifications: + data = { + "notification_id": str(n.id), + "notification_client_reference": n.client_reference, + "notification_to": n.to, + "notification_status": n.status, + "notification_created_at": n.created_at.strftime(DATETIME_FORMAT), + "notification_updated_at": n.updated_at.strftime(DATETIME_FORMAT), + "notification_sent_at": n.sent_at.strftime(DATETIME_FORMAT), + "notification_type": n.notification_type, + "service_callback_api_url": callback_api.url, + "service_callback_api_bearer_token": callback_api.bearer_token, + } + encrypted_status_update = encryption.encrypt(data) + send_delivery_status_to_service.apply_async([str(n.id), encrypted_status_update], + queue=QueueNames.CALLBACKS) + + print("Replay service status for service: {}. Sent {} notification status updates".format(service_id, + len(notifications))) + + def setup_commands(application): application.cli.add_command(command_group) From a3d04ca67229e1d3bcb0b201527f367b3248b9ff Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Fri, 9 Mar 2018 12:01:08 +0000 Subject: [PATCH 3/3] Improve log message --- app/celery/service_callback_tasks.py | 9 +++++++-- app/commands.py | 4 ++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/app/celery/service_callback_tasks.py b/app/celery/service_callback_tasks.py index 79f85e651..21e87266e 100644 --- a/app/celery/service_callback_tasks.py +++ b/app/celery/service_callback_tasks.py @@ -73,7 +73,9 @@ def send_delivery_status_to_service(self, notification_id, self.retry(queue=QueueNames.RETRY) except self.MaxRetriesExceededError: current_app.logger.exception( - 'Retry: send_delivery_status_to_service has retried the max num of times') + """Retry: send_delivery_status_to_service has retried the max num of times + for notification: {}""".format(notification_id) + ) def process_update_with_notification_id(self, notification_id): @@ -135,4 +137,7 @@ def process_update_with_notification_id(self, notification_id): try: self.retry(queue=QueueNames.RETRY) except self.MaxRetriesExceededError: - current_app.logger.exception('Retry: send_delivery_status_to_service has retried the max num of times') + current_app.logger.exception( + """Retry: send_delivery_status_to_service has retried the max num of times + for notification: {}""".format(notification_id) + ) diff --git a/app/commands.py b/app/commands.py index 1098863ae..23fc18c0a 100644 --- a/app/commands.py +++ b/app/commands.py @@ -361,8 +361,8 @@ def replay_service_callbacks(file_name, service_id): send_delivery_status_to_service.apply_async([str(n.id), encrypted_status_update], queue=QueueNames.CALLBACKS) - print("Replay service status for service: {}. Sent {} notification status updates".format(service_id, - len(notifications))) + print("Replay service status for service: {}. Sent {} notification status updates to the queue".format( + service_id, len(notifications))) def setup_commands(application):