Files
notifications-api/app/celery/tasks.py
Kenneth Kehl cbed8b8104 retry
2023-07-12 14:52:40 -07:00

403 lines
15 KiB
Python

import json
from datetime import datetime
from flask import current_app
from notifications_utils.recipients import RecipientCSV
from requests import HTTPError, RequestException, request
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from app import create_uuid, encryption, notify_celery
from app.aws import s3
from app.celery import provider_tasks
from app.config import QueueNames
from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id
from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job
from app.dao.notifications_dao import (
dao_get_last_notification_added_for_job_id,
get_notification_by_id,
)
from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
from app.dao.templates_dao import dao_get_template_by_id
from app.models import (
EMAIL_TYPE,
JOB_STATUS_CANCELLED,
JOB_STATUS_FINISHED,
JOB_STATUS_IN_PROGRESS,
JOB_STATUS_PENDING,
KEY_TYPE_NORMAL,
SMS_TYPE,
)
from app.notifications.process_notifications import persist_notification
from app.serialised_models import SerialisedService, SerialisedTemplate
from app.service.utils import service_allowed_to_send_to
from app.utils import DATETIME_FORMAT
@notify_celery.task(name="process-job")
def process_job(job_id, sender_id=None):
start = datetime.utcnow()
job = dao_get_job_by_id(job_id)
current_app.logger.info("Starting process-job task for job id {} with status: {}".format(job_id, job.job_status))
if job.job_status != JOB_STATUS_PENDING:
return
service = job.service
job.job_status = JOB_STATUS_IN_PROGRESS
job.processing_started = start
dao_update_job(job)
if not service.active:
job.job_status = JOB_STATUS_CANCELLED
dao_update_job(job)
current_app.logger.warning(
"Job {} has been cancelled, service {} is inactive".format(job_id, service.id))
return
recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))
for row in recipient_csv.get_rows():
process_row(row, template, job, service, sender_id=sender_id)
job_complete(job, start=start)
def job_complete(job, resumed=False, start=None):
job.job_status = JOB_STATUS_FINISHED
finished = datetime.utcnow()
job.processing_finished = finished
dao_update_job(job)
if resumed:
current_app.logger.info(
"Resumed Job {} completed at {}".format(job.id, job.created_at)
)
else:
current_app.logger.info(
"Job {} created at {} started at {} finished at {}".format(job.id, job.created_at, start, finished)
)
def get_recipient_csv_and_template_and_sender_id(job):
db_template = dao_get_template_by_id(job.template_id, job.template_version)
template = db_template._as_utils_template()
contents, meta_data = s3.get_job_and_metadata_from_s3(service_id=str(job.service_id), job_id=str(job.id))
recipient_csv = RecipientCSV(contents, template=template)
return recipient_csv, template, meta_data.get("sender_id")
def process_row(row, template, job, service, sender_id=None):
template_type = template.template_type
encrypted = encryption.encrypt({
'template': str(template.id),
'template_version': job.template_version,
'job': str(job.id),
'to': row.recipient,
'row_number': row.index,
'personalisation': dict(row.personalisation)
})
send_fns = {
SMS_TYPE: save_sms,
EMAIL_TYPE: save_email
}
send_fn = send_fns[template_type]
task_kwargs = {}
if sender_id:
task_kwargs['sender_id'] = sender_id
notification_id = create_uuid()
send_fn.apply_async(
(
str(service.id),
notification_id,
encrypted,
),
task_kwargs,
queue=QueueNames.DATABASE if not service.research_mode else QueueNames.RESEARCH_MODE
)
return notification_id
@notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300)
def save_sms(self,
service_id,
notification_id,
encrypted_notification,
sender_id=None):
notification = encryption.decrypt(encrypted_notification)
service = SerialisedService.from_id(service_id)
template = SerialisedTemplate.from_id_and_service_id(
notification['template'],
service_id=service.id,
version=notification['template_version'],
)
if sender_id:
reply_to_text = dao_get_service_sms_senders_by_id(service_id, sender_id).sms_sender
else:
reply_to_text = template.reply_to_text
if not service_allowed_to_send_to(notification['to'], service, KEY_TYPE_NORMAL):
current_app.logger.debug(
"SMS {} failed as restricted service".format(notification_id)
)
return
try:
saved_notification = persist_notification(
template_id=notification['template'],
template_version=notification['template_version'],
recipient=notification['to'],
service=service,
personalisation=notification.get('personalisation'),
notification_type=SMS_TYPE,
api_key_id=None,
key_type=KEY_TYPE_NORMAL,
created_at=datetime.utcnow(),
job_id=notification.get('job', None),
job_row_number=notification.get('row_number', None),
notification_id=notification_id,
reply_to_text=reply_to_text
)
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_SMS if not service.research_mode else QueueNames.RESEARCH_MODE
)
current_app.logger.debug(
"SMS {} created at {} for job {}".format(
saved_notification.id,
saved_notification.created_at,
notification.get('job', None))
)
except SQLAlchemyError as e:
handle_exception(self, notification, notification_id, e)
@notify_celery.task(bind=True, name="save-email", max_retries=5, default_retry_delay=300)
def save_email(self,
service_id,
notification_id,
encrypted_notification,
sender_id=None):
notification = encryption.decrypt(encrypted_notification)
service = SerialisedService.from_id(service_id)
template = SerialisedTemplate.from_id_and_service_id(
notification['template'],
service_id=service.id,
version=notification['template_version'],
)
if sender_id:
reply_to_text = dao_get_reply_to_by_id(service_id, sender_id).email_address
else:
reply_to_text = template.reply_to_text
if not service_allowed_to_send_to(notification['to'], service, KEY_TYPE_NORMAL):
current_app.logger.info("Email {} failed as restricted service".format(notification_id))
return
try:
saved_notification = persist_notification(
template_id=notification['template'],
template_version=notification['template_version'],
recipient=notification['to'],
service=service,
personalisation=notification.get('personalisation'),
notification_type=EMAIL_TYPE,
api_key_id=None,
key_type=KEY_TYPE_NORMAL,
created_at=datetime.utcnow(),
job_id=notification.get('job', None),
job_row_number=notification.get('row_number', None),
notification_id=notification_id,
reply_to_text=reply_to_text
)
provider_tasks.deliver_email.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_EMAIL if not service.research_mode else QueueNames.RESEARCH_MODE
)
current_app.logger.debug("Email {} created at {}".format(saved_notification.id, saved_notification.created_at))
except SQLAlchemyError as e:
handle_exception(self, notification, notification_id, e)
@notify_celery.task(bind=True, name="save-api-email", max_retries=5, default_retry_delay=300)
def save_api_email(self, encrypted_notification):
save_api_email_or_sms(self, encrypted_notification)
@notify_celery.task(bind=True, name="save-api-sms", max_retries=5, default_retry_delay=300)
def save_api_sms(self, encrypted_notification):
save_api_email_or_sms(self, encrypted_notification)
def save_api_email_or_sms(self, encrypted_notification):
notification = encryption.decrypt(encrypted_notification)
service = SerialisedService.from_id(notification['service_id'])
q = QueueNames.SEND_EMAIL if notification['notification_type'] == EMAIL_TYPE else QueueNames.SEND_SMS
provider_task = provider_tasks.deliver_email if notification['notification_type'] == EMAIL_TYPE \
else provider_tasks.deliver_sms
try:
persist_notification(
notification_id=notification["id"],
template_id=notification['template_id'],
template_version=notification['template_version'],
recipient=notification['to'],
service=service,
personalisation=notification.get('personalisation'),
notification_type=notification['notification_type'],
client_reference=notification['client_reference'],
api_key_id=notification.get('api_key_id'),
key_type=KEY_TYPE_NORMAL,
created_at=notification['created_at'],
reply_to_text=notification['reply_to_text'],
status=notification['status'],
document_download_count=notification['document_download_count']
)
q = q if not service.research_mode else QueueNames.RESEARCH_MODE
provider_task.apply_async(
[notification['id']],
queue=q
)
current_app.logger.debug(
f"{notification['notification_type']} {notification['id']} has been persisted and sent to delivery queue."
)
except IntegrityError:
current_app.logger.info(f"{notification['notification_type']} {notification['id']} already exists.")
except SQLAlchemyError:
try:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.error(f"Max retry failed Failed to persist notification {notification['id']}")
def handle_exception(task, notification, notification_id, exc):
if not get_notification_by_id(notification_id):
retry_msg = '{task} notification for job {job} row number {row} and notification id {noti}'.format(
task=task.__name__,
job=notification.get('job', None),
row=notification.get('row_number', None),
noti=notification_id
)
# Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
# SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not
# send to the retry queue.
# This probably (hopefully) is not an issue with Redis as the celery backing store
current_app.logger.exception('Retry' + retry_msg)
try:
task.retry(queue=QueueNames.RETRY, exc=exc)
except task.MaxRetriesExceededError:
current_app.logger.error('Max retry failed' + retry_msg)
@notify_celery.task(bind=True, name="send-inbound-sms", max_retries=5, default_retry_delay=300)
def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
inbound_api = get_service_inbound_api_for_service(service_id=service_id)
if not inbound_api:
# No API data has been set for this service
return
inbound_sms = dao_get_inbound_sms_by_id(service_id=service_id,
inbound_id=inbound_sms_id)
data = {
"id": str(inbound_sms.id),
# TODO: should we be validating and formatting the phone number here?
"source_number": inbound_sms.user_number,
"destination_number": inbound_sms.notify_number,
"message": inbound_sms.content,
"date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT)
}
try:
response = request(
method="POST",
url=inbound_api.url,
data=json.dumps(data),
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(inbound_api.bearer_token)
},
timeout=60
)
current_app.logger.debug(
f"send_inbound_sms_to_service sending {inbound_sms_id} to {inbound_api.url}, " +
f"response {response.status_code}"
)
response.raise_for_status()
except RequestException as e:
current_app.logger.warning(
f"send_inbound_sms_to_service failed for service_id: {service_id} for inbound_sms_id: {inbound_sms_id} " +
f"and url: {inbound_api.url}. exception: {e}"
)
if not isinstance(e, HTTPError) or e.response.status_code >= 500:
try:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.error(
"Retry: send_inbound_sms_to_service has retried the max number of" +
f"times for service: {service_id} and inbound_sms {inbound_sms_id}"
)
else:
current_app.logger.warning(
f"send_inbound_sms_to_service is not being retried for service_id: {service_id} for " +
f"inbound_sms id: {inbound_sms_id} and url: {inbound_api.url}. exception: {e}"
)
@notify_celery.task(name='process-incomplete-jobs')
def process_incomplete_jobs(job_ids):
jobs = [dao_get_job_by_id(job_id) for job_id in job_ids]
# reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again
for job in jobs:
job.job_status = JOB_STATUS_IN_PROGRESS
job.processing_started = datetime.utcnow()
dao_update_job(job)
current_app.logger.info("Resuming Job(s) {}".format(job_ids))
for job_id in job_ids:
process_incomplete_job(job_id)
def process_incomplete_job(job_id):
job = dao_get_job_by_id(job_id)
last_notification_added = dao_get_last_notification_added_for_job_id(job_id)
if last_notification_added:
resume_from_row = last_notification_added.job_row_number
else:
resume_from_row = -1 # The first row in the csv with a number is row 0
current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
for row in recipient_csv.get_rows():
if row.index > resume_from_row:
process_row(row, template, job, job.service, sender_id=sender_id)
job_complete(job, resumed=True)