import json
from time import sleep

from celery.signals import task_postrun
from flask import current_app
from requests import HTTPError, RequestException, request
from sqlalchemy.exc import IntegrityError, SQLAlchemyError

from app import create_uuid, encryption, notify_celery
from app.aws import s3
from app.celery import provider_tasks
from app.config import Config, QueueNames
from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id
from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job
from app.dao.notifications_dao import (
    dao_get_last_notification_added_for_job_id,
    get_notification_by_id,
)
from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
from app.dao.templates_dao import dao_get_template_by_id
from app.enums import JobStatus, KeyType, NotificationType
from app.errors import TotalRequestsError
from app.notifications.process_notifications import (
    get_notification,
    persist_notification,
)
from app.notifications.validators import check_service_over_total_message_limit
from app.serialised_models import SerialisedService, SerialisedTemplate
from app.service.utils import service_allowed_to_send_to
from app.utils import DATETIME_FORMAT, hilite, utc_now
from notifications_utils.recipients import RecipientCSV


@notify_celery.task(name="process-job")
def process_job(job_id, sender_id=None):
    """Update job status, get csv data from s3, and begin processing csv rows."""
    start = utc_now()
    job = dao_get_job_by_id(job_id)
    current_app.logger.info(
        f"Starting process-job task for job id {job_id} with status: {job.job_status}"
    )

    if job.job_status != JobStatus.PENDING:
        return

    service = job.service

    job.job_status = JobStatus.IN_PROGRESS
    job.processing_started = start
    dao_update_job(job)

    if not service.active:
        job.job_status = JobStatus.CANCELLED
        dao_update_job(job)
        current_app.logger.warning(
            f"Job {job_id} has been cancelled, service {service.id} is inactive"
        )
        return

    if __total_sending_limits_for_job_exceeded(service, job, job_id):
        return

    recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(
        job
    )

    current_app.logger.info(
        f"Starting job {job_id} processing {job.notification_count} notifications"
    )

    # notify-api-1495 we are going to sleep periodically to give other
    # jobs running at the same time a chance to get some of their messages
    # sent. Sleep for 1 second after every 3 sends, which gives a throughput
    # of about 3 messages per second (3600 * 3 = 10800 per hour) and should
    # keep the queue clear, assuming only one sender. It should also eliminate
    # the provider throttling we are currently seeing.
    count = 0
    for row in recipient_csv.get_rows():
        process_row(row, template, job, service, sender_id=sender_id)
        count += 1
        if count % 3 == 0:
            sleep(1)

    # End point/Exit point for message send flow.
    job_complete(job, start=start)


def job_complete(job, resumed=False, start=None):
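    """Mark the job finished, record the processing end time, and log the outcome."""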
    job.job_status = JobStatus.FINISHED

    finished = utc_now()
    job.processing_finished = finished
    dao_update_job(job)

    if resumed:
        current_app.logger.info(
            "Resumed Job {} completed at {}".format(job.id, finished)
        )
    else:
        current_app.logger.info(
            "Job {} created at {} started at {} finished at {}".format(
                job.id, job.created_at, start, finished
            )
        )


def get_recipient_csv_and_template_and_sender_id(job):
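    """Fetch the job csv and metadata from s3 and return them along with the job's template."""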
    db_template = dao_get_template_by_id(job.template_id, job.template_version)
    template = db_template._as_utils_template()

    contents, meta_data = s3.get_job_and_metadata_from_s3(
        service_id=str(job.service_id), job_id=str(job.id)
    )
    recipient_csv = RecipientCSV(contents, template=template)

    return recipient_csv, template, meta_data.get("sender_id")


def process_row(row, template, job, service, sender_id=None):
    """Branch off based on notification type, sms or email."""
    template_type = template.template_type
    encrypted = encryption.encrypt(
        {
            "template": str(template.id),
            "template_version": job.template_version,
            "job": str(job.id),
            "to": row.recipient,
            "row_number": row.index,
            "personalisation": dict(row.personalisation),
        }
    )

    # Both save_sms and save_email have the same general
    # persist logic.
    send_fns = {NotificationType.SMS: save_sms, NotificationType.EMAIL: save_email}

    send_fn = send_fns[template_type]

    task_kwargs = {}
    if sender_id:
        task_kwargs["sender_id"] = sender_id

    notification_id = create_uuid()
    # Kick-off persisting notification in save_sms/save_email.
    send_fn.apply_async(
        (
            str(service.id),
            notification_id,
            encrypted,
        ),
        task_kwargs,
        queue=QueueNames.DATABASE,
        expires=Config.DEFAULT_REDIS_EXPIRE_TIME,
    )
    return notification_id


# TODO
# Originally this was checking a daily limit
# It is now checking an overall limit (annual?) for the free tier
# Is there any limit for the paid tier?
# Assuming the limit is annual, is it calendar year, fiscal year, MOU year?
# Do we need a command to run to clear the redis value, or should it happen automatically?
def __total_sending_limits_for_job_exceeded(service, job, job_id):
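    """Return True, and fail the job, if sending it would exceed the service's total message limit."""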
    current_app.logger.debug(hilite("ENTER __total_sending_limits_for_job_exceeded"))
    try:
        total_sent = check_service_over_total_message_limit(KeyType.NORMAL, service)
        if total_sent + job.notification_count > service.total_message_limit:
            raise TotalRequestsError(service.total_message_limit)
        else:
            return False
    except TotalRequestsError:
        job.job_status = "sending limits exceeded"
        job.processing_finished = utc_now()
        dao_update_job(job)
        current_app.logger.exception(
            "Job {} size {} error. Total sending limits {} exceeded".format(
                job_id, job.notification_count, service.total_message_limit
            ),
        )
        return True


@task_postrun.connect
def log_task_ejection(sender=None, task_id=None, **kwargs):
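    """Log each celery task as it completes and is removed from the worker."""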
    current_app.logger.info(
        f"Task {task_id} ({sender.name if sender else 'unknown_task'}) has been completed and removed"
    )


@notify_celery.task(bind=True, name="save-sms", max_retries=2, default_retry_delay=600)
def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None):
    """Persist notification to db and place notification in queue to send to sns."""
    notification = encryption.decrypt(encrypted_notification)
    # SerialisedService and SerialisedTemplate classes are
    # used here to grab the same service and template from the cache
    # to improve performance.
    service = SerialisedService.from_id(service_id)
    template = SerialisedTemplate.from_id_and_service_id(
        notification["template"],
        service_id=service.id,
        version=notification["template_version"],
    )

    if sender_id:
        reply_to_text = dao_get_service_sms_senders_by_id(
            service_id, sender_id
        ).sms_sender
    else:
        reply_to_text = template.reply_to_text
    # Return early when trial mode services try sending notifications
    # to non-team and non-simulated recipients.
    if not service_allowed_to_send_to(notification["to"], service, KeyType.NORMAL):
        current_app.logger.info(
            hilite(
                f"service not allowed to send for job_id {notification.get('job', None)}, aborting"
            )
        )
        current_app.logger.debug(f"SMS {notification_id} failed as restricted service")
        return

    try:
        job_id = notification.get("job", None)
        created_by_id = None
        if job_id:
            job = dao_get_job_by_id(job_id)
            created_by_id = job.created_by_id

        try:
            saved_notification = persist_notification(
                template_id=notification["template"],
                template_version=notification["template_version"],
                recipient=notification["to"],
                service=service,
                personalisation=notification.get("personalisation"),
                notification_type=NotificationType.SMS,
                api_key_id=None,
                key_type=KeyType.NORMAL,
                created_at=utc_now(),
                created_by_id=created_by_id,
                job_id=notification.get("job", None),
                job_row_number=notification.get("row_number", None),
                notification_id=notification_id,
                reply_to_text=reply_to_text,
            )
        except IntegrityError:
            current_app.logger.warning(
                f"{NotificationType.SMS}: {notification_id} already exists."
            )
            # If we don't have the return statement here, we will fall through and end
            # up retrying because IntegrityError is a subclass of SQLAlchemyError
            return

        # Kick off sns process in provider_tasks.py
        sn = saved_notification
        current_app.logger.info(
            hilite(
                f"Deliver sms for job_id: {sn.job_id} row_number: {sn.job_row_number}"
            )
        )
        provider_tasks.deliver_sms.apply_async(
            [str(saved_notification.id)], queue=QueueNames.SEND_SMS, countdown=60
        )
        current_app.logger.debug(
            f"SMS {saved_notification.id} created at {saved_notification.created_at} "
            f"for job {notification.get('job', None)}"
        )

    except SQLAlchemyError as e:
        handle_exception(self, notification, notification_id, e)


@notify_celery.task(
    bind=True, name="save-email", max_retries=5, default_retry_delay=300
)
def save_email(
    self, service_id, notification_id, encrypted_notification, sender_id=None
):
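    """Persist notification to db and place notification in queue to send."""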
    notification = encryption.decrypt(encrypted_notification)

    service = SerialisedService.from_id(service_id)
    template = SerialisedTemplate.from_id_and_service_id(
        notification["template"],
        service_id=service.id,
        version=notification["template_version"],
    )

    if sender_id:
        reply_to_text = dao_get_reply_to_by_id(service_id, sender_id).email_address
    else:
        reply_to_text = template.reply_to_text

    if not service_allowed_to_send_to(notification["to"], service, KeyType.NORMAL):
        current_app.logger.info(
            "Email {} failed as restricted service".format(notification_id)
        )
        return

    original_notification = get_notification(notification_id)
    try:
        saved_notification = persist_notification(
            template_id=notification["template"],
            template_version=notification["template_version"],
            recipient=notification["to"],
            service=service,
            personalisation=notification.get("personalisation"),
            notification_type=NotificationType.EMAIL,
            api_key_id=None,
            key_type=KeyType.NORMAL,
            created_at=utc_now(),
            job_id=notification.get("job", None),
            job_row_number=notification.get("row_number", None),
            notification_id=notification_id,
            reply_to_text=reply_to_text,
        )
        # we only want to send once
        if original_notification is None:
            provider_tasks.deliver_email.apply_async(
                [str(saved_notification.id)], queue=QueueNames.SEND_EMAIL
            )

        current_app.logger.debug(
            "Email {} created at {}".format(
                saved_notification.id, saved_notification.created_at
            )
        )
    except SQLAlchemyError as e:
        handle_exception(self, notification, notification_id, e)


@notify_celery.task(
    bind=True, name="save-api-email", max_retries=5, default_retry_delay=300
)
def save_api_email(self, encrypted_notification):
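    """Persist an email notification created via the api."""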
    save_api_email_or_sms(self, encrypted_notification)


@notify_celery.task(
    bind=True, name="save-api-sms", max_retries=2, default_retry_delay=600
)
def save_api_sms(self, encrypted_notification):
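    """Persist an sms notification created via the api."""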
    save_api_email_or_sms(self, encrypted_notification)


def save_api_email_or_sms(self, encrypted_notification):
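    """Persist an api notification to the db and queue it for delivery once.

    The decrypted payload is expected to include: id, service_id, notification_type,
    template_id, template_version, to, personalisation, client_reference, api_key_id,
    created_at, reply_to_text, status and document_download_count.
    """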
    notification = encryption.decrypt(encrypted_notification)
    service = SerialisedService.from_id(notification["service_id"])
    q = (
        QueueNames.SEND_EMAIL
        if notification["notification_type"] == NotificationType.EMAIL
        else QueueNames.SEND_SMS
    )
    provider_task = (
        provider_tasks.deliver_email
        if notification["notification_type"] == NotificationType.EMAIL
        else provider_tasks.deliver_sms
    )

    original_notification = get_notification(notification["id"])
    try:
        persist_notification(
            notification_id=notification["id"],
            template_id=notification["template_id"],
            template_version=notification["template_version"],
            recipient=notification["to"],
            service=service,
            personalisation=notification.get("personalisation"),
            notification_type=notification["notification_type"],
            client_reference=notification["client_reference"],
            api_key_id=notification.get("api_key_id"),
            key_type=KeyType.NORMAL,
            created_at=notification["created_at"],
            reply_to_text=notification["reply_to_text"],
            status=notification["status"],
            document_download_count=notification["document_download_count"],
        )
        # Only get here if save to the db was successful (i.e. first time)
        if original_notification is None:
            provider_task.apply_async([notification["id"]], queue=q)
            current_app.logger.debug(
                f"{notification['id']} has been persisted and sent to delivery queue."
            )

    except IntegrityError:
        current_app.logger.warning(
            f"{notification['notification_type']} {notification['id']} already exists."
        )
        # If we don't have the return statement here, we will fall through and end
        # up retrying because IntegrityError is a subclass of SQLAlchemyError
        return

    except SQLAlchemyError:
        try:
            self.retry(queue=QueueNames.RETRY, expires=Config.DEFAULT_REDIS_EXPIRE_TIME)
        except self.MaxRetriesExceededError:
            current_app.logger.exception(
                f"Max retries exceeded. Failed to persist notification {notification['id']}",
            )


def handle_exception(task, notification, notification_id, exc):
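    """Send the task to the retry queue unless the notification was already persisted."""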
    if not get_notification_by_id(notification_id):
        retry_msg = "{task} notification for job {job} row number {row} and notification id {noti}".format(
            task=task.__name__,
            job=notification.get("job", None),
            row=notification.get("row_number", None),
            noti=notification_id,
        )
        # Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
        # SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not
        # send to the retry queue.
        # This probably (hopefully) is not an issue with Redis as the celery backing store
        current_app.logger.exception("Retry: " + retry_msg)
        try:
            task.retry(
                queue=QueueNames.RETRY,
                exc=exc,
                expires=Config.DEFAULT_REDIS_EXPIRE_TIME,
            )
        except task.MaxRetriesExceededError:
            current_app.logger.exception("Max retry failed: " + retry_msg)


@notify_celery.task(
    bind=True, name="send-inbound-sms", max_retries=5, default_retry_delay=300
)
def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
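    """POST a received sms to the service's inbound api, retrying on connection and server errors."""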
    inbound_api = get_service_inbound_api_for_service(service_id=service_id)
    if not inbound_api:
        # No API data has been set for this service
        return

    inbound_sms = dao_get_inbound_sms_by_id(
        service_id=service_id, inbound_id=inbound_sms_id
    )
    data = {
        "id": str(inbound_sms.id),
        # TODO: should we be validating and formatting the phone number here?
        "source_number": inbound_sms.user_number,
        "destination_number": inbound_sms.notify_number,
        "message": inbound_sms.content,
        "date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT),
    }

    try:
        response = request(
            method="POST",
            url=inbound_api.url,
            data=json.dumps(data),
            headers={
                "Content-Type": "application/json",
                "Authorization": "Bearer {}".format(inbound_api.bearer_token),
            },
            timeout=60,
        )
        current_app.logger.debug(
            f"send_inbound_sms_to_service sending {inbound_sms_id} to {inbound_api.url}, "
            f"response {response.status_code}"
        )
        response.raise_for_status()
    except RequestException as e:
        current_app.logger.warning(
            f"send_inbound_sms_to_service failed for service_id: {service_id} for inbound_sms_id: {inbound_sms_id} "
            f"and url: {inbound_api.url}. exception: {e}"
        )
        if not isinstance(e, HTTPError) or e.response.status_code >= 500:
            try:
                self.retry(
                    queue=QueueNames.RETRY, expires=Config.DEFAULT_REDIS_EXPIRE_TIME
                )
            except self.MaxRetriesExceededError:
                current_app.logger.exception(
                    "Retry: send_inbound_sms_to_service has retried the max number of "
                    f"times for service: {service_id} and inbound_sms {inbound_sms_id}"
                )
        else:
            current_app.logger.warning(
                f"send_inbound_sms_to_service is not being retried for service_id: {service_id} for "
                f"inbound_sms id: {inbound_sms_id} and url: {inbound_api.url}. exception: {e}"
            )


@notify_celery.task(name="regenerate-job-cache")
def regenerate_job_cache():
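    """Rebuild the cached job files from s3."""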
    s3.get_s3_files()


@notify_celery.task(name="clean-job-cache")
def clean_job_cache():
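    """Evict stale entries from the job cache."""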
    s3.clean_cache()


@notify_celery.task(name="delete-old-s3-objects")
def delete_old_s3_objects():
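    """Remove old objects from the s3 bucket."""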
    s3.cleanup_old_s3_objects()


@notify_celery.task(name="process-incomplete-jobs")
def process_incomplete_jobs(job_ids):
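    """Mark interrupted jobs as in progress again and resume processing each one."""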
    jobs = [dao_get_job_by_id(job_id) for job_id in job_ids]

    # reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again
    for job in jobs:
        job.job_status = JobStatus.IN_PROGRESS
        job.processing_started = utc_now()
        dao_update_job(job)

    current_app.logger.info("Resuming Job(s) {}".format(job_ids))
    for job_id in job_ids:
        process_incomplete_job(job_id)


def process_incomplete_job(job_id):
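    """Resume an interrupted job from the row after the last notification it persisted."""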
    job = dao_get_job_by_id(job_id)

    last_notification_added = dao_get_last_notification_added_for_job_id(job_id)

    if last_notification_added:
        resume_from_row = last_notification_added.job_row_number
    else:
        resume_from_row = -1  # The first row in the csv with a number is row 0

    current_app.logger.info(
        "Resuming job {} from row {}".format(job_id, resume_from_row)
    )

    recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(
        job
    )

    for row in recipient_csv.get_rows():
        if row.index > resume_from_row:
            process_row(row, template, job, job.service, sender_id=sender_id)

    job_complete(job, resumed=True)