mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-31 23:26:23 -05:00
Merge pull request #414 from alphagov/split-sms-and-retry
Split sms and retry
This commit is contained in:
117
app/celery/provider_tasks.py
Normal file
117
app/celery/provider_tasks.py
Normal file
@@ -0,0 +1,117 @@
|
||||
import json
|
||||
|
||||
from datetime import datetime
|
||||
from monotonic import monotonic
|
||||
from flask import current_app
|
||||
from app import notify_celery, statsd_client, encryption, clients
|
||||
from app.clients.sms import SmsClientException
|
||||
from app.dao.notifications_dao import (
|
||||
update_provider_stats,
|
||||
get_notification_by_id,
|
||||
dao_update_notification, update_notification_status_by_id)
|
||||
from app.dao.provider_details_dao import get_provider_details_by_notification_type
|
||||
from app.dao.services_dao import dao_fetch_service_by_id
|
||||
from app.celery.research_mode_tasks import send_sms_response
|
||||
|
||||
from notifications_utils.recipients import (
|
||||
validate_and_format_phone_number
|
||||
)
|
||||
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from notifications_utils.template import (
|
||||
Template,
|
||||
unlink_govuk_escaped
|
||||
)
|
||||
|
||||
|
||||
def retry_iteration_to_delay(retry=0):
|
||||
"""
|
||||
Given current retry calculate some delay before retrying
|
||||
0: 10 seconds
|
||||
1: 60 seconds (1 minutes)
|
||||
2: 300 seconds (5 minutes)
|
||||
3: 3600 seconds (60 minutes)
|
||||
4: 14400 seconds (4 hours)
|
||||
:param retry (zero indexed):
|
||||
:return length to retry in seconds, default 10 seconds
|
||||
"""
|
||||
|
||||
delays = {
|
||||
0: 10,
|
||||
1: 60,
|
||||
2: 300,
|
||||
3: 3600,
|
||||
4: 14400
|
||||
}
|
||||
|
||||
return delays.get(retry, 10)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="send-sms-to-provider", max_retries=5, default_retry_delay=5)
|
||||
def send_sms_to_provider(self, service_id, notification_id, encrypted_notification):
|
||||
task_start = monotonic()
|
||||
|
||||
service = dao_fetch_service_by_id(service_id)
|
||||
provider = provider_to_use('sms', notification_id)
|
||||
notification = get_notification_by_id(notification_id)
|
||||
|
||||
notification_json = encryption.decrypt(encrypted_notification)
|
||||
|
||||
template = Template(
|
||||
dao_get_template_by_id(notification.template_id, notification.template_version).__dict__,
|
||||
values=notification_json.get('personalisation', {}),
|
||||
prefix=service.name
|
||||
)
|
||||
try:
|
||||
if service.research_mode:
|
||||
send_sms_response.apply_async(
|
||||
(provider.get_name(), str(notification_id), notification_json['to']), queue='research-mode'
|
||||
)
|
||||
else:
|
||||
provider.send_sms(
|
||||
to=validate_and_format_phone_number(notification_json['to']),
|
||||
content=template.replaced,
|
||||
reference=str(notification_id)
|
||||
)
|
||||
|
||||
update_provider_stats(
|
||||
notification_id,
|
||||
'sms',
|
||||
provider.get_name(),
|
||||
content_char_count=template.replaced_content_count
|
||||
)
|
||||
|
||||
notification.sent_at = datetime.utcnow()
|
||||
notification.sent_by = provider.get_name(),
|
||||
notification.content_char_count = template.replaced_content_count
|
||||
dao_update_notification(notification)
|
||||
except SmsClientException as e:
|
||||
try:
|
||||
current_app.logger.error(
|
||||
"SMS notification {} failed".format(notification_id)
|
||||
)
|
||||
current_app.logger.exception(e)
|
||||
self.retry(queue="retry", countdown=retry_iteration_to_delay(self.request.retries))
|
||||
except self.MaxRetriesExceededError:
|
||||
update_notification_status_by_id(notification.id, 'technical-failure', 'failure')
|
||||
|
||||
current_app.logger.info(
|
||||
"SMS {} created at {} sent at {}".format(notification_id, notification.created_at, notification.sent_at)
|
||||
)
|
||||
statsd_client.incr("notifications.tasks.send-sms-to-provider")
|
||||
statsd_client.timing("notifications.tasks.send-sms-to-provider.task-time", monotonic() - task_start)
|
||||
statsd_client.timing("notifications.sms.total-time", datetime.utcnow() - notification.created_at)
|
||||
|
||||
|
||||
def provider_to_use(notification_type, notification_id):
|
||||
active_providers_in_order = [
|
||||
provider for provider in get_provider_details_by_notification_type(notification_type) if provider.active
|
||||
]
|
||||
|
||||
if not active_providers_in_order:
|
||||
current_app.logger.error(
|
||||
"{} {} failed as no active providers".format(notification_type, notification_id)
|
||||
)
|
||||
raise Exception("No active {} providers".format(notification_type))
|
||||
|
||||
return clients.get_client_by_name_and_type(active_providers_in_order[0].identifier, notification_type)
|
||||
@@ -4,14 +4,15 @@ from datetime import (datetime, timedelta)
|
||||
from flask import current_app
|
||||
from monotonic import monotonic
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from app import clients, statsd_client
|
||||
from app.clients import STATISTICS_FAILURE
|
||||
from app.clients.email import EmailClientException
|
||||
from app.clients.sms import SmsClientException
|
||||
from app.dao.services_dao import dao_fetch_service_by_id
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.dao.provider_details_dao import get_provider_details_by_notification_type
|
||||
from app.celery.research_mode_tasks import send_email_response, send_sms_response
|
||||
from app.celery.provider_tasks import send_sms_to_provider
|
||||
from app.celery.research_mode_tasks import send_email_response
|
||||
|
||||
from notifications_utils.template import Template
|
||||
|
||||
@@ -208,14 +209,12 @@ def remove_job(job_id):
|
||||
current_app.logger.info("Job {} has been removed from s3.".format(job_id))
|
||||
|
||||
|
||||
@notify_celery.task(name="send-sms")
|
||||
def send_sms(service_id, notification_id, encrypted_notification, created_at):
|
||||
@notify_celery.task(bind=True, name="send-sms", max_retries=5, default_retry_delay=5)
|
||||
def send_sms(self, service_id, notification_id, encrypted_notification, created_at):
|
||||
task_start = monotonic()
|
||||
notification = encryption.decrypt(encrypted_notification)
|
||||
service = dao_fetch_service_by_id(service_id)
|
||||
|
||||
provider = provider_to_use('sms', notification_id)
|
||||
|
||||
if not service_allowed_to_send_to(notification['to'], service):
|
||||
current_app.logger.info(
|
||||
"SMS {} failed as restricted service".format(notification_id)
|
||||
@@ -224,12 +223,6 @@ def send_sms(service_id, notification_id, encrypted_notification, created_at):
|
||||
|
||||
try:
|
||||
|
||||
template = Template(
|
||||
dao_get_template_by_id(notification['template'], notification['template_version']).__dict__,
|
||||
values=notification.get('personalisation', {}),
|
||||
prefix=service.name
|
||||
)
|
||||
|
||||
sent_at = datetime.utcnow()
|
||||
notification_db_object = Notification(
|
||||
id=notification_id,
|
||||
@@ -240,51 +233,21 @@ def send_sms(service_id, notification_id, encrypted_notification, created_at):
|
||||
job_id=notification.get('job', None),
|
||||
job_row_number=notification.get('row_number', None),
|
||||
status='sending',
|
||||
created_at=datetime.strptime(created_at, DATETIME_FORMAT),
|
||||
sent_at=sent_at,
|
||||
sent_by=provider.get_name(),
|
||||
content_char_count=template.replaced_content_count
|
||||
created_at=datetime.strptime(created_at, DATETIME_FORMAT)
|
||||
)
|
||||
statsd_client.timing_with_dates(
|
||||
"notifications.tasks.send-sms.queued-for",
|
||||
sent_at,
|
||||
datetime.strptime(created_at, DATETIME_FORMAT)
|
||||
)
|
||||
dao_create_notification(notification_db_object, TEMPLATE_TYPE_SMS, provider.get_name())
|
||||
dao_create_notification(notification_db_object, TEMPLATE_TYPE_SMS)
|
||||
|
||||
try:
|
||||
if service.research_mode:
|
||||
send_sms_response.apply_async(
|
||||
(provider.get_name(), str(notification_id), notification['to']), queue='research-mode'
|
||||
)
|
||||
else:
|
||||
provider.send_sms(
|
||||
to=validate_and_format_phone_number(notification['to']),
|
||||
content=template.replaced,
|
||||
reference=str(notification_id)
|
||||
)
|
||||
|
||||
update_provider_stats(
|
||||
notification_id,
|
||||
'sms',
|
||||
provider.get_name()
|
||||
)
|
||||
|
||||
except SmsClientException as e:
|
||||
current_app.logger.error(
|
||||
"SMS notification {} failed".format(notification_id)
|
||||
)
|
||||
current_app.logger.exception(e)
|
||||
notification_db_object.status = 'technical-failure'
|
||||
dao_update_notification(notification_db_object)
|
||||
send_sms_to_provider.apply_async((service_id, notification_id, encrypted_notification), queue='sms')
|
||||
|
||||
current_app.logger.info(
|
||||
"SMS {} created at {} sent at {}".format(notification_id, created_at, sent_at)
|
||||
)
|
||||
|
||||
statsd_client.incr("notifications.tasks.send-sms")
|
||||
statsd_client.timing("notifications.tasks.send-sms.task-time", monotonic() - task_start)
|
||||
except SQLAlchemyError as e:
|
||||
current_app.logger.exception(e)
|
||||
raise self.retry(queue="retry", exc=e)
|
||||
|
||||
|
||||
@notify_celery.task(name="send-email")
|
||||
@@ -317,7 +280,7 @@ def send_email(service_id, notification_id, encrypted_notification, created_at,
|
||||
sent_by=provider.get_name()
|
||||
)
|
||||
|
||||
dao_create_notification(notification_db_object, TEMPLATE_TYPE_EMAIL, provider.get_name())
|
||||
dao_create_notification(notification_db_object, TEMPLATE_TYPE_EMAIL)
|
||||
statsd_client.timing_with_dates(
|
||||
"notifications.tasks.send-email.queued-for",
|
||||
sent_at,
|
||||
|
||||
@@ -148,9 +148,7 @@ def dao_get_template_statistics_for_template(template_id):
|
||||
|
||||
|
||||
@transactional
|
||||
def dao_create_notification(notification, notification_type, provider_identifier):
|
||||
provider = ProviderDetails.query.filter_by(identifier=provider_identifier).one()
|
||||
|
||||
def dao_create_notification(notification, notification_type):
|
||||
if notification.job_id:
|
||||
db.session.query(Job).filter_by(
|
||||
id=notification.job_id
|
||||
@@ -207,8 +205,9 @@ def _update_notification_stats_query(notification_type, status):
|
||||
|
||||
def _update_statistics(notification, notification_statistics_status):
|
||||
if notification.job_id:
|
||||
db.session.query(Job).filter_by(id=notification.job_id
|
||||
).update(_update_job_stats_query(notification_statistics_status))
|
||||
db.session.query(Job).filter_by(
|
||||
id=notification.job_id
|
||||
).update(_update_job_stats_query(notification_statistics_status))
|
||||
|
||||
db.session.query(NotificationStatistics).filter_by(
|
||||
day=notification.created_at.date(),
|
||||
@@ -241,17 +240,16 @@ def _update_notification_status(notification, status, notification_statistics_st
|
||||
if notification_statistics_status:
|
||||
_update_statistics(notification, notification_statistics_status)
|
||||
|
||||
db.session.query(Notification).filter(Notification.id == notification.id
|
||||
).update({Notification.status: status})
|
||||
db.session.query(Notification).filter(Notification.id == notification.id).update({Notification.status: status})
|
||||
return True
|
||||
|
||||
|
||||
@transactional
|
||||
def update_notification_status_by_id(notification_id, status, notification_statistics_status=None):
|
||||
notification = Notification.query.filter(Notification.id == notification_id,
|
||||
or_(Notification.status == 'sending',
|
||||
Notification.status == 'pending')
|
||||
).first()
|
||||
notification = Notification.query.filter(
|
||||
Notification.id == notification_id,
|
||||
or_(Notification.status == 'sending',
|
||||
Notification.status == 'pending')).first()
|
||||
|
||||
if not notification:
|
||||
return False
|
||||
@@ -289,8 +287,8 @@ def dao_update_notification(notification):
|
||||
def update_provider_stats(
|
||||
id_,
|
||||
notification_type,
|
||||
provider_name):
|
||||
|
||||
provider_name,
|
||||
content_char_count=None):
|
||||
notification = Notification.query.filter(Notification.id == id_).one()
|
||||
provider = ProviderDetails.query.filter_by(identifier=provider_name).one()
|
||||
|
||||
@@ -298,6 +296,8 @@ def update_provider_stats(
|
||||
if notification_type == TEMPLATE_TYPE_EMAIL:
|
||||
return 1
|
||||
else:
|
||||
if (content_char_count):
|
||||
return get_sms_fragment_count(content_char_count)
|
||||
return get_sms_fragment_count(notification.content_char_count)
|
||||
|
||||
update_count = db.session.query(ProviderStatistics).filter_by(
|
||||
|
||||
Reference in New Issue
Block a user