Files
notifications-api/app/celery/service_callback_tasks.py
Alexey Bezhan 9eada23392 Release DB connection before executing service API callback
Flask-SQLAlchemy sets up a connection pool with 5 connections and will
create up to 10 additional connections if all the pool ones are in use.

If all connections in the pool and all overflow connections are in
use, SQLAlchemy will block new DB sessions until a connection becomes
available. If a session can't acquire a connections for a specified
time (we set it to 30s) then a TimeoutError is raised.

By default db.session is deleted with the related context object
(so when the request is finished or app context is discarded).

This effectively limits the number of concurrent requests/tasks with
multithreaded gunicorn/celery workers to the maximum DB connection pool
size. Most of the time these limits are fine since the API requests are
relatively quick and are mainly interacting with the database anyway.

Service callbacks however have to make an HTTP request to a third party.
If these requests start taking a long time and the number of threads is
larger than the number of DB connections then remaining threads will
start blocking and potentially failing if it takes more than 30s to
acquire a connection. For example if a 100 threads start running tasks
that take 20s each with a max DB connection pool size of 10 then first 10
threads will acquire a connection right away, next 10 tasks will block for
20 seconds before the initial connections are released and all other tasks
will raise a TimeoutError after 30 seconds.

To avoid this, we perform all database operations at the beginning of
the task and then explicitly close the DB session before sending the
HTTP request to the service callback URL. Closing the session ends
the transaction and frees up the connection, making it available for
other tasks. Making calls to the DB after calling `close` will acquire
a new connection. This means that tasks are still limited to running
at most 15 queries at the same time, but can have a lot more concurrent
HTTP requests in progress.
2018-02-13 16:44:30 +00:00

78 lines
2.7 KiB
Python

import json
from notifications_utils.statsd_decorators import statsd
from app import (
db,
DATETIME_FORMAT,
notify_celery,
)
from app.dao.notifications_dao import (
get_notification_by_id,
)
from app.dao.service_callback_api_dao import get_service_callback_api_for_service
from requests import (
HTTPError,
request,
RequestException
)
from flask import current_app
from app.config import QueueNames
@notify_celery.task(bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300)
@statsd(namespace="tasks")
def send_delivery_status_to_service(self, notification_id):
# TODO: do we need to do rate limit this?
notification = get_notification_by_id(notification_id)
service_callback_api = get_service_callback_api_for_service(service_id=notification.service_id)
if not service_callback_api:
# No delivery receipt API info set
return
# Release DB connection before performing an external HTTP request
db.session.close()
data = {
"id": str(notification_id),
"reference": str(notification.client_reference),
"to": notification.to,
"status": notification.status,
"created_at": notification.created_at.strftime(DATETIME_FORMAT), # the time GOV.UK email sent the request
"completed_at": notification.updated_at.strftime(DATETIME_FORMAT), # the last time the status was updated
"sent_at": notification.sent_at.strftime(DATETIME_FORMAT), # the time the email was sent
"notification_type": notification.notification_type
}
try:
response = request(
method="POST",
url=service_callback_api.url,
data=json.dumps(data),
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(service_callback_api.bearer_token)
},
timeout=60
)
current_app.logger.info('send_delivery_status_to_service sending {} to {}, response {}'.format(
notification_id,
service_callback_api.url,
response.status_code
))
response.raise_for_status()
except RequestException as e:
current_app.logger.warning(
"send_delivery_status_to_service request failed for service_id: {} and url: {}. exc: {}".format(
notification_id,
service_callback_api.url,
e
)
)
if not isinstance(e, HTTPError) or e.response.status_code >= 500:
try:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.exception('Retry: send_delivery_status_to_service has retried the max num of times')