diff --git a/app/celery/provider_tasks.py b/app/celery/provider_tasks.py new file mode 100644 index 000000000..1ab706670 --- /dev/null +++ b/app/celery/provider_tasks.py @@ -0,0 +1,104 @@ +import json + +from celery.exceptions import MaxRetriesExceededError + +from datetime import datetime +from monotonic import monotonic +from flask import current_app +from app import notify_celery, statsd_client, encryption, clients +from app.clients.sms import SmsClientException +from app.dao.notifications_dao import ( + update_provider_stats, + get_notification_by_id, + dao_update_notification) +from app.dao.provider_details_dao import get_provider_details_by_notification_type +from app.dao.services_dao import dao_fetch_service_by_id +from app.celery.research_mode_tasks import send_email_response, send_sms_response + +from notifications_utils.recipients import ( + validate_and_format_phone_number +) + +from app.dao.templates_dao import dao_get_template_by_id +from notifications_utils.template import ( + Template, + unlink_govuk_escaped +) + + +retries = { + 0: 5, # 5 seconds + 1: 30, # 30 seconds + 2: 60 * 5, # 5 minutes + 3: 60 * 15, # 15 minutes + 4: 60 * 30 # 30 minutes +} + +@notify_celery.task(bind=True, name="send-sms-to-provider", max_retries=5, default_retry_delay=5) +def send_sms_to_provider(self, service_id, notification_id, encrypted_notification): + task_start = monotonic() + + service = dao_fetch_service_by_id(service_id) + provider = provider_to_use('sms', notification_id) + notification = get_notification_by_id(notification_id) + + notification_json = encryption.decrypt(encrypted_notification) + + template = Template( + dao_get_template_by_id(notification.template_id, notification.template_version).__dict__, + values=notification_json.get('personalisation', {}), + prefix=service.name + ) + try: + if service.research_mode: + send_sms_response.apply_async( + (provider.get_name(), str(notification_id), notification_json['to']), queue='research-mode' + ) + else: + provider.send_sms( + to=validate_and_format_phone_number(notification_json['to']), + content=template.replaced, + reference=str(notification_id) + ) + + update_provider_stats( + notification_id, + 'sms', + provider.get_name(), + content_char_count=template.replaced_content_count + ) + + except SmsClientException as e: + try: + current_app.logger.error( + "SMS notification {} failed".format(notification_id) + ) + current_app.logger.exception(e) + raise self.retry(queue="sms", countdown=retries[self.request.retries]) + except self.MaxRetriesExceededError: + notification.status = 'technical-failure' + + notification.sent_at = datetime.utcnow() + notification.sent_by = provider.get_name(), + notification.content_char_count = template.replaced_content_count + dao_update_notification(notification) + + current_app.logger.info( + "SMS {} created at {} sent at {}".format(notification_id, notification.created_at, notification.sent_at) + ) + statsd_client.incr("notifications.tasks.send-sms-to-provider") + statsd_client.timing("notifications.tasks.send-sms-to-provider.task-time", monotonic() - task_start) + + +def provider_to_use(notification_type, notification_id): + active_providers_in_order = [ + provider for provider in get_provider_details_by_notification_type(notification_type) if provider.active + ] + + if not active_providers_in_order: + current_app.logger.error( + "{} {} failed as no active providers".format(notification_type, notification_id) + ) + raise Exception("No active {} providers".format(notification_type)) + + return clients.get_client_by_name_and_type(active_providers_in_order[0].identifier, notification_type) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 80925b5af..a736de8b5 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -11,7 +11,9 @@ from app.dao.services_dao import dao_fetch_service_by_id from app.dao.templates_dao import dao_get_template_by_id from app.dao.provider_details_dao import get_provider_details_by_notification_type from app.celery.research_mode_tasks import send_email_response, send_sms_response - +from app.celery.provider_tasks import ( + send_sms_to_provider +) from notifications_utils.template import Template, unlink_govuk_escaped from notifications_utils.recipients import ( @@ -212,14 +214,12 @@ def remove_job(job_id): current_app.logger.info("Job {} has been removed from s3.".format(job_id)) -@notify_celery.task(name="send-sms") -def send_sms(service_id, notification_id, encrypted_notification, created_at): +@notify_celery.task(bind=True, name="send-sms", max_retries=5, default_retry_delay=5) +def send_sms(self, service_id, notification_id, encrypted_notification, created_at): task_start = monotonic() notification = encryption.decrypt(encrypted_notification) service = dao_fetch_service_by_id(service_id) - provider = provider_to_use('sms', notification_id) - if not service_allowed_to_send_to(notification['to'], service): current_app.logger.info( "SMS {} failed as restricted service".format(notification_id) @@ -228,12 +228,6 @@ def send_sms(service_id, notification_id, encrypted_notification, created_at): try: - template = Template( - dao_get_template_by_id(notification['template'], notification['template_version']).__dict__, - values=notification.get('personalisation', {}), - prefix=service.name - ) - sent_at = datetime.utcnow() notification_db_object = Notification( id=notification_id, @@ -244,51 +238,21 @@ def send_sms(service_id, notification_id, encrypted_notification, created_at): job_id=notification.get('job', None), job_row_number=notification.get('row_number', None), status='sending', - created_at=datetime.strptime(created_at, DATETIME_FORMAT), - sent_at=sent_at, - sent_by=provider.get_name(), - content_char_count=template.replaced_content_count + created_at=datetime.strptime(created_at, DATETIME_FORMAT) ) - statsd_client.timing_with_dates( - "notifications.tasks.send-sms.queued-for", - sent_at, - datetime.strptime(created_at, DATETIME_FORMAT) - ) - dao_create_notification(notification_db_object, TEMPLATE_TYPE_SMS, provider.get_name()) + dao_create_notification(notification_db_object, TEMPLATE_TYPE_SMS) - try: - if service.research_mode: - send_sms_response.apply_async( - (provider.get_name(), str(notification_id), notification['to']), queue='research-mode' - ) - else: - provider.send_sms( - to=validate_and_format_phone_number(notification['to']), - content=template.replaced, - reference=str(notification_id) - ) - - update_provider_stats( - notification_id, - 'sms', - provider.get_name() - ) - - except SmsClientException as e: - current_app.logger.error( - "SMS notification {} failed".format(notification_id) - ) - current_app.logger.exception(e) - notification_db_object.status = 'technical-failure' - dao_update_notification(notification_db_object) + send_sms_to_provider.apply_async((service_id, notification_id, encrypted_notification), queue='sms') current_app.logger.info( "SMS {} created at {} sent at {}".format(notification_id, created_at, sent_at) ) + statsd_client.incr("notifications.tasks.send-sms") statsd_client.timing("notifications.tasks.send-sms.task-time", monotonic() - task_start) except SQLAlchemyError as e: current_app.logger.exception(e) + raise self.retry(queue="sms", exc=e) @notify_celery.task(name="send-email") diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index ac9d597d9..4260e4817 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -140,9 +140,7 @@ def dao_get_template_statistics_for_service(service_id, limit_days=None): @transactional -def dao_create_notification(notification, notification_type, provider_identifier): - provider = ProviderDetails.query.filter_by(identifier=provider_identifier).one() - +def dao_create_notification(notification, notification_type): if notification.job_id: db.session.query(Job).filter_by( id=notification.job_id @@ -281,8 +279,8 @@ def dao_update_notification(notification): def update_provider_stats( id_, notification_type, - provider_name): - + provider_name, + content_char_count=None): notification = Notification.query.filter(Notification.id == id_).one() provider = ProviderDetails.query.filter_by(identifier=provider_name).one() @@ -290,6 +288,8 @@ def update_provider_stats( if notification_type == TEMPLATE_TYPE_EMAIL: return 1 else: + if (content_char_count): + return get_sms_fragment_count(content_char_count) return get_sms_fragment_count(notification.content_char_count) update_count = db.session.query(ProviderStatistics).filter_by(