Files
notifications-api/app/celery/service_callback_tasks.py

141 lines
5.3 KiB
Python
Raw Normal View History

import json
from flask import current_app
2021-03-10 13:55:06 +00:00
from requests import HTTPError, RequestException, request
from app import encryption, notify_celery
from app.config import QueueNames
from app.utils import DATETIME_FORMAT
@notify_celery.task(bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300)
def send_delivery_status_to_service(
    self, notification_id, encrypted_status_update
):
    """Celery task: post a notification's delivery status to the service's callback URL.

    ``encrypted_status_update`` is decrypted and reshaped into the callback
    payload, which is then handed to ``_send_data_to_service_callback_api``
    together with the callback URL and bearer token carried in the decrypted
    update.
    """
    status_update = encryption.decrypt(encrypted_status_update)

    # (payload key, key in the decrypted status update), listed in payload order.
    payload_sources = (
        ("reference", 'notification_client_reference'),
        ("to", 'notification_to'),
        ("status", 'notification_status'),
        ("created_at", 'notification_created_at'),
        ("completed_at", 'notification_updated_at'),
        ("sent_at", 'notification_sent_at'),
        ("notification_type", 'notification_type'),
        ("template_id", 'template_id'),
        ("template_version", 'template_version'),
    )

    # The id comes from the task argument rather than from the decrypted update.
    data = {"id": str(notification_id)}
    for payload_key, source_key in payload_sources:
        data[payload_key] = status_update[source_key]

    callback_url = status_update['service_callback_api_url']
    bearer_token = status_update['service_callback_api_bearer_token']
    _send_data_to_service_callback_api(
        self, data, callback_url, bearer_token, 'send_delivery_status_to_service'
    )
@notify_celery.task(bind=True, name="send-complaint", max_retries=5, default_retry_delay=300)
def send_complaint_to_service(self, complaint_data):
    """Celery task: post a complaint to the service's callback URL.

    ``complaint_data`` is decrypted; the complaint fields are copied into the
    callback payload and sent via ``_send_data_to_service_callback_api`` using
    the callback URL and bearer token carried in the decrypted data.
    """
    complaint = encryption.decrypt(complaint_data)

    # These keys are copied across unchanged, in this order.
    payload_keys = ("notification_id", "complaint_id", "reference", "to", "complaint_date")
    data = {key: complaint[key] for key in payload_keys}

    callback_url = complaint['service_callback_api_url']
    bearer_token = complaint['service_callback_api_bearer_token']
    _send_data_to_service_callback_api(
        self, data, callback_url, bearer_token, 'send_complaint_to_service'
    )
Release DB connection before executing service API callback. Flask-SQLAlchemy sets up a connection pool with 5 connections and will create up to 10 additional connections if all the pool ones are in use. If all connections in the pool and all overflow connections are in use, SQLAlchemy will block new DB sessions until a connection becomes available. If a session can't acquire a connection for a specified time (we set it to 30s) then a TimeoutError is raised. By default db.session is deleted with the related context object (so when the request is finished or app context is discarded). This effectively limits the number of concurrent requests/tasks with multithreaded gunicorn/celery workers to the maximum DB connection pool size. Most of the time these limits are fine since the API requests are relatively quick and are mainly interacting with the database anyway. Service callbacks however have to make an HTTP request to a third party. If these requests start taking a long time and the number of threads is larger than the number of DB connections then the remaining threads will start blocking and potentially failing if it takes more than 30s to acquire a connection. For example if 100 threads start running tasks that take 20s each with a max DB connection pool size of 10 then the first 10 threads will acquire a connection right away, the next 10 tasks will block for 20 seconds before the initial connections are released and all other tasks will raise a TimeoutError after 30 seconds. To avoid this, we perform all database operations at the beginning of the task and then explicitly close the DB session before sending the HTTP request to the service callback URL. Closing the session ends the transaction and frees up the connection, making it available for other tasks. Making calls to the DB after calling `close` will acquire a new connection.
This means that tasks are still limited to running at most 15 queries at the same time, but can have a lot more concurrent HTTP requests in progress.
2018-02-12 15:44:05 +00:00
def _send_data_to_service_callback_api(self, data, service_callback_url, token, function_name):
    """POST ``data`` as JSON to a service callback URL and handle failures.

    ``self`` is the bound task making the call; ``function_name`` is only used
    to label log lines. Failures that are not HTTP error responses, and HTTP
    error responses with status >= 500, ask the task to retry on
    ``QueueNames.CALLBACKS_RETRY``. Any other HTTP error response is logged
    and not retried.
    """
    # Payloads carry the notification id under "notification_id" or under "id".
    if "notification_id" in data:
        notification_id = data["notification_id"]
    else:
        notification_id = data["id"]

    body = json.dumps(data)
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer {}'.format(token)
    }

    try:
        response = request(
            method="POST",
            url=service_callback_url,
            data=body,
            headers=request_headers,
            timeout=60
        )
        current_app.logger.info(
            '{} sending {} to {}, response {}'.format(
                function_name, notification_id, service_callback_url, response.status_code
            )
        )
        # Turn 4xx/5xx responses into HTTPError so they reach the handler below.
        response.raise_for_status()
    except RequestException as e:
        current_app.logger.warning(
            "{} request failed for notification_id: {} and url: {}. exception: {}".format(
                function_name, notification_id, service_callback_url, e
            )
        )

        # Only an HTTP error response below 500 is treated as final.
        is_final_failure = isinstance(e, HTTPError) and e.response.status_code < 500
        if is_final_failure:
            current_app.logger.warning(
                "{} callback is not being retried for notification_id: {} and url: {}. exception: {}".format(
                    function_name, notification_id, service_callback_url, e
                )
            )
        else:
            try:
                self.retry(queue=QueueNames.CALLBACKS_RETRY)
            except self.MaxRetriesExceededError:
                current_app.logger.warning(
                    "Retry: {} has retried the max num of times for callback url {} and notification_id: {}".format(
                        function_name, service_callback_url, notification_id
                    )
                )
def create_delivery_status_callback_data(notification, service_callback_api):
    """Build and encrypt the delivery-status callback data for a notification.

    The result bundles the notification's details with the callback URL and
    bearer token from ``service_callback_api``, and is returned after being
    passed through ``encryption.encrypt``.
    """

    def _format_optional(moment):
        # updated_at / sent_at may be unset; those are carried as None.
        return moment.strftime(DATETIME_FORMAT) if moment else None

    payload = {
        "notification_id": str(notification.id),
        "notification_client_reference": notification.client_reference,
        "notification_to": notification.to,
        "notification_status": notification.status,
        # created_at is formatted unconditionally, unlike the two timestamps below.
        "notification_created_at": notification.created_at.strftime(DATETIME_FORMAT),
        "notification_updated_at": _format_optional(notification.updated_at),
        "notification_sent_at": _format_optional(notification.sent_at),
        "notification_type": notification.notification_type,
        "service_callback_api_url": service_callback_api.url,
        "service_callback_api_bearer_token": service_callback_api.bearer_token,
        "template_id": str(notification.template_id),
        "template_version": notification.template_version,
    }
    return encryption.encrypt(payload)
def create_complaint_callback_data(complaint, notification, service_callback_api, recipient):
    """Build and encrypt the complaint callback data.

    Combines identifiers from ``complaint`` and ``notification``, the given
    ``recipient``, and the callback URL and bearer token from
    ``service_callback_api``; the result of ``encryption.encrypt`` is returned.
    """
    complaint_id = str(complaint.id)
    notification_id = str(notification.id)
    client_reference = notification.client_reference
    complained_at = complaint.complaint_date.strftime(DATETIME_FORMAT)
    callback_url = service_callback_api.url
    bearer_token = service_callback_api.bearer_token

    payload = {
        "complaint_id": complaint_id,
        "notification_id": notification_id,
        "reference": client_reference,
        "to": recipient,
        "complaint_date": complained_at,
        "service_callback_api_url": callback_url,
        "service_callback_api_bearer_token": bearer_token,
    }
    return encryption.encrypt(payload)