Merge pull request #686 from alphagov/refactor-send-tasks-into-shared-code

Refactor send tasks into shared code
This commit is contained in:
minglis
2016-09-27 16:46:34 +01:00
committed by GitHub
15 changed files with 1070 additions and 627 deletions

View File

@@ -70,6 +70,7 @@ def create_app(app_name=None):
from app.provider_details.rest import provider_details as provider_details_blueprint
from app.spec.rest import spec as spec_blueprint
from app.organisation.rest import organisation_blueprint
from app.delivery.rest import delivery_blueprint
application.register_blueprint(service_blueprint, url_prefix='/service')
application.register_blueprint(user_blueprint, url_prefix='/user')
@@ -78,6 +79,7 @@ def create_app(app_name=None):
application.register_blueprint(notifications_blueprint)
application.register_blueprint(job_blueprint)
application.register_blueprint(invite_blueprint)
application.register_blueprint(delivery_blueprint)
application.register_blueprint(accept_invite, url_prefix='/invite')
application.register_blueprint(template_statistics_blueprint)

View File

@@ -1,31 +1,17 @@
from datetime import datetime
from flask import current_app
from notifications_utils.recipients import (
validate_and_format_phone_number
)
from notifications_utils.template import Template, get_sms_fragment_count
from notifications_utils.renderers import HTMLEmail, PlainTextEmail, SMSMessage
from app import notify_celery, statsd_client, clients, create_uuid
from app.clients.email import EmailClientException
from app.clients.sms import SmsClientException
from app.dao.notifications_dao import (
get_notification_by_id,
dao_update_notification,
update_notification_status_by_id
)
from app.dao.provider_details_dao import get_provider_details_by_notification_type
from app.dao.services_dao import dao_fetch_service_by_id
from app.celery.research_mode_tasks import send_sms_response, send_email_response
from app.dao.templates_dao import dao_get_template_by_id
from app.models import SMS_TYPE, EMAIL_TYPE, KEY_TYPE_TEST, BRANDING_ORG
from app import notify_celery
from app.dao import notifications_dao
from app.dao.notifications_dao import update_notification_status_by_id
from app.statsd_decorators import statsd
from app.delivery import send_to_providers
from sqlalchemy.orm.exc import NoResultFound
def retry_iteration_to_delay(retry=0):
"""
:param retry times we have performed a retry
Given current retry calculate some delay before retrying
0: 10 seconds
1: 60 seconds (1 minutes)
@@ -47,153 +33,93 @@ def retry_iteration_to_delay(retry=0):
return delays.get(retry, 10)
@notify_celery.task(bind=True, name="deliver_sms", max_retries=5, default_retry_delay=5)
@statsd(namespace="tasks")
def deliver_sms(self, notification_id):
try:
notification = notifications_dao.get_notification_by_id(notification_id)
if not notification:
raise NoResultFound()
send_to_providers.send_sms_to_provider(notification)
except Exception as e:
try:
current_app.logger.error(
"RETRY: SMS notification {} failed".format(notification_id)
)
current_app.logger.exception(e)
self.retry(queue="retry", countdown=retry_iteration_to_delay(self.request.retries))
except self.MaxRetriesExceededError:
current_app.logger.error(
"RETRY FAILED: task send_sms_to_provider failed for notification {}".format(notification_id),
e
)
update_notification_status_by_id(notification_id, 'technical-failure')
@notify_celery.task(bind=True, name="deliver_email", max_retries=5, default_retry_delay=5)
@statsd(namespace="tasks")
def deliver_email(self, notification_id):
try:
notification = notifications_dao.get_notification_by_id(notification_id)
if not notification:
raise NoResultFound()
send_to_providers.send_email_to_provider(notification)
except Exception as e:
try:
current_app.logger.error(
"RETRY: Email notification {} failed".format(notification_id)
)
current_app.logger.exception(e)
self.retry(queue="retry", countdown=retry_iteration_to_delay(self.request.retries))
except self.MaxRetriesExceededError:
current_app.logger.error(
"RETRY FAILED: task send_email_to_provider failed for notification {}".format(notification_id),
e
)
update_notification_status_by_id(notification_id, 'technical-failure')
@notify_celery.task(bind=True, name="send-sms-to-provider", max_retries=5, default_retry_delay=5)
@statsd(namespace="tasks")
def send_sms_to_provider(self, service_id, notification_id):
service = dao_fetch_service_by_id(service_id)
provider = provider_to_use(SMS_TYPE, notification_id)
notification = get_notification_by_id(notification_id)
if notification.status == 'created':
template_model = dao_get_template_by_id(notification.template_id, notification.template_version)
template = Template(
template_model.__dict__,
values={} if not notification.personalisation else notification.personalisation,
renderer=SMSMessage(prefix=service.name, sender=service.sms_sender)
)
try:
notification = notifications_dao.get_notification_by_id(notification_id)
if not notification:
raise NoResultFound()
send_to_providers.send_sms_to_provider(notification)
except Exception as e:
try:
if service.research_mode or notification.key_type == KEY_TYPE_TEST:
send_sms_response.apply_async(
(provider.get_name(), str(notification_id), notification.to), queue='research-mode'
)
notification.billable_units = 0
else:
provider.send_sms(
to=validate_and_format_phone_number(notification.to),
content=template.replaced,
reference=str(notification_id),
sender=service.sms_sender
)
notification.billable_units = get_sms_fragment_count(template.replaced_content_count)
notification.sent_at = datetime.utcnow()
notification.sent_by = provider.get_name()
notification.status = 'sending'
dao_update_notification(notification)
except SmsClientException as e:
try:
current_app.logger.error(
"RETRY: SMS notification {} failed".format(notification_id)
)
current_app.logger.exception(e)
self.retry(queue="retry", countdown=retry_iteration_to_delay(self.request.retries))
except self.MaxRetriesExceededError:
current_app.logger.error(
"RETRY FAILED: task send_sms_to_provider failed for notification {}".format(notification.id),
e
)
update_notification_status_by_id(notification.id, 'technical-failure')
current_app.logger.info(
"SMS {} sent to provider at {}".format(notification_id, notification.sent_at)
)
delta_milliseconds = (datetime.utcnow() - notification.created_at).total_seconds() * 1000
statsd_client.timing("sms.total-time", delta_milliseconds)
def provider_to_use(notification_type, notification_id):
active_providers_in_order = [
provider for provider in get_provider_details_by_notification_type(notification_type) if provider.active
]
if not active_providers_in_order:
current_app.logger.error(
"{} {} failed as no active providers".format(notification_type, notification_id)
)
raise Exception("No active {} providers".format(notification_type))
return clients.get_client_by_name_and_type(active_providers_in_order[0].identifier, notification_type)
current_app.logger.error(
"RETRY: SMS notification {} failed".format(notification_id)
)
current_app.logger.exception(e)
self.retry(queue="retry", countdown=retry_iteration_to_delay(self.request.retries))
except self.MaxRetriesExceededError:
current_app.logger.error(
"RETRY FAILED: task send_sms_to_provider failed for notification {}".format(notification_id),
e
)
update_notification_status_by_id(notification_id, 'technical-failure')
@notify_celery.task(bind=True, name="send-email-to-provider", max_retries=5, default_retry_delay=5)
@statsd(namespace="tasks")
def send_email_to_provider(self, service_id, notification_id):
service = dao_fetch_service_by_id(service_id)
provider = provider_to_use(EMAIL_TYPE, notification_id)
notification = get_notification_by_id(notification_id)
if notification.status == 'created':
try:
notification = notifications_dao.get_notification_by_id(notification_id)
if not notification:
raise NoResultFound()
send_to_providers.send_email_to_provider(notification)
except Exception as e:
try:
template_dict = dao_get_template_by_id(notification.template_id, notification.template_version).__dict__
html_email = Template(
template_dict,
values=notification.personalisation,
renderer=get_html_email_renderer(service)
current_app.logger.error(
"RETRY: Email notification {} failed".format(notification_id)
)
plain_text_email = Template(
template_dict,
values=notification.personalisation,
renderer=PlainTextEmail()
current_app.logger.exception(e)
self.retry(queue="retry", countdown=retry_iteration_to_delay(self.request.retries))
except self.MaxRetriesExceededError:
current_app.logger.error(
"RETRY FAILED: task send_email_to_provider failed for notification {}".format(notification_id),
e
)
if service.research_mode or notification.key_type == KEY_TYPE_TEST:
reference = str(create_uuid())
send_email_response.apply_async(
(provider.get_name(), reference, notification.to), queue='research-mode'
)
notification.billable_units = 0
else:
from_address = '"{}" <{}@{}>'.format(service.name, service.email_from,
current_app.config['NOTIFY_EMAIL_DOMAIN'])
reference = provider.send_email(
from_address,
notification.to,
plain_text_email.replaced_subject,
body=plain_text_email.replaced,
html_body=html_email.replaced,
reply_to_address=service.reply_to_email_address,
)
notification.reference = reference
notification.sent_at = datetime.utcnow()
notification.sent_by = provider.get_name(),
notification.status = 'sending'
dao_update_notification(notification)
except EmailClientException as e:
try:
current_app.logger.error(
"RETRY: Email notification {} failed".format(notification_id)
)
current_app.logger.exception(e)
self.retry(queue="retry", countdown=retry_iteration_to_delay(self.request.retries))
except self.MaxRetriesExceededError:
current_app.logger.error(
"RETRY FAILED: task send_email_to_provider failed for notification {}".format(notification.id),
e
)
update_notification_status_by_id(notification.id, 'technical-failure')
current_app.logger.info(
"Email {} sent to provider at {}".format(notification_id, notification.sent_at)
)
delta_milliseconds = (datetime.utcnow() - notification.created_at).total_seconds() * 1000
statsd_client.timing("email.total-time", delta_milliseconds)
def get_html_email_renderer(service):
govuk_banner = service.branding != BRANDING_ORG
if service.organisation:
logo = '{}{}{}'.format(
current_app.config['ADMIN_BASE_URL'],
current_app.config['BRANDING_PATH'],
service.organisation.logo
)
branding = {
'brand_colour': service.organisation.colour,
'brand_logo': logo,
'brand_name': service.organisation.name,
}
else:
branding = {}
return HTMLEmail(govuk_banner=govuk_banner, **branding)
update_notification_status_by_id(notification_id, 'technical-failure')

View File

@@ -1,11 +1,16 @@
from app.clients import (Client, ClientException)
class SmsClientException(ClientException):
class SmsClientResponseException(ClientException):
'''
Base Exception for SmsClients
Base Exception for SmsClientsResponses
'''
pass
def __init__(self, message):
self.message = message
def __str__(self):
return "Message {}".format(self.message)
class SmsClient(Client):

View File

@@ -1,12 +1,10 @@
import json
import logging
from monotonic import monotonic
from requests import request, RequestException, HTTPError
from requests import request, RequestException
from app.clients.sms import (
SmsClient,
SmsClientException
)
from app.clients.sms import (SmsClient, SmsClientResponseException)
from app.clients import STATISTICS_DELIVERED, STATISTICS_FAILURE
logger = logging.getLogger(__name__)
@@ -42,13 +40,14 @@ def get_firetext_responses(status):
return firetext_responses[status]
class FiretextClientException(SmsClientException):
def __init__(self, response):
self.code = response['code']
self.description = response['description']
class FiretextClientResponseException(SmsClientResponseException):
def __init__(self, response, exception):
self.status_code = response.status_code
self.text = response.text
self.exception = exception
def __str__(self):
return "Code {} description {}".format(self.code, self.description)
return "Code {} text {} exception {}".format(self.status_code, self.text, str(self.exception))
class FiretextClient(SmsClient):
@@ -62,11 +61,27 @@ class FiretextClient(SmsClient):
self.api_key = current_app.config.get('FIRETEXT_API_KEY')
self.from_number = current_app.config.get('FROM_NUMBER')
self.name = 'firetext'
self.url = "https://www.firetext.co.uk/api/sendsms/json"
self.statsd_client = statsd_client
def get_name(self):
return self.name
def record_outcome(self, success, response):
log_message = "API {} request {} on {} response status_code {}".format(
"POST",
"succeeded" if success else "failed",
self.url,
response.status_code
)
if success:
self.current_app.logger.info(log_message)
self.statsd_client.incr("clients.firetext.success")
else:
self.statsd_client.incr("clients.firetext.error")
self.current_app.logger.error(log_message)
def send_sms(self, to, content, reference, sender=None):
data = {
@@ -81,36 +96,23 @@ class FiretextClient(SmsClient):
try:
response = request(
"POST",
"https://www.firetext.co.uk/api/sendsms/json",
self.url,
data=data
)
firetext_response = response.json()
if firetext_response['code'] != 0:
raise FiretextClientException(firetext_response)
response.raise_for_status()
self.current_app.logger.info(
"API {} request on {} succeeded with {} '{}'".format(
"POST",
"https://www.firetext.co.uk/api/sendsms",
response.status_code,
firetext_response.items()
)
)
try:
json.loads(response.text)
if response.json()['code'] != 0:
raise ValueError()
except (ValueError, AttributeError) as e:
self.record_outcome(False, response)
raise FiretextClientResponseException(response=response, exception=e)
self.record_outcome(True, response)
except RequestException as e:
api_error = HTTPError.create(e)
logger.error(
"API {} request on {} failed with {} '{}'".format(
"POST",
"https://www.firetext.co.uk/api/sendsms",
api_error.status_code,
api_error.message
)
)
self.statsd_client.incr("clients.firetext.error")
raise api_error
self.record_outcome(False, e.response)
raise FiretextClientResponseException(response=e.response, exception=e)
finally:
elapsed_time = monotonic() - start_time
self.current_app.logger.info("Firetext request finished in {}".format(elapsed_time))
self.statsd_client.incr("clients.firetext.success")
self.statsd_client.timing("clients.firetext.request-time", elapsed_time)
return response

View File

@@ -1,4 +1,7 @@
import logging
from flask import current_app
from app.clients.sms.firetext import (
FiretextClient
)
@@ -13,7 +16,9 @@ class LoadtestingClient(FiretextClient):
def init_app(self, config, statsd_client, *args, **kwargs):
super(FiretextClient, self).__init__(*args, **kwargs)
self.current_app = current_app
self.api_key = config.config.get('LOADTESTING_API_KEY')
self.from_number = config.config.get('LOADTESTING_NUMBER')
self.from_number = config.config.get('FROM_NUMBER')
self.name = 'loadtesting'
self.url = "https://www.firetext.co.uk/api/sendsms/json"
self.statsd_client = statsd_client

View File

@@ -1,8 +1,9 @@
import json
from monotonic import monotonic
from requests import (request, RequestException, HTTPError)
from requests import (request, RequestException)
from app.clients import (STATISTICS_DELIVERED, STATISTICS_FAILURE)
from app.clients.sms import (SmsClient, SmsClientException)
from app.clients.sms import (SmsClient, SmsClientResponseException)
mmg_response_map = {
'2': {
@@ -42,13 +43,14 @@ def get_mmg_responses(status):
return mmg_response_map.get(status, mmg_response_map.get('default'))
class MMGClientException(SmsClientException):
def __init__(self, error_response):
self.code = error_response['Error']
self.description = error_response['Description']
class MMGClientResponseException(SmsClientResponseException):
def __init__(self, response, exception):
self.status_code = response.status_code
self.text = response.text
self.exception = exception
def __str__(self):
return "Code {} description {}".format(self.code, self.description)
return "Code {} text {} exception {}".format(self.status_code, self.text, str(self.exception))
class MMGClient(SmsClient):
@@ -65,6 +67,21 @@ class MMGClient(SmsClient):
self.statsd_client = statsd_client
self.mmg_url = current_app.config.get('MMG_URL')
def record_outcome(self, success, response):
log_message = "API {} request {} on {} response status_code {}".format(
"POST",
"succeeded" if success else "failed",
self.mmg_url,
response.status_code
)
if success:
self.current_app.logger.info(log_message)
self.statsd_client.incr("clients.mmg.success")
else:
self.statsd_client.incr("clients.mmg.error")
self.current_app.logger.error(log_message)
def get_name(self):
return self.name
@@ -79,38 +96,30 @@ class MMGClient(SmsClient):
}
start_time = monotonic()
try:
response = request("POST", self.mmg_url,
data=json.dumps(data),
headers={'Content-Type': 'application/json',
'Authorization': 'Basic {}'.format(self.api_key)})
if response.status_code != 200:
raise MMGClientException(response.json())
response = request(
"POST",
self.mmg_url,
data=json.dumps(data),
headers={
'Content-Type': 'application/json',
'Authorization': 'Basic {}'.format(self.api_key)
}
)
response.raise_for_status()
self.current_app.logger.info(
"API {} request on {} succeeded with {} '{}'".format(
"POST",
self.mmg_url,
response.status_code,
response.json().items()
)
)
try:
json.loads(response.text)
except (ValueError, AttributeError) as e:
self.record_outcome(False, response)
raise MMGClientResponseException(response=response, exception=e)
self.record_outcome(True, response)
except RequestException as e:
api_error = HTTPError.create(e)
self.current_app.logger.error(
"API {} request on {} failed with {} '{}'".format(
"POST",
self.mmg_url,
api_error.status_code,
api_error.message
)
)
self.statsd_client.incr("clients.mmg.error")
raise api_error
self.record_outcome(False, e.response)
raise MMGClientResponseException(response=e.response, exception=e)
finally:
elapsed_time = monotonic() - start_time
self.statsd_client.timing("clients.mmg.request-time", elapsed_time)
self.statsd_client.incr("clients.mmg.success")
self.current_app.logger.info("MMG request finished in {}".format(elapsed_time))
return response

0
app/delivery/__init__.py Normal file
View File

46
app/delivery/rest.py Normal file
View File

@@ -0,0 +1,46 @@
from flask import Blueprint, jsonify
from app.delivery import send_to_providers
from app.models import EMAIL_TYPE
from app.celery import provider_tasks
from app.dao import notifications_dao
from flask import current_app
delivery_blueprint = Blueprint('delivery', __name__)
from app.errors import register_errors
register_errors(delivery_blueprint)
@delivery_blueprint.route('/deliver/notification/<uuid:notification_id>', methods=['POST'])
def send_notification_to_provider(notification_id):
notification = notifications_dao.get_notification_by_id(notification_id)
if not notification:
return jsonify({"result": "error", "message": "No result found"}), 404
if notification.notification_type == EMAIL_TYPE:
send_response(
send_to_providers.send_email_to_provider,
provider_tasks.deliver_email,
notification,
'send-email')
else:
send_response(
send_to_providers.send_sms_to_provider,
provider_tasks.deliver_sms,
notification,
'send-sms')
return jsonify({}), 204
def send_response(send_call, task_call, notification, queue):
try:
send_call(notification)
except Exception as e:
current_app.logger.exception(
"Failed to send notification, retrying in celery. ID {} type {}".format(
notification.id,
notification.notification_type),
e)
task_call.apply_async((str(notification.id)), queue=queue)

View File

@@ -0,0 +1,135 @@
from datetime import datetime
from flask import current_app
from notifications_utils.recipients import (
validate_and_format_phone_number
)
from notifications_utils.template import Template, get_sms_fragment_count
from notifications_utils.renderers import HTMLEmail, PlainTextEmail, SMSMessage
from app import clients, statsd_client, create_uuid
from app.dao.notifications_dao import dao_update_notification
from app.dao.provider_details_dao import get_provider_details_by_notification_type
from app.dao.services_dao import dao_fetch_service_by_id
from app.celery.research_mode_tasks import send_sms_response, send_email_response
from app.dao.templates_dao import dao_get_template_by_id
from app.models import SMS_TYPE, KEY_TYPE_TEST, BRANDING_ORG, EMAIL_TYPE
def send_sms_to_provider(notification):
service = dao_fetch_service_by_id(notification.service_id)
provider = provider_to_use(SMS_TYPE, notification.id)
if notification.status == 'created':
template_model = dao_get_template_by_id(notification.template_id, notification.template_version)
template = Template(
template_model.__dict__,
values={} if not notification.personalisation else notification.personalisation,
renderer=SMSMessage(prefix=service.name, sender=service.sms_sender)
)
if service.research_mode or notification.key_type == KEY_TYPE_TEST:
send_sms_response.apply_async(
(provider.get_name(), str(notification.id), notification.to), queue='research-mode'
)
notification.billable_units = 0
else:
provider.send_sms(
to=validate_and_format_phone_number(notification.to),
content=template.replaced,
reference=str(notification.id),
sender=service.sms_sender
)
notification.billable_units = get_sms_fragment_count(template.replaced_content_count)
notification.sent_at = datetime.utcnow()
notification.sent_by = provider.get_name()
notification.status = 'sending'
dao_update_notification(notification)
current_app.logger.info(
"SMS {} sent to provider at {}".format(notification.id, notification.sent_at)
)
delta_milliseconds = (datetime.utcnow() - notification.created_at).total_seconds() * 1000
statsd_client.timing("sms.total-time", delta_milliseconds)
def send_email_to_provider(notification):
service = dao_fetch_service_by_id(notification.service_id)
provider = provider_to_use(EMAIL_TYPE, notification.id)
if notification.status == 'created':
template_dict = dao_get_template_by_id(notification.template_id, notification.template_version).__dict__
html_email = Template(
template_dict,
values=notification.personalisation,
renderer=get_html_email_renderer(service)
)
plain_text_email = Template(
template_dict,
values=notification.personalisation,
renderer=PlainTextEmail()
)
if service.research_mode or notification.key_type == KEY_TYPE_TEST:
reference = str(create_uuid())
send_email_response.apply_async(
(provider.get_name(), reference, notification.to), queue='research-mode'
)
notification.billable_units = 0
else:
from_address = '"{}" <{}@{}>'.format(service.name, service.email_from,
current_app.config['NOTIFY_EMAIL_DOMAIN'])
reference = provider.send_email(
from_address,
notification.to,
plain_text_email.replaced_subject,
body=plain_text_email.replaced,
html_body=html_email.replaced,
reply_to_address=service.reply_to_email_address,
)
notification.reference = reference
notification.sent_at = datetime.utcnow()
notification.sent_by = provider.get_name(),
notification.status = 'sending'
dao_update_notification(notification)
current_app.logger.info(
"Email {} sent to provider at {}".format(notification.id, notification.sent_at)
)
delta_milliseconds = (datetime.utcnow() - notification.created_at).total_seconds() * 1000
statsd_client.timing("email.total-time", delta_milliseconds)
def provider_to_use(notification_type, notification_id):
active_providers_in_order = [
provider for provider in get_provider_details_by_notification_type(notification_type) if provider.active
]
if not active_providers_in_order:
current_app.logger.error(
"{} {} failed as no active providers".format(notification_type, notification_id)
)
raise Exception("No active {} providers".format(notification_type))
return clients.get_client_by_name_and_type(active_providers_in_order[0].identifier, notification_type)
def get_html_email_renderer(service):
govuk_banner = service.branding != BRANDING_ORG
if service.organisation:
logo = '{}{}{}'.format(
current_app.config['ADMIN_BASE_URL'],
current_app.config['BRANDING_PATH'],
service.organisation.logo
)
branding = {
'brand_colour': service.organisation.colour,
'brand_logo': logo,
'brand_name': service.organisation.name,
}
else:
branding = {}
return HTMLEmail(govuk_banner=govuk_banner, **branding)