From ee46803a120be7a0ebbc24713f4573b444935f56 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Mon, 19 Mar 2018 17:38:20 +0000 Subject: [PATCH 1/2] The send_delivery_status_to_service task was refactor to take the details of the notification and service api callback such that the task no longer needed to go to the database to provide the status update. This PR removes the code that is no longer used. This extra step was necessary to keep the tasks backward compatible. --- app/celery/service_callback_tasks.py | 127 +++------------ .../app/celery/test_service_callback_tasks.py | 152 +----------------- 2 files changed, 27 insertions(+), 252 deletions(-) diff --git a/app/celery/service_callback_tasks.py b/app/celery/service_callback_tasks.py index 2da41869c..ebf71d1f7 100644 --- a/app/celery/service_callback_tasks.py +++ b/app/celery/service_callback_tasks.py @@ -1,119 +1,52 @@ import json +from flask import current_app from notifications_utils.statsd_decorators import statsd - -from app import ( - db, - DATETIME_FORMAT, - notify_celery, - encryption -) -from app.dao.notifications_dao import ( - get_notification_by_id, -) - -from app.dao.service_callback_api_dao import get_service_callback_api_for_service from requests import ( HTTPError, request, RequestException ) -from flask import current_app + +from app import ( + notify_celery, + encryption +) from app.config import QueueNames @notify_celery.task(bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") def send_delivery_status_to_service(self, notification_id, - encrypted_status_update=None + encrypted_status_update ): - if not encrypted_status_update: - process_update_with_notification_id(self, notification_id=notification_id) - else: - try: - status_update = encryption.decrypt(encrypted_status_update) - - data = { - "id": str(notification_id), - "reference": status_update['notification_client_reference'], - "to": status_update['notification_to'], - "status": status_update['notification_status'], - "created_at": status_update['notification_created_at'], - "completed_at": status_update['notification_updated_at'], - "sent_at": status_update['notification_sent_at'], - "notification_type": status_update['notification_type'] - } - - response = request( - method="POST", - url=status_update['service_callback_api_url'], - data=json.dumps(data), - headers={ - 'Content-Type': 'application/json', - 'Authorization': 'Bearer {}'.format(status_update['service_callback_api_bearer_token']) - }, - timeout=60 - ) - current_app.logger.info('send_delivery_status_to_service sending {} to {}, response {}'.format( - notification_id, - status_update['service_callback_api_url'], - response.status_code - )) - response.raise_for_status() - except RequestException as e: - current_app.logger.warning( - "send_delivery_status_to_service request failed for notification_id: {} and url: {}. exc: {}".format( - notification_id, - status_update['service_callback_api_url'], - e - ) - ) - if not isinstance(e, HTTPError) or e.response.status_code >= 500: - try: - self.retry(queue=QueueNames.RETRY) - except self.MaxRetriesExceededError: - current_app.logger.exception( - """Retry: send_delivery_status_to_service has retried the max num of times - for notification: {}""".format(notification_id) - ) - - -def process_update_with_notification_id(self, notification_id): - retry = False try: - notification = get_notification_by_id(notification_id) - service_callback_api = get_service_callback_api_for_service(service_id=notification.service_id) - if not service_callback_api: - # No delivery receipt API info set - return - - # Release DB connection before performing an external HTTP request - db.session.close() + status_update = encryption.decrypt(encrypted_status_update) data = { "id": str(notification_id), - "reference": str(notification.client_reference), - "to": notification.to, - "status": notification.status, - "created_at": notification.created_at.strftime(DATETIME_FORMAT), - "completed_at": notification.updated_at.strftime(DATETIME_FORMAT), - "sent_at": notification.sent_at.strftime(DATETIME_FORMAT), - "notification_type": notification.notification_type + "reference": status_update['notification_client_reference'], + "to": status_update['notification_to'], + "status": status_update['notification_status'], + "created_at": status_update['notification_created_at'], + "completed_at": status_update['notification_updated_at'], + "sent_at": status_update['notification_sent_at'], + "notification_type": status_update['notification_type'] } response = request( method="POST", - url=service_callback_api.url, + url=status_update['service_callback_api_url'], data=json.dumps(data), headers={ 'Content-Type': 'application/json', - 'Authorization': 'Bearer {}'.format(service_callback_api.bearer_token) + 'Authorization': 'Bearer {}'.format(status_update['service_callback_api_bearer_token']) }, timeout=60 ) current_app.logger.info('send_delivery_status_to_service sending {} to {}, response {}'.format( notification_id, - service_callback_api.url, + status_update['service_callback_api_url'], response.status_code )) response.raise_for_status() @@ -121,26 +54,18 @@ def process_update_with_notification_id(self, notification_id): current_app.logger.warning( "send_delivery_status_to_service request failed for notification_id: {} and url: {}. exc: {}".format( notification_id, - service_callback_api.url, + status_update['service_callback_api_url'], e ) ) if not isinstance(e, HTTPError) or e.response.status_code >= 500: - retry = True - except Exception as e: - current_app.logger.exception( - 'Unhandled exception when sending callback for notification {}'.format(notification_id) - ) - retry = True - - if retry: - try: - self.retry(queue=QueueNames.RETRY) - except self.MaxRetriesExceededError: - current_app.logger.exception( - """Retry: send_delivery_status_to_service has retried the max num of times - for notification: {}""".format(notification_id) - ) + try: + self.retry(queue=QueueNames.RETRY) + except self.MaxRetriesExceededError: + current_app.logger.exception( + """Retry: send_delivery_status_to_service has retried the max num of times + for notification: {}""".format(notification_id) + ) def create_encrypted_callback_data(notification, service_callback_api): diff --git a/tests/app/celery/test_service_callback_tasks.py b/tests/app/celery/test_service_callback_tasks.py index 3816d65ae..4fbc858b1 100644 --- a/tests/app/celery/test_service_callback_tasks.py +++ b/tests/app/celery/test_service_callback_tasks.py @@ -1,22 +1,17 @@ -import uuid import json from datetime import datetime -from requests import RequestException import pytest import requests_mock -from sqlalchemy.exc import SQLAlchemyError from app import (DATETIME_FORMAT, encryption) - +from app.celery.service_callback_tasks import send_delivery_status_to_service from tests.app.db import ( create_notification, create_service_callback_api, create_service, create_template ) -from app.celery.service_callback_tasks import send_delivery_status_to_service -from app.config import QueueNames @pytest.mark.parametrize("notification_type", @@ -157,148 +152,3 @@ def _set_up_encrypted_data(callback_api, notification): } encrypted_status_update = encryption.encrypt(data) return encrypted_status_update - - -# We are updating the task to take everything it needs so that there are no db calls. -# The following tests will be deleted once that is complete. -@pytest.mark.parametrize("notification_type", - ["email", "letter", "sms"]) -def test_send_delivery_status_to_service_post_https_request_to_service( - notify_db_session, notification_type): - callback_api, template = _set_up_test_data(notification_type) - datestr = datetime(2017, 6, 20) - - notification = create_notification(template=template, - created_at=datestr, - updated_at=datestr, - sent_at=datestr, - status='sent' - ) - - with requests_mock.Mocker() as request_mock: - request_mock.post(callback_api.url, - json={}, - status_code=200) - send_delivery_status_to_service(notification.id) - - mock_data = { - "id": str(notification.id), - "reference": str(notification.client_reference), - "to": notification.to, - "status": notification.status, - "created_at": datestr.strftime(DATETIME_FORMAT), # the time GOV.UK email sent the request - "completed_at": datestr.strftime(DATETIME_FORMAT), # the last time the status was updated - "sent_at": datestr.strftime(DATETIME_FORMAT), # the time the email was sent - "notification_type": notification_type - } - - assert request_mock.call_count == 1 - assert request_mock.request_history[0].url == callback_api.url - assert request_mock.request_history[0].method == 'POST' - assert request_mock.request_history[0].text == json.dumps(mock_data) - assert request_mock.request_history[0].headers["Content-type"] == "application/json" - assert request_mock.request_history[0].headers["Authorization"] == "Bearer {}".format(callback_api.bearer_token) - - -@pytest.mark.parametrize("notification_type", - ["email", "letter", "sms"]) -def test_send_delivery_status_to_service_does_not_sent_request_when_service_callback_api_does_not_exist( - notify_db_session, mocker, notification_type): - service = create_service(restricted=True) - template = create_template(service=service, template_type=notification_type, subject='Hello') - datestr = datetime(2017, 6, 20) - - notification = create_notification(template=template, - created_at=datestr, - updated_at=datestr, - sent_at=datestr, - status='sent' - ) - mocked = mocker.patch("requests.request") - send_delivery_status_to_service(notification.id) - - assert mocked.call_count == 0 - - -@pytest.mark.parametrize("notification_type", - ["email", "letter", "sms"]) -def test_send_delivery_status_to_service_retries_if_request_returns_500(notify_db_session, - mocker, - notification_type): - callback_api, template = _set_up_test_data(notification_type) - datestr = datetime(2017, 6, 20) - notification = create_notification(template=template, - created_at=datestr, - updated_at=datestr, - sent_at=datestr, - status='sent' - ) - mocked = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry') - with requests_mock.Mocker() as request_mock: - request_mock.post(callback_api.url, - json={}, - status_code=500) - send_delivery_status_to_service(notification.id) - - assert mocked.call_count == 1 - assert mocked.call_args[1]['queue'] == 'retry-tasks' - - -@pytest.mark.parametrize("notification_type", - ["email", "letter", "sms"]) -def test_send_delivery_status_to_service_retries_if_request_throws_unknown(notify_db_session, - mocker, - notification_type): - - callback_api, template = _set_up_test_data(notification_type) - datestr = datetime(2017, 6, 20) - notification = create_notification(template=template, - created_at=datestr, - updated_at=datestr, - sent_at=datestr, - status='sent' - ) - - mocked = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry') - mocker.patch("app.celery.tasks.request", side_effect=RequestException()) - - send_delivery_status_to_service(notification.id) - - assert mocked.call_count == 1 - assert mocked.call_args[1]['queue'] == 'retry-tasks' - - -@pytest.mark.parametrize("notification_type", - ["email", "letter", "sms"]) -def test_send_delivery_status_to_service_does_not_retries_if_request_returns_404( - notify_db_session, - mocker, - notification_type -): - callback_api, template = _set_up_test_data(notification_type) - datestr = datetime(2017, 6, 20) - notification = create_notification(template=template, - created_at=datestr, - updated_at=datestr, - sent_at=datestr, - status='sent' - ) - mocked = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry') - with requests_mock.Mocker() as request_mock: - request_mock.post(callback_api.url, - json={}, - status_code=404) - send_delivery_status_to_service(notification.id) - - assert mocked.call_count == 0 - - -def test_send_delivery_status_to_service_retries_if_database_error(client, mocker): - notification_id = uuid.uuid4() - db_call = mocker.patch('app.celery.service_callback_tasks.get_notification_by_id', side_effect=SQLAlchemyError) - retry = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry') - - send_delivery_status_to_service(notification_id) - - db_call.assert_called_once_with(notification_id) - retry.assert_called_once_with(queue=QueueNames.RETRY) From 0798154fa26f965f80b17e7a945055c1030be415 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Tue, 20 Mar 2018 15:48:32 +0000 Subject: [PATCH 2/2] Optimize the query used to return the services and todays notification totals. By changing the created_at filter to use a specific date range I found a significant improvement to the queries performance. The unit test needed to change because now were are returning todays date as BST the local timezone. Query plan before Merge Left Join (cost=1226133.76..1226143.77 rows=1753 width=70) (actual time=5800.160..5801.657 rows=1849 loops=1) Merge Cond: (services.id = anon_1.service_id) -> Sort (cost=152.99..157.37 rows=1753 width=46) (actual time=2.205..2.631 rows=1762 loops=1) Sort Key: services.id Sort Method: quicksort Memory: 224kB -> Seq Scan on services (cost=0.00..58.54 rows=1753 width=46) (actual time=0.011..1.156 rows=1762 loops=1) Filter: active Rows Removed by Filter: 101 -> Sort (cost=1225980.77..1225980.99 rows=86 width=40) (actual time=5797.949..5797.984 rows=198 loops=1) Sort Key: anon_1.service_id Sort Method: quicksort Memory: 40kB -> Subquery Scan on anon_1 (cost=1225976.29..1225978.01 rows=86 width=40) (actual time=5797.682..5797.823 rows=198 loops=1) -> HashAggregate (cost=1225976.29..1225977.15 rows=86 width=48) (actual time=5797.681..5797.747 rows=198 loops=1) Group Key: notifications.notification_type, notifications.notification_status, notifications.service_id -> Seq Scan on notifications (cost=0.00..1220610.86 rows=536543 width=48) (actual time=0.064..5482.975 rows=643799 loops=1) Filter: (((key_type)::text <> 'TEST'::text) AND (date(created_at) = '2018-03-20'::date)) Rows Removed by Filter: 6804774 Planning time: 1.106 ms Execution time: 5802.130 ms Query plan after Merge Left Join (cost=953378.30..953388.30 rows=1753 width=70) (actual time=2380.144..2382.499 rows=1852 loops=1) Merge Cond: (services.id = anon_1.service_id) -> Sort (cost=152.99..157.37 rows=1753 width=46) (actual time=2.944..3.570 rows=1762 loops=1) Sort Key: services.id Sort Method: quicksort Memory: 224kB -> Seq Scan on services (cost=0.00..58.54 rows=1753 width=46) (actual time=0.006..1.294 rows=1762 loops=1) Filter: active Rows Removed by Filter: 101 -> Sort (cost=953225.31..953225.53 rows=86 width=40) (actual time=2377.194..2377.262 rows=201 loops=1) Sort Key: anon_1.service_id Sort Method: quicksort Memory: 40kB -> Subquery Scan on anon_1 (cost=953220.83..953222.55 rows=86 width=40) (actual time=2376.797..2377.034 rows=201 loops=1) -> HashAggregate (cost=953220.83..953221.69 rows=86 width=48) (actual time=2376.795..2376.905 rows=201 loops=1) Group Key: notifications.notification_type, notifications.notification_status, notifications.service_id -> Bitmap Heap Scan on notifications (cost=29883.14..947856.24 rows=536459 width=48) (actual time=270.061..1887.754 rows=644735 loops=1) Recheck Cond: ((created_at >= '2018-03-20 00:00:00'::timestamp without time zone) AND (created_at < '2018-03-21 00:00:00'::timestamp without time zone)) Rows Removed by Index Recheck: 947427 Filter: ((key_type)::text <> 'TEST'::text) Heap Blocks: exact=40882 lossy=186483 -> Bitmap Index Scan on ix_notifications_created_at (cost=0.00..29749.02 rows=536459 width=0) (actual time=258.631..258.631 rows=644849 loops=1) Index Cond: ((created_at >= '2018-03-20 00:00:00'::timestamp without time zone) AND (created_at < '2018-03-21 00:00:00'::timestamp without time zone)) Planning time: 0.548 ms Execution time: 2383.485 ms --- app/dao/services_dao.py | 6 +++++- tests/app/service/test_rest.py | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/app/dao/services_dao.py b/app/dao/services_dao.py index 027cd9f82..c362e19d9 100644 --- a/app/dao/services_dao.py +++ b/app/dao/services_dao.py @@ -333,6 +333,9 @@ def dao_fetch_monthly_historical_stats_for_service(service_id, year): @statsd(namespace='dao') def dao_fetch_todays_stats_for_all_services(include_from_test_key=True, only_active=True): + today = date.today() + start_date = get_london_midnight_in_utc(today) + end_date = get_london_midnight_in_utc(today + timedelta(days=1)) subquery = db.session.query( Notification.notification_type, @@ -340,7 +343,8 @@ def dao_fetch_todays_stats_for_all_services(include_from_test_key=True, only_act Notification.service_id, func.count(Notification.id).label('count') ).filter( - func.date(Notification.created_at) == date.today(), + Notification.created_at >= start_date, + Notification.created_at < end_date ).group_by( Notification.notification_type, Notification.status, diff --git a/tests/app/service/test_rest.py b/tests/app/service/test_rest.py index 912e8c120..92e8ddd41 100644 --- a/tests/app/service/test_rest.py +++ b/tests/app/service/test_rest.py @@ -1635,6 +1635,7 @@ def test_get_detailed_services_only_includes_todays_notifications(notify_db, not create_sample_notification(notify_db, notify_db_session, created_at=datetime(2015, 10, 9, 23, 59)) create_sample_notification(notify_db, notify_db_session, created_at=datetime(2015, 10, 10, 0, 0)) create_sample_notification(notify_db, notify_db_session, created_at=datetime(2015, 10, 10, 12, 0)) + create_sample_notification(notify_db, notify_db_session, created_at=datetime(2015, 10, 10, 23, 0)) with freeze_time('2015-10-10T12:00:00'): data = get_detailed_services(start_date=datetime.utcnow().date(), end_date=datetime.utcnow().date()) @@ -1643,7 +1644,7 @@ def test_get_detailed_services_only_includes_todays_notifications(notify_db, not assert len(data) == 1 assert data[0]['statistics'] == { EMAIL_TYPE: {'delivered': 0, 'failed': 0, 'requested': 0}, - SMS_TYPE: {'delivered': 0, 'failed': 0, 'requested': 2}, + SMS_TYPE: {'delivered': 0, 'failed': 0, 'requested': 3}, LETTER_TYPE: {'delivered': 0, 'failed': 0, 'requested': 0} }