2016-06-20 13:33:53 +01:00
|
|
|
from datetime import (datetime)
|
2016-03-31 15:57:50 +01:00
|
|
|
from flask import current_app
|
2016-04-13 15:31:08 +01:00
|
|
|
from notifications_utils.recipients import (
|
2016-09-28 17:00:17 +01:00
|
|
|
RecipientCSV
|
2016-03-31 15:57:50 +01:00
|
|
|
)
|
2016-06-20 13:33:53 +01:00
|
|
|
from notifications_utils.template import Template
|
|
|
|
|
from sqlalchemy.exc import SQLAlchemyError
|
2016-03-31 15:57:50 +01:00
|
|
|
from app import (
|
|
|
|
|
create_uuid,
|
|
|
|
|
DATETIME_FORMAT,
|
|
|
|
|
notify_celery,
|
2016-05-10 09:04:22 +01:00
|
|
|
encryption
|
2016-03-31 15:57:50 +01:00
|
|
|
)
|
|
|
|
|
from app.aws import s3
|
2016-09-28 15:05:50 +01:00
|
|
|
from app.celery import provider_tasks
|
2016-06-20 13:33:53 +01:00
|
|
|
from app.dao.jobs_dao import (
|
|
|
|
|
dao_update_job,
|
|
|
|
|
dao_get_job_by_id
|
|
|
|
|
)
|
2016-10-03 10:57:10 +01:00
|
|
|
from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count
|
2016-06-20 13:33:53 +01:00
|
|
|
from app.dao.templates_dao import dao_get_template_by_id
|
2016-03-09 17:46:01 +00:00
|
|
|
from app.models import (
|
2016-06-29 11:50:54 +01:00
|
|
|
EMAIL_TYPE,
|
2016-06-30 17:32:49 +01:00
|
|
|
SMS_TYPE,
|
2016-10-03 10:57:10 +01:00
|
|
|
KEY_TYPE_NORMAL
|
2016-03-09 17:46:01 +00:00
|
|
|
)
|
2016-11-11 10:41:39 +00:00
|
|
|
from app.notifications.process_notifications import persist_notification
|
2016-09-28 17:00:17 +01:00
|
|
|
from app.service.utils import service_allowed_to_send_to
|
2016-08-05 10:44:43 +01:00
|
|
|
from app.statsd_decorators import statsd
|
2016-11-11 17:36:38 +00:00
|
|
|
from app import redis_store
|
2016-11-22 12:53:20 +00:00
|
|
|
from app.clients.redis import daily_limit_cache_key
|
2016-03-31 15:57:50 +01:00
|
|
|
|
2016-03-09 14:41:36 +00:00
|
|
|
|
2016-02-24 17:12:30 +00:00
|
|
|
@notify_celery.task(name="process-job")
@statsd(namespace="tasks")
def process_job(job_id):
    """Turn a pending job into one send task per recipient row.

    The job is loaded by id and skipped unless its status is 'pending'.
    If the service's sending limit would be exceeded the job is marked
    accordingly by the limit check and nothing is queued. Otherwise the
    job is marked 'in progress', its recipient file is read from S3 and a
    send_sms / send_email task is queued for each row, after which the job
    is marked 'finished' with its processing start and finish times.

    :param job_id: id of the job to process.
    :return: None
    """
    start = datetime.utcnow()
    job = dao_get_job_by_id(job_id)

    # Anything that is not waiting to be picked up is left alone.
    if job.job_status != 'pending':
        return

    service = job.service

    if __sending_limits_for_job_exceeded(service, job, job_id):
        return

    job.job_status = 'in progress'
    dao_update_job(job)

    # Use the template version the job was created against.
    template_row = dao_get_template_by_id(job.template_id, job.template_version)
    template = Template(template_row.__dict__)

    recipients = RecipientCSV(
        s3.get_job_from_s3(str(service.id), str(job_id)),
        template_type=template.template_type,
        placeholders=template.placeholders
    )
    rows = recipients.enumerated_recipients_and_personalisation

    for row_number, recipient, personalisation in rows:
        payload = {
            'template': str(template.id),
            'template_version': job.template_version,
            'job': str(job.id),
            'to': recipient,
            'row_number': row_number,
            'personalisation': dict(personalisation)
        }
        encrypted = encryption.encrypt(payload)

        if template.template_type == SMS_TYPE:
            sms_args = (
                str(job.service_id),
                create_uuid(),
                encrypted,
                datetime.utcnow().strftime(DATETIME_FORMAT),
            )
            # Services in research mode are routed to their own queue.
            if service.research_mode:
                sms_queue = 'research-mode'
            else:
                sms_queue = 'db-sms'
            send_sms.apply_async(sms_args, queue=sms_queue)

        if template.template_type == EMAIL_TYPE:
            email_args = (
                str(job.service_id),
                create_uuid(),
                encrypted,
                datetime.utcnow().strftime(DATETIME_FORMAT),
            )
            if service.research_mode:
                email_queue = 'research-mode'
            else:
                email_queue = 'db-email'
            send_email.apply_async(email_args, queue=email_queue)

    finished = datetime.utcnow()
    job.job_status = 'finished'
    job.processing_started = start
    job.processing_finished = finished
    dao_update_job(job)

    summary = "Job {} created at {} started at {} finished at {}".format(
        job_id, job.created_at, start, finished
    )
    current_app.logger.info(summary)
|
2016-02-09 13:31:45 +00:00
|
|
|
|
|
|
|
|
|
2016-11-11 10:41:39 +00:00
|
|
|
def __sending_limits_for_job_exceeded(service, job, job_id):
    """Check whether sending this job would exceed the service's message limit.

    Adds the job's notification count to today's total message count for
    the service and compares the sum with ``service.message_limit``. When
    the limit would be exceeded, the job is marked
    'sending limits exceeded', given a processing-finished time, saved,
    and the event is logged.

    :param service: service owning the job; ``id`` and ``message_limit`` are read.
    :param job: job being checked; updated and saved when over the limit.
    :param job_id: job id, used only in the log message.
    :return: True if the limit would be exceeded, otherwise False.
    """
    total_sent = fetch_todays_total_message_count(service.id)
    projected_total = total_sent + job.notification_count

    # Guard clause: nothing to record when the job fits within the limit.
    if not projected_total > service.message_limit:
        return False

    job.job_status = 'sending limits exceeded'
    job.processing_finished = datetime.utcnow()
    dao_update_job(job)

    message = "Job {} size {} error. Sending limits {} exceeded".format(
        job_id, job.notification_count, service.message_limit
    )
    current_app.logger.info(message)
    return True
|
|
|
|
|
|
|
|
|
|
|
2016-08-10 08:46:37 +01:00
|
|
|
@notify_celery.task(bind=True, name="send-sms", max_retries=5, default_retry_delay=300)
@statsd(namespace="tasks")
def send_sms(self,
             service_id,
             notification_id,
             encrypted_notification,
             created_at,
             api_key_id=None,
             key_type=KEY_TYPE_NORMAL):
    """Persist an SMS notification and queue it for delivery.

    The encrypted payload is decrypted, the service is checked for
    permission to send to the recipient, the notification is persisted and
    a ``deliver_sms`` task is queued with the persisted notification's id.

    :param service_id: id of the service sending the notification.
    :param notification_id: only used in the restricted-service log
        message; the persisted notification gets its own id.
    :param encrypted_notification: encrypted payload; once decrypted it is
        read for 'template', 'template_version' and 'to', and optionally
        'personalisation', 'job' and 'row_number'.
    :param created_at: creation time passed through to the persisted notification.
    :param api_key_id: optional API key id passed through to the persisted notification.
    :param key_type: key type used for the recipient check and persisted
        with the notification.
    :return: None
    :raises: whatever ``self.retry`` raises to schedule a retry after a
        ``SQLAlchemyError``; ``MaxRetriesExceededError`` is logged and swallowed.
    """
    # notification_id is not used, it is created by the db model object
    notification = encryption.decrypt(encrypted_notification)
    service = dao_fetch_service_by_id(service_id)

    if not service_allowed_to_send_to(notification['to'], service, key_type):
        current_app.logger.info(
            "SMS {} failed as restricted service".format(notification_id)
        )
        return

    try:
        saved_notification = persist_notification(
            template_id=notification['template'],
            template_version=notification['template_version'],
            recipient=notification['to'],
            service_id=service.id,
            personalisation=notification.get('personalisation'),
            notification_type=SMS_TYPE,
            api_key_id=api_key_id,
            key_type=key_type,
            created_at=created_at,
            job_id=notification.get('job', None),
            job_row_number=notification.get('row_number', None),
        )

        provider_tasks.deliver_sms.apply_async(
            [str(saved_notification.id)],
            queue='send-sms' if not service.research_mode else 'research-mode'
        )

        current_app.logger.info(
            "SMS {} created at {} for job {}".format(saved_notification.id, created_at, notification.get('job', None))
        )

    except SQLAlchemyError as e:
        # logger.exception already records the active exception. Passing `e`
        # as an extra argument to a message without %-placeholders makes the
        # logging module fail to format the record, so it is not passed.
        current_app.logger.exception(
            "RETRY: send_sms notification for job {} row number {}".format(
                notification.get('job', None),
                notification.get('row_number', None)))
        try:
            raise self.retry(queue="retry", exc=e)
        except self.MaxRetriesExceededError:
            # The message now has placeholders so the job and row number
            # actually appear in the log.
            current_app.logger.exception(
                "RETRY FAILED: task send_sms failed for notification for job {} row number {}".format(
                    notification.get('job', None),
                    notification.get('row_number', None)))
|
2016-02-22 17:17:29 +00:00
|
|
|
|
|
|
|
|
|
2016-08-10 08:46:37 +01:00
|
|
|
@notify_celery.task(bind=True, name="send-email", max_retries=5, default_retry_delay=300)
@statsd(namespace="tasks")
def send_email(self, service_id,
               notification_id,
               encrypted_notification,
               created_at,
               api_key_id=None,
               key_type=KEY_TYPE_NORMAL):
    """Persist an email notification and queue it for delivery.

    The encrypted payload is decrypted, the service is checked for
    permission to send to the recipient, the notification is persisted and
    a ``deliver_email`` task is queued with the persisted notification's id.

    :param service_id: id of the service sending the notification.
    :param notification_id: only used in the restricted-service log
        message; the persisted notification gets its own id.
    :param encrypted_notification: encrypted payload; once decrypted it is
        read for 'template', 'template_version' and 'to', and optionally
        'personalisation', 'job' and 'row_number'.
    :param created_at: creation time passed through to the persisted notification.
    :param api_key_id: optional API key id passed through to the persisted notification.
    :param key_type: key type used for the recipient check and persisted
        with the notification.
    :return: None
    :raises: whatever ``self.retry`` raises to schedule a retry after a
        ``SQLAlchemyError``; ``MaxRetriesExceededError`` is logged and swallowed.
    """
    # notification_id is not used, it is created by the db model object
    notification = encryption.decrypt(encrypted_notification)
    service = dao_fetch_service_by_id(service_id)

    if not service_allowed_to_send_to(notification['to'], service, key_type):
        current_app.logger.info("Email {} failed as restricted service".format(notification_id))
        return

    try:
        saved_notification = persist_notification(
            template_id=notification['template'],
            template_version=notification['template_version'],
            recipient=notification['to'],
            service_id=service.id,
            personalisation=notification.get('personalisation'),
            notification_type=EMAIL_TYPE,
            api_key_id=api_key_id,
            key_type=key_type,
            created_at=created_at,
            job_id=notification.get('job', None),
            job_row_number=notification.get('row_number', None),
        )

        provider_tasks.deliver_email.apply_async(
            [str(saved_notification.id)],
            queue='send-email' if not service.research_mode else 'research-mode'
        )

        current_app.logger.info("Email {} created at {}".format(saved_notification.id, created_at))
    except SQLAlchemyError as e:
        # The message now has placeholders so the job and row number appear
        # in the log. `e` is not passed as a logging argument: the message
        # has no %-placeholders, and logger.exception already records the
        # active exception.
        current_app.logger.exception(
            "RETRY: send_email notification for job {} row number {}".format(
                notification.get('job', None),
                notification.get('row_number', None)))
        try:
            raise self.retry(queue="retry", exc=e)
        except self.MaxRetriesExceededError:
            # logger.exception (as in send_sms) keeps the exception detail
            # that the stray `e` argument was meant to carry.
            current_app.logger.exception(
                "RETRY FAILED: task send_email failed for notification for job {} row number {}".format(
                    notification.get('job', None),
                    notification.get('row_number', None)))
|