Files
notifications-api/app/celery/tasks.py
Rebecca Law 8bea8b6a8f Fix bug where all notifications were getting a type = email.
Includes a fix to notification and notification_statistics data.
2016-07-06 14:31:52 +01:00

221 lines
7.3 KiB
Python

import itertools
from datetime import (datetime)
from flask import current_app
from monotonic import monotonic
from notifications_utils.recipients import (
RecipientCSV,
allowed_to_send_to
)
from notifications_utils.template import Template
from sqlalchemy.exc import SQLAlchemyError
from app import statsd_client
from app import (
create_uuid,
DATETIME_FORMAT,
DATE_FORMAT,
notify_celery,
encryption
)
from app.aws import s3
from app.celery.provider_tasks import send_sms_to_provider, send_email_to_provider
from app.dao.jobs_dao import (
dao_update_job,
dao_get_job_by_id
)
from app.dao.notifications_dao import (
dao_create_notification,
dao_get_notification_statistics_for_service_and_day
)
from app.dao.services_dao import dao_fetch_service_by_id
from app.dao.templates_dao import dao_get_template_by_id
from app.models import (
Notification,
EMAIL_TYPE,
SMS_TYPE,
KEY_TYPE_NORMAL
)
@notify_celery.task(name="process-job")
def process_job(job_id):
    """Fan a bulk job's CSV out into one send task per recipient row.

    The job is rejected (status 'sending limits exceeded') when its
    notification count, added to what the service's statistics already show
    for the day the job was created, would exceed ``service.message_limit``.
    Otherwise the job is marked 'in progress', its CSV is read from S3, and
    a ``send_sms`` or ``send_email`` task is queued for every row. Finally
    the job is marked 'finished' and a ``remove_job`` task is queued.

    :param job_id: identifier handed to ``dao_get_job_by_id``.
    """
    task_start = monotonic()
    start = datetime.utcnow()
    job = dao_get_job_by_id(job_id)
    service = job.service

    stats = dao_get_notification_statistics_for_service_and_day(
        service_id=service.id,
        day=job.created_at.strftime(DATE_FORMAT)
    )
    total_sent = (stats.emails_requested + stats.sms_requested) if stats else 0

    if total_sent + job.notification_count > service.message_limit:
        job.status = 'sending limits exceeded'
        job.processing_finished = datetime.utcnow()
        dao_update_job(job)
        current_app.logger.info(
            "Job {} size {} error. Sending limits {} exceeded".format(
                job_id, job.notification_count, service.message_limit)
        )
        return

    # Persist the start time together with the status change so that a job
    # which is still running (or which died part-way) shows when it began.
    job.status = 'in progress'
    job.processing_started = start
    dao_update_job(job)

    template = Template(
        dao_get_template_by_id(job.template_id, job.template_version).__dict__
    )

    # The template type cannot change between rows, so pick the task and
    # queue once. Any other type dispatches nothing; rows are still read and
    # encrypted exactly as before.
    if template.template_type == SMS_TYPE:
        send_task, queue_name = send_sms, 'bulk-sms'
    elif template.template_type == EMAIL_TYPE:
        send_task, queue_name = send_email, 'bulk-email'
    else:
        send_task, queue_name = None, None

    # Per-row constants, computed once instead of once per recipient.
    template_id = str(template.id)
    template_version = job.template_version
    job_id_str = str(job.id)
    service_id_str = str(job.service_id)
    placeholder_keys = list(template.placeholders)

    recipients = RecipientCSV(
        s3.get_job_from_s3(str(service.id), str(job_id)),
        template_type=template.template_type,
        placeholders=template.placeholders
    )

    for row_number, recipient, personalisation in recipients.enumerated_recipients_and_personalisation:
        encrypted = encryption.encrypt({
            'template': template_id,
            'template_version': template_version,
            'job': job_id_str,
            'to': recipient,
            'row_number': row_number,
            # Only the template's own placeholders are forwarded; a
            # placeholder missing from the row becomes None.
            'personalisation': {
                key: personalisation.get(key)
                for key in placeholder_keys
            }
        })

        if send_task is not None:
            send_task.apply_async(
                (
                    service_id_str,
                    create_uuid(),
                    encrypted,
                    datetime.utcnow().strftime(DATETIME_FORMAT)
                ),
                queue=queue_name
            )

    finished = datetime.utcnow()
    job.status = 'finished'
    job.processing_finished = finished
    dao_update_job(job)

    remove_job.apply_async((str(job_id),), queue='remove-job')

    current_app.logger.info(
        "Job {} created at {} started at {} finished at {}".format(job_id, job.created_at, start, finished)
    )
    statsd_client.incr("notifications.tasks.process-job")
    statsd_client.timing("notifications.tasks.process-job.task-time", monotonic() - task_start)
@notify_celery.task(name="remove-job")
def remove_job(job_id):
    """Remove a job's uploaded file from S3 and log that it was removed.

    :param job_id: identifier handed to ``dao_get_job_by_id``; its string
        form is what is passed to S3 and written to the log.
    """
    owning_service = dao_get_job_by_id(job_id).service
    # NOTE(review): the service id is passed as-is here, whereas process_job
    # stringifies it for get_job_from_s3 - confirm the S3 helper accepts both.
    s3.remove_job_from_s3(owning_service.id, str(job_id))

    message = "Job {} has been removed from s3.".format(job_id)
    current_app.logger.info(message)
@notify_celery.task(bind=True, name="send-sms", max_retries=5, default_retry_delay=5)
def send_sms(self,
             service_id,
             notification_id,
             encrypted_notification,
             created_at,
             api_key_id=None,
             key_type=KEY_TYPE_NORMAL):
    """Persist one SMS notification and queue it for delivery.

    The encrypted payload is decrypted and the service is loaded. If the
    service is not allowed to send to the payload's ``to`` value, the task
    logs that and returns without saving anything. Otherwise the
    notification is saved with type ``SMS_TYPE``, a ``send_sms_to_provider``
    task is queued on the 'sms' queue, and statsd counters are recorded.

    A ``SQLAlchemyError`` raised while doing so is logged and the task is
    retried on the 'retry' queue.

    :param service_id: id of the sending service.
    :param notification_id: id to give the stored notification.
    :param encrypted_notification: payload readable by ``encryption.decrypt``.
    :param created_at: creation time as a ``DATETIME_FORMAT`` string.
    :param api_key_id: optional API key id stored on the notification.
    :param key_type: key type stored on the notification.
    """
    started = monotonic()
    payload = encryption.decrypt(encrypted_notification)
    owning_service = dao_fetch_service_by_id(service_id)

    recipient_permitted = service_allowed_to_send_to(payload['to'], owning_service)
    if not recipient_permitted:
        current_app.logger.info("SMS {} failed as restricted service".format(notification_id))
        return

    try:
        _save_notification(
            created_at,
            payload,
            notification_id,
            service_id,
            SMS_TYPE,
            api_key_id,
            key_type
        )
        send_sms_to_provider.apply_async((service_id, notification_id), queue='sms')

        current_app.logger.info("SMS {} created at {}".format(notification_id, created_at))
        statsd_client.incr("notifications.tasks.send-sms")
        elapsed = monotonic() - started
        statsd_client.timing("notifications.tasks.send-sms.task-time", elapsed)
    except SQLAlchemyError as e:
        current_app.logger.exception(e)
        raise self.retry(queue="retry", exc=e)
@notify_celery.task(bind=True, name="send-email", max_retries=5, default_retry_delay=5)
def send_email(self,
               service_id,
               notification_id,
               encrypted_notification,
               created_at,
               api_key_id=None,
               key_type=KEY_TYPE_NORMAL):
    """Persist one email notification and queue it for delivery.

    The encrypted payload is decrypted and the service is loaded. If the
    service is not allowed to send to the payload's ``to`` value, the task
    logs that and returns without saving anything. Otherwise the
    notification is saved with type ``EMAIL_TYPE``, a
    ``send_email_to_provider`` task is queued on the 'email' queue, and
    statsd counters are recorded.

    A ``SQLAlchemyError`` raised while doing so is logged and the task is
    retried on the 'retry' queue.

    :param service_id: id of the sending service.
    :param notification_id: id to give the stored notification.
    :param encrypted_notification: payload readable by ``encryption.decrypt``.
    :param created_at: creation time as a ``DATETIME_FORMAT`` string.
    :param api_key_id: optional API key id stored on the notification.
    :param key_type: key type stored on the notification.
    """
    started = monotonic()
    payload = encryption.decrypt(encrypted_notification)
    owning_service = dao_fetch_service_by_id(service_id)

    recipient_permitted = service_allowed_to_send_to(payload['to'], owning_service)
    if not recipient_permitted:
        current_app.logger.info(
            "Email {} failed as restricted service".format(notification_id)
        )
        return

    try:
        _save_notification(
            created_at,
            payload,
            notification_id,
            service_id,
            EMAIL_TYPE,
            api_key_id,
            key_type
        )
        send_email_to_provider.apply_async((service_id, notification_id), queue='email')

        current_app.logger.info(
            "Email {} created at {}".format(notification_id, created_at)
        )
        statsd_client.incr("notifications.tasks.send-email")
        elapsed = monotonic() - started
        statsd_client.timing("notifications.tasks.send-email.task-time", elapsed)
    except SQLAlchemyError as e:
        current_app.logger.exception(e)
        raise self.retry(queue="retry", exc=e)
def _save_notification(created_at, notification, notification_id, service_id, notification_type, api_key_id, key_type):
    """Build a ``Notification`` in the 'created' state and store it.

    :param created_at: creation time as a ``DATETIME_FORMAT`` string; it is
        parsed back into a ``datetime`` before being stored.
    :param notification: decrypted payload dict. 'template',
        'template_version' and 'to' are required; 'job', 'row_number' and
        'personalisation' are optional and default to ``None``.
    :param notification_id: id for the new row.
    :param service_id: id of the sending service.
    :param notification_type: stored on the row and also passed to
        ``dao_create_notification``.
    :param api_key_id: API key id stored on the row (may be ``None``).
    :param key_type: key type stored on the row.
    """
    fields = dict(
        id=notification_id,
        template_id=notification['template'],
        template_version=notification['template_version'],
        to=notification['to'],
        service_id=service_id,
        job_id=notification.get('job'),
        job_row_number=notification.get('row_number'),
        status='created',
        created_at=datetime.strptime(created_at, DATETIME_FORMAT),
        personalisation=notification.get('personalisation'),
        notification_type=notification_type,
        api_key_id=api_key_id,
        key_type=key_type
    )
    record = Notification(**fields)
    dao_create_notification(record, notification_type)
def service_allowed_to_send_to(recipient, service):
    """Decide whether ``service`` may send to ``recipient``.

    A service that is not restricted may send to anyone. A restricted
    service is checked with ``allowed_to_send_to`` against the mobile
    numbers and email addresses of the service's own users.

    :param recipient: the address or number being sent to.
    :param service: object exposing ``restricted`` and ``users``.
    :return: ``True`` for an unrestricted service, otherwise whatever
        ``allowed_to_send_to`` returns.
    """
    if service.restricted:
        # Flatten every user's two contact details into one lazy stream.
        team_contacts = (
            contact
            for user in service.users
            for contact in (user.mobile_number, user.email_address)
        )
        return allowed_to_send_to(recipient, team_contacts)

    return True