mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-02 09:26:08 -05:00
@@ -24,6 +24,7 @@ from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.models import (
|
||||
EMAIL_TYPE,
|
||||
SMS_TYPE,
|
||||
LETTER_TYPE,
|
||||
KEY_TYPE_NORMAL
|
||||
)
|
||||
from app.notifications.process_notifications import persist_notification
|
||||
@@ -50,7 +51,7 @@ def process_job(job_id):
|
||||
|
||||
db_template = dao_get_template_by_id(job.template_id, job.template_version)
|
||||
|
||||
TemplateClass = SMSMessageTemplate if db_template.template_type == SMS_TYPE else WithSubjectTemplate
|
||||
TemplateClass = get_template_class(db_template.template_type)
|
||||
template = TemplateClass(db_template.__dict__)
|
||||
|
||||
for row_number, recipient, personalisation in RecipientCSV(
|
||||
@@ -58,33 +59,7 @@ def process_job(job_id):
|
||||
template_type=template.template_type,
|
||||
placeholders=template.placeholders
|
||||
).enumerated_recipients_and_personalisation:
|
||||
|
||||
encrypted = encryption.encrypt({
|
||||
'template': str(template.id),
|
||||
'template_version': job.template_version,
|
||||
'job': str(job.id),
|
||||
'to': recipient,
|
||||
'row_number': row_number,
|
||||
'personalisation': dict(personalisation)
|
||||
})
|
||||
|
||||
if template.template_type == SMS_TYPE:
|
||||
send_sms.apply_async((
|
||||
str(job.service_id),
|
||||
create_uuid(),
|
||||
encrypted,
|
||||
datetime.utcnow().strftime(DATETIME_FORMAT)),
|
||||
queue='db-sms' if not service.research_mode else 'research-mode'
|
||||
)
|
||||
|
||||
if template.template_type == EMAIL_TYPE:
|
||||
send_email.apply_async((
|
||||
str(job.service_id),
|
||||
create_uuid(),
|
||||
encrypted,
|
||||
datetime.utcnow().strftime(DATETIME_FORMAT)),
|
||||
queue='db-email' if not service.research_mode else 'research-mode'
|
||||
)
|
||||
process_row(row_number, recipient, personalisation, template, job, service)
|
||||
|
||||
finished = datetime.utcnow()
|
||||
job.job_status = 'finished'
|
||||
@@ -96,6 +71,42 @@ def process_job(job_id):
|
||||
)
|
||||
|
||||
|
||||
def process_row(row_number, recipient, personalisation, template, job, service):
|
||||
template_type = template.template_type
|
||||
encrypted = encryption.encrypt({
|
||||
'template': str(template.id),
|
||||
'template_version': job.template_version,
|
||||
'job': str(job.id),
|
||||
'to': recipient,
|
||||
'row_number': row_number,
|
||||
'personalisation': dict(personalisation)
|
||||
})
|
||||
|
||||
send_fns = {
|
||||
SMS_TYPE: send_sms,
|
||||
EMAIL_TYPE: send_email,
|
||||
LETTER_TYPE: persist_letter
|
||||
}
|
||||
|
||||
queues = {
|
||||
SMS_TYPE: 'db-sms',
|
||||
EMAIL_TYPE: 'db-email',
|
||||
LETTER_TYPE: 'db-letter',
|
||||
}
|
||||
|
||||
send_fn = send_fns[template_type]
|
||||
|
||||
send_fn.apply_async(
|
||||
(
|
||||
str(service.id),
|
||||
create_uuid(),
|
||||
encrypted,
|
||||
datetime.utcnow().strftime(DATETIME_FORMAT)
|
||||
),
|
||||
queue=queues[template_type] if not service.research_mode else 'research-mode'
|
||||
)
|
||||
|
||||
|
||||
def __sending_limits_for_job_exceeded(service, job, job_id):
|
||||
total_sent = fetch_todays_total_message_count(service.id)
|
||||
|
||||
@@ -154,30 +165,13 @@ def send_sms(self,
|
||||
)
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
if not get_notification_by_id(notification_id):
|
||||
# Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
|
||||
# SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not
|
||||
# send to the retry queue.
|
||||
current_app.logger.error(
|
||||
"RETRY: send_sms notification for job {} row number {} and notification id {}".format(
|
||||
notification.get('job', None),
|
||||
notification.get('row_number', None),
|
||||
notification_id))
|
||||
current_app.logger.exception(e)
|
||||
try:
|
||||
raise self.retry(queue="retry", exc=e)
|
||||
except self.MaxRetriesExceededError:
|
||||
current_app.logger.error(
|
||||
"RETRY FAILED: send_sms notification for job {} row number {} and notification id {}".format(
|
||||
notification.get('job', None),
|
||||
notification.get('row_number', None),
|
||||
notification_id))
|
||||
current_app.logger.exception(e)
|
||||
handle_exception(self, notification, notification_id, e)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="send-email", max_retries=5, default_retry_delay=300)
|
||||
@statsd(namespace="tasks")
|
||||
def send_email(self, service_id,
|
||||
def send_email(self,
|
||||
service_id,
|
||||
notification_id,
|
||||
encrypted_notification,
|
||||
created_at,
|
||||
@@ -213,22 +207,69 @@ def send_email(self, service_id,
|
||||
|
||||
current_app.logger.info("Email {} created at {}".format(saved_notification.id, created_at))
|
||||
except SQLAlchemyError as e:
|
||||
if not get_notification_by_id(notification_id):
|
||||
# Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
|
||||
# SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not
|
||||
# send to the retry queue.
|
||||
current_app.logger.error(
|
||||
"RETRY: send_sms notification for job {} row number {} and notification id {}".format(
|
||||
notification.get('job', None),
|
||||
notification.get('row_number', None),
|
||||
notification_id))
|
||||
current_app.logger.exception(e)
|
||||
try:
|
||||
raise self.retry(queue="retry", exc=e)
|
||||
except self.MaxRetriesExceededError:
|
||||
current_app.logger.error(
|
||||
"RETRY FAILED: send_sms notification for job {} row number {} and notification id {}".format(
|
||||
notification.get('job', None),
|
||||
notification.get('row_number', None),
|
||||
notification_id))
|
||||
current_app.logger.exception(e)
|
||||
handle_exception(self, notification, notification_id, e)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="persist-letter", max_retries=5, default_retry_delay=300)
|
||||
@statsd(namespace="tasks")
|
||||
def persist_letter(
|
||||
self,
|
||||
service_id,
|
||||
notification_id,
|
||||
encrypted_notification,
|
||||
created_at
|
||||
):
|
||||
notification = encryption.decrypt(encrypted_notification)
|
||||
|
||||
# we store the recipient as just the first item of the person's address
|
||||
recipient = notification['personalisation']['addressline1']
|
||||
|
||||
service = dao_fetch_service_by_id(service_id)
|
||||
try:
|
||||
saved_notification = persist_notification(
|
||||
template_id=notification['template'],
|
||||
template_version=notification['template_version'],
|
||||
recipient=recipient,
|
||||
service=service,
|
||||
personalisation=notification['personalisation'],
|
||||
notification_type=LETTER_TYPE,
|
||||
api_key_id=None,
|
||||
key_type=KEY_TYPE_NORMAL,
|
||||
created_at=created_at,
|
||||
job_id=notification['job'],
|
||||
job_row_number=notification['row_number'],
|
||||
notification_id=notification_id
|
||||
)
|
||||
|
||||
# TODO: deliver letters
|
||||
|
||||
current_app.logger.info("Letter {} created at {}".format(saved_notification.id, created_at))
|
||||
except SQLAlchemyError as e:
|
||||
handle_exception(self, notification, notification_id, e)
|
||||
|
||||
|
||||
def handle_exception(task, notification, notification_id, exc):
|
||||
if not get_notification_by_id(notification_id):
|
||||
retry_msg = '{task} notification for job {job} row number {row} and notification id {noti}'.format(
|
||||
task=task.__name__,
|
||||
job=notification.get('job', None),
|
||||
row=notification.get('row_number', None),
|
||||
noti=notification_id
|
||||
)
|
||||
# Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
|
||||
# SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not
|
||||
# send to the retry queue.
|
||||
current_app.logger.exception('Retry' + retry_msg)
|
||||
try:
|
||||
task.retry(queue="retry", exc=exc)
|
||||
except task.MaxRetriesExceededError:
|
||||
current_app.logger.exception('Retry' + retry_msg)
|
||||
|
||||
|
||||
def get_template_class(template_type):
|
||||
if template_type == SMS_TYPE:
|
||||
return SMSMessageTemplate
|
||||
elif template_type in (EMAIL_TYPE, LETTER_TYPE):
|
||||
# since we don't need rendering capabilities (we only need to extract placeholders) both email and letter can
|
||||
# use the same base template
|
||||
return WithSubjectTemplate
|
||||
|
||||
@@ -177,8 +177,9 @@ class Development(Config):
|
||||
SQLALCHEMY_ECHO = False
|
||||
CELERY_QUEUES = Config.CELERY_QUEUES + [
|
||||
Queue('db-sms', Exchange('default'), routing_key='db-sms'),
|
||||
Queue('send-sms', Exchange('default'), routing_key='send-sms'),
|
||||
Queue('db-email', Exchange('default'), routing_key='db-email'),
|
||||
Queue('db-letter', Exchange('default'), routing_key='db-letter'),
|
||||
Queue('send-sms', Exchange('default'), routing_key='send-sms'),
|
||||
Queue('send-email', Exchange('default'), routing_key='send-email'),
|
||||
Queue('research-mode', Exchange('default'), routing_key='research-mode')
|
||||
]
|
||||
@@ -196,8 +197,9 @@ class Test(Config):
|
||||
STATSD_PORT = 1000
|
||||
CELERY_QUEUES = Config.CELERY_QUEUES + [
|
||||
Queue('db-sms', Exchange('default'), routing_key='db-sms'),
|
||||
Queue('send-sms', Exchange('default'), routing_key='send-sms'),
|
||||
Queue('db-email', Exchange('default'), routing_key='db-email'),
|
||||
Queue('db-letter', Exchange('default'), routing_key='db-letter'),
|
||||
Queue('send-sms', Exchange('default'), routing_key='send-sms'),
|
||||
Queue('send-email', Exchange('default'), routing_key='send-email'),
|
||||
Queue('research-mode', Exchange('default'), routing_key='research-mode')
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user