mirror of
https://github.com/GSA/notifications-api.git
synced 2026-04-06 10:29:17 -04:00
Previously there were 4 queues for sending messages The was based on the fact that each notification has 2 actions - persist in the database and send to provider. Two queues supported the CSV upload - for the first of these tasks - bulk-email - build-sms And there were two more queues for the tasks that make the 3rd party client calls. - sms - email API Calls just used the latter two queues for both tasks Added four new queues - db-email - db-sms - send-sms - send-email So an API call puts a notification into the db-[type] queue first, which then puts the notification into the send-[type] queue Build queues stay as before. This will allow us to target processing of these tasks with separate workers to manage these differently.
228 lines
7.5 KiB
Python
228 lines
7.5 KiB
Python
import itertools
|
|
from datetime import (datetime)
|
|
|
|
from flask import current_app
|
|
from monotonic import monotonic
|
|
from notifications_utils.recipients import (
|
|
RecipientCSV,
|
|
allowed_to_send_to
|
|
)
|
|
from notifications_utils.template import Template
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from app import (
|
|
create_uuid,
|
|
DATETIME_FORMAT,
|
|
DATE_FORMAT,
|
|
notify_celery,
|
|
encryption
|
|
)
|
|
from app.aws import s3
|
|
from app.celery.provider_tasks import send_sms_to_provider, send_email_to_provider
|
|
from app.dao.jobs_dao import (
|
|
dao_update_job,
|
|
dao_get_job_by_id
|
|
)
|
|
from app.dao.notifications_dao import (
|
|
dao_create_notification,
|
|
dao_get_notification_statistics_for_service_and_day
|
|
)
|
|
from app.dao.services_dao import dao_fetch_service_by_id
|
|
from app.dao.templates_dao import dao_get_template_by_id
|
|
from app.models import (
|
|
Notification,
|
|
EMAIL_TYPE,
|
|
SMS_TYPE,
|
|
KEY_TYPE_NORMAL
|
|
)
|
|
from app.statsd_decorators import statsd
|
|
|
|
|
|
@notify_celery.task(name="process-job")
|
|
@statsd(namespace="tasks")
|
|
def process_job(job_id):
|
|
start = datetime.utcnow()
|
|
job = dao_get_job_by_id(job_id)
|
|
|
|
service = job.service
|
|
|
|
stats = dao_get_notification_statistics_for_service_and_day(
|
|
service_id=service.id,
|
|
day=job.created_at.strftime(DATE_FORMAT)
|
|
)
|
|
|
|
total_sent = 0
|
|
if stats:
|
|
total_sent = stats.emails_requested + stats.sms_requested
|
|
|
|
if total_sent + job.notification_count > service.message_limit:
|
|
job.status = 'sending limits exceeded'
|
|
job.processing_finished = datetime.utcnow()
|
|
dao_update_job(job)
|
|
current_app.logger.info(
|
|
"Job {} size {} error. Sending limits {} exceeded".format(
|
|
job_id, job.notification_count, service.message_limit)
|
|
)
|
|
return
|
|
|
|
job.status = 'in progress'
|
|
dao_update_job(job)
|
|
|
|
template = Template(
|
|
dao_get_template_by_id(job.template_id, job.template_version).__dict__
|
|
)
|
|
|
|
for row_number, recipient, personalisation in RecipientCSV(
|
|
s3.get_job_from_s3(str(service.id), str(job_id)),
|
|
template_type=template.template_type,
|
|
placeholders=template.placeholders
|
|
).enumerated_recipients_and_personalisation:
|
|
|
|
encrypted = encryption.encrypt({
|
|
'template': str(template.id),
|
|
'template_version': job.template_version,
|
|
'job': str(job.id),
|
|
'to': recipient,
|
|
'row_number': row_number,
|
|
'personalisation': {
|
|
key: personalisation.get(key)
|
|
for key in template.placeholders
|
|
}
|
|
})
|
|
|
|
if template.template_type == SMS_TYPE:
|
|
send_sms.apply_async((
|
|
str(job.service_id),
|
|
create_uuid(),
|
|
encrypted,
|
|
datetime.utcnow().strftime(DATETIME_FORMAT)),
|
|
queue='bulk-sms'
|
|
)
|
|
|
|
if template.template_type == EMAIL_TYPE:
|
|
send_email.apply_async((
|
|
str(job.service_id),
|
|
create_uuid(),
|
|
encrypted,
|
|
datetime.utcnow().strftime(DATETIME_FORMAT)),
|
|
queue='bulk-email')
|
|
|
|
finished = datetime.utcnow()
|
|
job.status = 'finished'
|
|
job.processing_started = start
|
|
job.processing_finished = finished
|
|
dao_update_job(job)
|
|
remove_job.apply_async((str(job_id),), queue='remove-job')
|
|
current_app.logger.info(
|
|
"Job {} created at {} started at {} finished at {}".format(job_id, job.created_at, start, finished)
|
|
)
|
|
|
|
|
|
@notify_celery.task(name="remove-job")
|
|
@statsd(namespace="tasks")
|
|
def remove_job(job_id):
|
|
job = dao_get_job_by_id(job_id)
|
|
s3.remove_job_from_s3(job.service.id, str(job_id))
|
|
current_app.logger.info("Job {} has been removed from s3.".format(job_id))
|
|
|
|
|
|
@notify_celery.task(bind=True, name="send-sms", max_retries=5, default_retry_delay=300)
|
|
@statsd(namespace="tasks")
|
|
def send_sms(self,
|
|
service_id,
|
|
notification_id,
|
|
encrypted_notification,
|
|
created_at,
|
|
api_key_id=None,
|
|
key_type=KEY_TYPE_NORMAL):
|
|
notification = encryption.decrypt(encrypted_notification)
|
|
service = dao_fetch_service_by_id(service_id)
|
|
|
|
if not service_allowed_to_send_to(notification['to'], service):
|
|
current_app.logger.info(
|
|
"SMS {} failed as restricted service".format(notification_id)
|
|
)
|
|
return
|
|
|
|
try:
|
|
_save_notification(created_at, notification, notification_id, service_id, SMS_TYPE, api_key_id, key_type)
|
|
|
|
send_sms_to_provider.apply_async((service_id, notification_id), queue='send-sms')
|
|
|
|
current_app.logger.info(
|
|
"SMS {} created at {}".format(notification_id, created_at)
|
|
)
|
|
|
|
except SQLAlchemyError as e:
|
|
current_app.logger.exception("RETRY: send_sms notification {}".format(notification_id), e)
|
|
try:
|
|
raise self.retry(queue="retry", exc=e)
|
|
except self.MaxRetriesExceededError:
|
|
current_app.logger.exception(
|
|
"RETRY FAILED: task send_sms failed for notification {}".format(notification.id),
|
|
e
|
|
)
|
|
|
|
|
|
@notify_celery.task(bind=True, name="send-email", max_retries=5, default_retry_delay=300)
|
|
@statsd(namespace="tasks")
|
|
def send_email(self, service_id,
|
|
notification_id,
|
|
encrypted_notification,
|
|
created_at,
|
|
api_key_id=None,
|
|
key_type=KEY_TYPE_NORMAL):
|
|
notification = encryption.decrypt(encrypted_notification)
|
|
service = dao_fetch_service_by_id(service_id)
|
|
|
|
if not service_allowed_to_send_to(notification['to'], service):
|
|
current_app.logger.info("Email {} failed as restricted service".format(notification_id))
|
|
return
|
|
|
|
try:
|
|
_save_notification(created_at, notification, notification_id, service_id, EMAIL_TYPE, api_key_id, key_type)
|
|
|
|
send_email_to_provider.apply_async((service_id, notification_id), queue='send-email')
|
|
|
|
current_app.logger.info("Email {} created at {}".format(notification_id, created_at))
|
|
except SQLAlchemyError as e:
|
|
current_app.logger.exception("RETRY: send_email notification {}".format(notification_id), e)
|
|
try:
|
|
raise self.retry(queue="retry", exc=e)
|
|
except self.MaxRetriesExceededError:
|
|
current_app.logger.error(
|
|
"RETRY FAILED: task send_email failed for notification {}".format(notification.id),
|
|
e
|
|
)
|
|
|
|
|
|
def _save_notification(created_at, notification, notification_id, service_id, notification_type, api_key_id, key_type):
|
|
notification_db_object = Notification(
|
|
id=notification_id,
|
|
template_id=notification['template'],
|
|
template_version=notification['template_version'],
|
|
to=notification['to'],
|
|
service_id=service_id,
|
|
job_id=notification.get('job', None),
|
|
job_row_number=notification.get('row_number', None),
|
|
status='created',
|
|
created_at=datetime.strptime(created_at, DATETIME_FORMAT),
|
|
personalisation=notification.get('personalisation'),
|
|
notification_type=notification_type,
|
|
api_key_id=api_key_id,
|
|
key_type=key_type
|
|
)
|
|
dao_create_notification(notification_db_object, notification_type)
|
|
|
|
|
|
def service_allowed_to_send_to(recipient, service):
|
|
if not service.restricted:
|
|
return True
|
|
|
|
return allowed_to_send_to(
|
|
recipient,
|
|
itertools.chain.from_iterable(
|
|
[user.mobile_number, user.email_address] for user in service.users
|
|
)
|
|
)
|