import json from datetime import datetime from flask import current_app from notifications_utils.recipients import RecipientCSV from requests import HTTPError, RequestException, request from sqlalchemy.exc import IntegrityError, SQLAlchemyError from app import create_uuid, encryption, notify_celery from app.aws import s3 from app.celery import provider_tasks from app.config import QueueNames from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job from app.dao.notifications_dao import ( dao_get_last_notification_added_for_job_id, get_notification_by_id, ) from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id from app.dao.templates_dao import dao_get_template_by_id from app.models import ( EMAIL_TYPE, JOB_STATUS_CANCELLED, JOB_STATUS_FINISHED, JOB_STATUS_IN_PROGRESS, JOB_STATUS_PENDING, KEY_TYPE_NORMAL, SMS_TYPE, ) from app.notifications.process_notifications import persist_notification from app.notifications.validators import ( check_service_over_daily_message_limit, check_service_over_total_message_limit, ) from app.serialised_models import SerialisedService, SerialisedTemplate from app.service.utils import service_allowed_to_send_to from app.utils import DATETIME_FORMAT from app.v2.errors import TooManyRequestsError @notify_celery.task(name="process-job") def process_job(job_id, sender_id=None): start = datetime.utcnow() job = dao_get_job_by_id(job_id) current_app.logger.info("Starting process-job task for job id {} with status: {}".format(job_id, job.job_status)) if job.job_status != JOB_STATUS_PENDING: return service = job.service job.job_status = JOB_STATUS_IN_PROGRESS job.processing_started = start dao_update_job(job) if not service.active: job.job_status = JOB_STATUS_CANCELLED dao_update_job(job) current_app.logger.warning( "Job {} has been cancelled, service {} is inactive".format(job_id, service.id)) return if __daily_sending_limits_for_job_exceeded(service, job, job_id): return if __total_sending_limits_for_job_exceeded(service, job, job_id): return recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job) current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count)) for row in recipient_csv.get_rows(): process_row(row, template, job, service, sender_id=sender_id) job_complete(job, start=start) def job_complete(job, resumed=False, start=None): job.job_status = JOB_STATUS_FINISHED finished = datetime.utcnow() job.processing_finished = finished dao_update_job(job) if resumed: current_app.logger.info( "Resumed Job {} completed at {}".format(job.id, job.created_at) ) else: current_app.logger.info( "Job {} created at {} started at {} finished at {}".format(job.id, job.created_at, start, finished) ) def get_recipient_csv_and_template_and_sender_id(job): db_template = dao_get_template_by_id(job.template_id, job.template_version) template = db_template._as_utils_template() contents, meta_data = s3.get_job_and_metadata_from_s3(service_id=str(job.service_id), job_id=str(job.id)) recipient_csv = RecipientCSV(contents, template=template) return recipient_csv, template, meta_data.get("sender_id") def process_row(row, template, job, service, sender_id=None): template_type = template.template_type encrypted = encryption.encrypt({ 'template': str(template.id), 'template_version': job.template_version, 'job': str(job.id), 'to': row.recipient, 'row_number': row.index, 'personalisation': dict(row.personalisation) }) send_fns = { SMS_TYPE: save_sms, EMAIL_TYPE: save_email } send_fn = send_fns[template_type] task_kwargs = {} if sender_id: task_kwargs['sender_id'] = sender_id notification_id = create_uuid() send_fn.apply_async( ( str(service.id), notification_id, encrypted, ), task_kwargs, queue=QueueNames.DATABASE if not service.research_mode else QueueNames.RESEARCH_MODE ) return notification_id def __daily_sending_limits_for_job_exceeded(service, job, job_id): try: total_daily_sent = check_service_over_daily_message_limit(KEY_TYPE_NORMAL, service) if total_daily_sent + job.notification_count > service.message_limit: raise TooManyRequestsError(service.message_limit) else: return False except TooManyRequestsError: job.job_status = 'sending limits exceeded' job.processing_finished = datetime.utcnow() dao_update_job(job) current_app.logger.info( "Job {} size {} error. Daily ending limits {} exceeded".format( job_id, job.notification_count, service.message_limit) ) return True def __total_sending_limits_for_job_exceeded(service, job, job_id): try: total_sent = check_service_over_total_message_limit(KEY_TYPE_NORMAL, service) if total_sent + job.notification_count > service.total_message_limit: raise TooManyRequestsError(service.total_message_limit) else: return False except TooManyRequestsError: job.job_status = 'total sending limits exceeded' job.processing_finished = datetime.utcnow() dao_update_job(job) current_app.logger.info( "Job {} size {} error. Total sending limits {} exceeded".format( job_id, job.notification_count, service.message_limit) ) return True @notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300) def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None): notification = encryption.decrypt(encrypted_notification) service = SerialisedService.from_id(service_id) template = SerialisedTemplate.from_id_and_service_id( notification['template'], service_id=service.id, version=notification['template_version'], ) if sender_id: reply_to_text = dao_get_service_sms_senders_by_id(service_id, sender_id).sms_sender else: reply_to_text = template.reply_to_text if not service_allowed_to_send_to(notification['to'], service, KEY_TYPE_NORMAL): current_app.logger.debug( "SMS {} failed as restricted service".format(notification_id) ) return try: saved_notification = persist_notification( template_id=notification['template'], template_version=notification['template_version'], recipient=notification['to'], service=service, personalisation=notification.get('personalisation'), notification_type=SMS_TYPE, api_key_id=None, key_type=KEY_TYPE_NORMAL, created_at=datetime.utcnow(), job_id=notification.get('job', None), job_row_number=notification.get('row_number', None), notification_id=notification_id, reply_to_text=reply_to_text ) provider_tasks.deliver_sms.apply_async( [str(saved_notification.id)], queue=QueueNames.SEND_SMS if not service.research_mode else QueueNames.RESEARCH_MODE ) current_app.logger.debug( "SMS {} created at {} for job {}".format( saved_notification.id, saved_notification.created_at, notification.get('job', None)) ) except SQLAlchemyError as e: handle_exception(self, notification, notification_id, e) @notify_celery.task(bind=True, name="save-email", max_retries=5, default_retry_delay=300) def save_email(self, service_id, notification_id, encrypted_notification, sender_id=None): notification = encryption.decrypt(encrypted_notification) service = SerialisedService.from_id(service_id) template = SerialisedTemplate.from_id_and_service_id( notification['template'], service_id=service.id, version=notification['template_version'], ) if sender_id: reply_to_text = dao_get_reply_to_by_id(service_id, sender_id).email_address else: reply_to_text = template.reply_to_text if not service_allowed_to_send_to(notification['to'], service, KEY_TYPE_NORMAL): current_app.logger.info("Email {} failed as restricted service".format(notification_id)) return try: saved_notification = persist_notification( template_id=notification['template'], template_version=notification['template_version'], recipient=notification['to'], service=service, personalisation=notification.get('personalisation'), notification_type=EMAIL_TYPE, api_key_id=None, key_type=KEY_TYPE_NORMAL, created_at=datetime.utcnow(), job_id=notification.get('job', None), job_row_number=notification.get('row_number', None), notification_id=notification_id, reply_to_text=reply_to_text ) provider_tasks.deliver_email.apply_async( [str(saved_notification.id)], queue=QueueNames.SEND_EMAIL if not service.research_mode else QueueNames.RESEARCH_MODE ) current_app.logger.debug("Email {} created at {}".format(saved_notification.id, saved_notification.created_at)) except SQLAlchemyError as e: handle_exception(self, notification, notification_id, e) @notify_celery.task(bind=True, name="save-api-email", max_retries=5, default_retry_delay=300) def save_api_email(self, encrypted_notification): save_api_email_or_sms(self, encrypted_notification) @notify_celery.task(bind=True, name="save-api-sms", max_retries=5, default_retry_delay=300) def save_api_sms(self, encrypted_notification): save_api_email_or_sms(self, encrypted_notification) def save_api_email_or_sms(self, encrypted_notification): notification = encryption.decrypt(encrypted_notification) service = SerialisedService.from_id(notification['service_id']) q = QueueNames.SEND_EMAIL if notification['notification_type'] == EMAIL_TYPE else QueueNames.SEND_SMS provider_task = provider_tasks.deliver_email if notification['notification_type'] == EMAIL_TYPE \ else provider_tasks.deliver_sms try: persist_notification( notification_id=notification["id"], template_id=notification['template_id'], template_version=notification['template_version'], recipient=notification['to'], service=service, personalisation=notification.get('personalisation'), notification_type=notification['notification_type'], client_reference=notification['client_reference'], api_key_id=notification.get('api_key_id'), key_type=KEY_TYPE_NORMAL, created_at=notification['created_at'], reply_to_text=notification['reply_to_text'], status=notification['status'], document_download_count=notification['document_download_count'] ) q = q if not service.research_mode else QueueNames.RESEARCH_MODE provider_task.apply_async( [notification['id']], queue=q ) current_app.logger.debug( f"{notification['notification_type']} {notification['id']} has been persisted and sent to delivery queue." ) except IntegrityError: current_app.logger.info(f"{notification['notification_type']} {notification['id']} already exists.") except SQLAlchemyError: try: self.retry(queue=QueueNames.RETRY) except self.MaxRetriesExceededError: current_app.logger.error(f"Max retry failed Failed to persist notification {notification['id']}") def handle_exception(task, notification, notification_id, exc): if not get_notification_by_id(notification_id): retry_msg = '{task} notification for job {job} row number {row} and notification id {noti}'.format( task=task.__name__, job=notification.get('job', None), row=notification.get('row_number', None), noti=notification_id ) # Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems # SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not # send to the retry queue. # This probably (hopefully) is not an issue with Redis as the celery backing store current_app.logger.exception('Retry' + retry_msg) try: task.retry(queue=QueueNames.RETRY, exc=exc) except task.MaxRetriesExceededError: current_app.logger.error('Max retry failed' + retry_msg) @notify_celery.task(bind=True, name="send-inbound-sms", max_retries=5, default_retry_delay=300) def send_inbound_sms_to_service(self, inbound_sms_id, service_id): inbound_api = get_service_inbound_api_for_service(service_id=service_id) if not inbound_api: # No API data has been set for this service return inbound_sms = dao_get_inbound_sms_by_id(service_id=service_id, inbound_id=inbound_sms_id) data = { "id": str(inbound_sms.id), # TODO: should we be validating and formatting the phone number here? "source_number": inbound_sms.user_number, "destination_number": inbound_sms.notify_number, "message": inbound_sms.content, "date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT) } try: response = request( method="POST", url=inbound_api.url, data=json.dumps(data), headers={ 'Content-Type': 'application/json', 'Authorization': 'Bearer {}'.format(inbound_api.bearer_token) }, timeout=60 ) current_app.logger.debug( f"send_inbound_sms_to_service sending {inbound_sms_id} to {inbound_api.url}, " + f"response {response.status_code}" ) response.raise_for_status() except RequestException as e: current_app.logger.warning( f"send_inbound_sms_to_service failed for service_id: {service_id} for inbound_sms_id: {inbound_sms_id} " + f"and url: {inbound_api.url}. exception: {e}" ) if not isinstance(e, HTTPError) or e.response.status_code >= 500: try: self.retry(queue=QueueNames.RETRY) except self.MaxRetriesExceededError: current_app.logger.error( "Retry: send_inbound_sms_to_service has retried the max number of" + f"times for service: {service_id} and inbound_sms {inbound_sms_id}" ) else: current_app.logger.warning( f"send_inbound_sms_to_service is not being retried for service_id: {service_id} for " + f"inbound_sms id: {inbound_sms_id} and url: {inbound_api.url}. exception: {e}" ) @notify_celery.task(name='process-incomplete-jobs') def process_incomplete_jobs(job_ids): jobs = [dao_get_job_by_id(job_id) for job_id in job_ids] # reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again for job in jobs: job.job_status = JOB_STATUS_IN_PROGRESS job.processing_started = datetime.utcnow() dao_update_job(job) current_app.logger.info("Resuming Job(s) {}".format(job_ids)) for job_id in job_ids: process_incomplete_job(job_id) def process_incomplete_job(job_id): job = dao_get_job_by_id(job_id) last_notification_added = dao_get_last_notification_added_for_job_id(job_id) if last_notification_added: resume_from_row = last_notification_added.job_row_number else: resume_from_row = -1 # The first row in the csv with a number is row 0 current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row)) recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job) for row in recipient_csv.get_rows(): if row.index > resume_from_row: process_row(row, template, job, job.service, sender_id=sender_id) job_complete(job, resumed=True)