From 22f86aa1b5c85b6f4b73b8b9a5a1f4e21195ecd1 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Thu, 8 Mar 2018 13:35:53 +0000 Subject: [PATCH 1/2] Revert service callback worker eventlets We have seen problems with the service callback workers due to the db connection pool being exhausted. When the worker picks up the task, it makes a db query to get the notification, a query to get the callback url, and then closes the session before it makes the 3rd party request. However, even closing the session before the (potentially lengthy) web request wasn't enough - we've seen significant amounts of `sqlalchemy.exc.TimeoutError`s. This reverts commit 2dfbd93c7ed65028706e04762ea5102c60e5acbf --- manifest-delivery-base.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/manifest-delivery-base.yml b/manifest-delivery-base.yml index e141c6482..82bad9b2d 100644 --- a/manifest-delivery-base.yml +++ b/manifest-delivery-base.yml @@ -95,6 +95,6 @@ applications: NOTIFY_APP_NAME: delivery-worker-receipts - name: notify-delivery-worker-service-callbacks - command: scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO -P eventlet -c 1000 -Q service-callbacks + command: scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 -Q service-callbacks env: - NOTIFY_APP_NAME: delivery-worker-service-callbacks + NOTIFY_APP_NAME: delivery-worker-service-callbacks \ No newline at end of file From 651c3062b96c7329bd73f67a1a2cf6a2acf571c6 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Thu, 8 Mar 2018 14:03:01 +0000 Subject: [PATCH 2/2] retry service callbacks if the db queries fail we don't expect them to fail, but they might if we accidentally exhaust our connection pool. Just in case, lets retry. --- app/celery/service_callback_tasks.py | 59 +++++++++++-------- .../app/celery/test_service_callback_tasks.py | 21 +++++-- 2 files changed, 51 insertions(+), 29 deletions(-) diff --git a/app/celery/service_callback_tasks.py b/app/celery/service_callback_tasks.py index d1179ff17..eaa63dc00 100644 --- a/app/celery/service_callback_tasks.py +++ b/app/celery/service_callback_tasks.py @@ -24,28 +24,29 @@ from app.config import QueueNames @notify_celery.task(bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") def send_delivery_status_to_service(self, notification_id): - # TODO: do we need to do rate limit this? - notification = get_notification_by_id(notification_id) - service_callback_api = get_service_callback_api_for_service(service_id=notification.service_id) - if not service_callback_api: - # No delivery receipt API info set - return - - # Release DB connection before performing an external HTTP request - db.session.close() - - data = { - "id": str(notification_id), - "reference": str(notification.client_reference), - "to": notification.to, - "status": notification.status, - "created_at": notification.created_at.strftime(DATETIME_FORMAT), # the time service sent the request - "completed_at": notification.updated_at.strftime(DATETIME_FORMAT), # the last time the status was updated - "sent_at": notification.sent_at.strftime(DATETIME_FORMAT), # the time the email was sent - "notification_type": notification.notification_type - } - + retry = False try: + # TODO: do we need to do rate limit this? + notification = get_notification_by_id(notification_id) + service_callback_api = get_service_callback_api_for_service(service_id=notification.service_id) + if not service_callback_api: + # No delivery receipt API info set + return + + # Release DB connection before performing an external HTTP request + db.session.close() + + data = { + "id": str(notification_id), + "reference": str(notification.client_reference), + "to": notification.to, + "status": notification.status, + "created_at": notification.created_at.strftime(DATETIME_FORMAT), # the time service sent the request + "completed_at": notification.updated_at.strftime(DATETIME_FORMAT), # the last time the status was updated + "sent_at": notification.sent_at.strftime(DATETIME_FORMAT), # the time the email was sent + "notification_type": notification.notification_type + } + response = request( method="POST", url=service_callback_api.url, @@ -71,7 +72,15 @@ def send_delivery_status_to_service(self, notification_id): ) ) if not isinstance(e, HTTPError) or e.response.status_code >= 500: - try: - self.retry(queue=QueueNames.RETRY) - except self.MaxRetriesExceededError: - current_app.logger.exception('Retry: send_delivery_status_to_service has retried the max num of times') + retry = True + except Exception as e: + current_app.logger.exception( + 'Unhandled exception when sending callback for notification {}'.format(notification_id) + ) + retry = True + + if retry: + try: + self.retry(queue=QueueNames.RETRY) + except self.MaxRetriesExceededError: + current_app.logger.exception('Retry: send_delivery_status_to_service has retried the max num of times') diff --git a/tests/app/celery/test_service_callback_tasks.py b/tests/app/celery/test_service_callback_tasks.py index a7c9aaeb4..c387fdb44 100644 --- a/tests/app/celery/test_service_callback_tasks.py +++ b/tests/app/celery/test_service_callback_tasks.py @@ -1,10 +1,11 @@ +import uuid import json from datetime import datetime +from requests import RequestException import pytest import requests_mock - -from requests import RequestException +from sqlalchemy.exc import SQLAlchemyError from app import (DATETIME_FORMAT) @@ -18,6 +19,7 @@ from tests.app.db import ( create_service_callback_api ) from app.celery.service_callback_tasks import send_delivery_status_to_service +from app.config import QueueNames @pytest.mark.parametrize("notification_type", @@ -88,7 +90,7 @@ def test_send_delivery_status_to_service_does_not_sent_request_when_service_call mocked = mocker.patch("requests.request") send_delivery_status_to_service(notification.id) - mocked.call_count == 0 + assert mocked.call_count == 0 @pytest.mark.parametrize("notification_type", @@ -182,4 +184,15 @@ def test_send_delivery_status_to_service_does_not_retries_if_request_returns_404 status_code=404) send_delivery_status_to_service(notification.id) - mocked.call_count == 0 + assert mocked.call_count == 0 + + +def test_send_delivery_status_to_service_retries_if_database_error(client, mocker): + notification_id = uuid.uuid4() + db_call = mocker.patch('app.celery.service_callback_tasks.get_notification_by_id', side_effect=SQLAlchemyError) + retry = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry') + + send_delivery_status_to_service(notification_id) + + db_call.assert_called_once_with(notification_id) + retry.assert_called_once_with(queue=QueueNames.RETRY)