import itertools from datetime import (datetime) from flask import current_app from monotonic import monotonic from notifications_utils.recipients import ( RecipientCSV, allowed_to_send_to ) from notifications_utils.template import Template from sqlalchemy.exc import SQLAlchemyError from app import ( create_uuid, DATETIME_FORMAT, DATE_FORMAT, notify_celery, encryption ) from app.aws import s3 from app.celery.provider_tasks import send_sms_to_provider, send_email_to_provider from app.dao.jobs_dao import ( dao_update_job, dao_get_job_by_id ) from app.dao.notifications_dao import ( dao_create_notification, dao_get_notification_statistics_for_service_and_day ) from app.dao.services_dao import dao_fetch_service_by_id from app.dao.templates_dao import dao_get_template_by_id from app.models import ( Notification, EMAIL_TYPE, SMS_TYPE, KEY_TYPE_NORMAL ) from app.statsd_decorators import statsd @notify_celery.task(name="process-job") @statsd(namespace="tasks") def process_job(job_id): start = datetime.utcnow() job = dao_get_job_by_id(job_id) service = job.service stats = dao_get_notification_statistics_for_service_and_day( service_id=service.id, day=job.created_at.strftime(DATE_FORMAT) ) total_sent = 0 if stats: total_sent = stats.emails_requested + stats.sms_requested if total_sent + job.notification_count > service.message_limit: job.status = 'sending limits exceeded' job.processing_finished = datetime.utcnow() dao_update_job(job) current_app.logger.info( "Job {} size {} error. Sending limits {} exceeded".format( job_id, job.notification_count, service.message_limit) ) return job.status = 'in progress' dao_update_job(job) template = Template( dao_get_template_by_id(job.template_id, job.template_version).__dict__ ) for row_number, recipient, personalisation in RecipientCSV( s3.get_job_from_s3(str(service.id), str(job_id)), template_type=template.template_type, placeholders=template.placeholders ).enumerated_recipients_and_personalisation: encrypted = encryption.encrypt({ 'template': str(template.id), 'template_version': job.template_version, 'job': str(job.id), 'to': recipient, 'row_number': row_number, 'personalisation': { key: personalisation.get(key) for key in template.placeholders } }) if template.template_type == SMS_TYPE: send_sms.apply_async(( str(job.service_id), create_uuid(), encrypted, datetime.utcnow().strftime(DATETIME_FORMAT)), queue='bulk-sms' ) if template.template_type == EMAIL_TYPE: send_email.apply_async(( str(job.service_id), create_uuid(), encrypted, datetime.utcnow().strftime(DATETIME_FORMAT)), queue='bulk-email') finished = datetime.utcnow() job.status = 'finished' job.processing_started = start job.processing_finished = finished dao_update_job(job) remove_job.apply_async((str(job_id),), queue='remove-job') current_app.logger.info( "Job {} created at {} started at {} finished at {}".format(job_id, job.created_at, start, finished) ) @notify_celery.task(name="remove-job") @statsd(namespace="tasks") def remove_job(job_id): job = dao_get_job_by_id(job_id) s3.remove_job_from_s3(job.service.id, str(job_id)) current_app.logger.info("Job {} has been removed from s3.".format(job_id)) @notify_celery.task(bind=True, name="send-sms", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") def send_sms(self, service_id, notification_id, encrypted_notification, created_at, api_key_id=None, key_type=KEY_TYPE_NORMAL): notification = encryption.decrypt(encrypted_notification) service = dao_fetch_service_by_id(service_id) if not service_allowed_to_send_to(notification['to'], service): current_app.logger.info( "SMS {} failed as restricted service".format(notification_id) ) return try: _save_notification(created_at, notification, notification_id, service_id, SMS_TYPE, api_key_id, key_type) send_sms_to_provider.apply_async((service_id, notification_id), queue='sms') current_app.logger.info( "SMS {} created at {}".format(notification_id, created_at) ) except SQLAlchemyError as e: current_app.logger.exception(e) raise self.retry(queue="retry", exc=e) @notify_celery.task(bind=True, name="send-email", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") def send_email(self, service_id, notification_id, encrypted_notification, created_at, api_key_id=None, key_type=KEY_TYPE_NORMAL): notification = encryption.decrypt(encrypted_notification) service = dao_fetch_service_by_id(service_id) if not service_allowed_to_send_to(notification['to'], service): current_app.logger.info("Email {} failed as restricted service".format(notification_id)) return try: _save_notification(created_at, notification, notification_id, service_id, EMAIL_TYPE, api_key_id, key_type) send_email_to_provider.apply_async((service_id, notification_id), queue='email') current_app.logger.info("Email {} created at {}".format(notification_id, created_at)) except SQLAlchemyError as e: current_app.logger.exception(e) raise self.retry(queue="retry", exc=e) def _save_notification(created_at, notification, notification_id, service_id, notification_type, api_key_id, key_type): notification_db_object = Notification( id=notification_id, template_id=notification['template'], template_version=notification['template_version'], to=notification['to'], service_id=service_id, job_id=notification.get('job', None), job_row_number=notification.get('row_number', None), status='created', created_at=datetime.strptime(created_at, DATETIME_FORMAT), personalisation=notification.get('personalisation'), notification_type=notification_type, api_key_id=api_key_id, key_type=key_type ) dao_create_notification(notification_db_object, notification_type) def service_allowed_to_send_to(recipient, service): if not service.restricted: return True return allowed_to_send_to( recipient, itertools.chain.from_iterable( [user.mobile_number, user.email_address] for user in service.users ) )