Merge pull request #638 from alphagov/remove-contested-writes

Remove contested writes
This commit is contained in:
minglis
2016-08-30 08:56:45 +01:00
committed by GitHub
13 changed files with 78 additions and 759 deletions

View File

@@ -13,7 +13,6 @@ from app import notify_celery, statsd_client, clients, create_uuid
from app.clients.email import EmailClientException
from app.clients.sms import SmsClientException
from app.dao.notifications_dao import (
update_provider_stats,
get_notification_by_id,
dao_update_notification,
update_notification_status_by_id
@@ -78,13 +77,6 @@ def send_sms_to_provider(self, service_id, notification_id):
)
notification.billable_units = get_sms_fragment_count(template.replaced_content_count)
update_provider_stats(
notification_id,
SMS_TYPE,
provider.get_name(),
billable_units=notification.billable_units
)
notification.sent_at = datetime.utcnow()
notification.sent_by = provider.get_name()
notification.status = 'sending'
@@ -101,7 +93,7 @@ def send_sms_to_provider(self, service_id, notification_id):
"RETRY FAILED: task send_sms_to_provider failed for notification {}".format(notification.id),
e
)
update_notification_status_by_id(notification.id, 'technical-failure', 'failure')
update_notification_status_by_id(notification.id, 'technical-failure')
current_app.logger.info(
"SMS {} sent to provider at {}".format(notification_id, notification.sent_at)
@@ -163,12 +155,6 @@ def send_email_to_provider(self, service_id, notification_id):
reply_to_address=service.reply_to_email_address,
)
update_provider_stats(
notification_id,
EMAIL_TYPE,
provider.get_name(),
billable_units=1
)
notification.reference = reference
notification.sent_at = datetime.utcnow()
notification.sent_by = provider.get_name(),
@@ -186,7 +172,7 @@ def send_email_to_provider(self, service_id, notification_id):
"RETRY FAILED: task send_email_to_provider failed for notification {}".format(notification.id),
e
)
update_notification_status_by_id(notification.id, 'technical-failure', 'failure')
update_notification_status_by_id(notification.id, 'technical-failure')
current_app.logger.info(
"Email {} sent to provider at {}".format(notification_id, notification.sent_at)

View File

@@ -91,10 +91,10 @@ def timeout_notifications():
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
):
# TODO: think about making this a bulk update rather than one at a time.
updated = update_notification_status_by_id(noti.id, 'temporary-failure', STATISTICS_FAILURE)
updated = update_notification_status_by_id(noti.id, 'temporary-failure')
if updated:
current_app.logger.info(("Timeout period reached for notification ({})"
", status has been updated.").format(noti.id))
current_app.logger.info(
"Timeout period reached for notification ({}), status has been updated.".format(noti.id))
except Exception as e:
current_app.logger.exception(e)
current_app.logger.error((

View File

@@ -2,7 +2,6 @@ import itertools
from datetime import (datetime)
from flask import current_app
from monotonic import monotonic
from notifications_utils.recipients import (
RecipientCSV,
allowed_to_send_to
@@ -13,7 +12,6 @@ from sqlalchemy.exc import SQLAlchemyError
from app import (
create_uuid,
DATETIME_FORMAT,
DATE_FORMAT,
notify_celery,
encryption
)
@@ -23,11 +21,8 @@ from app.dao.jobs_dao import (
dao_update_job,
dao_get_job_by_id
)
from app.dao.notifications_dao import (
dao_create_notification,
dao_get_notification_statistics_for_service_and_day
)
from app.dao.services_dao import dao_fetch_service_by_id
from app.dao.notifications_dao import (dao_create_notification)
from app.dao.services_dao import dao_fetch_service_by_id, dao_fetch_todays_stats_for_service
from app.dao.templates_dao import dao_get_template_by_id
from app.models import (
Notification,
@@ -46,14 +41,7 @@ def process_job(job_id):
service = job.service
stats = dao_get_notification_statistics_for_service_and_day(
service_id=service.id,
day=job.created_at.strftime(DATE_FORMAT)
)
total_sent = 0
if stats:
total_sent = stats.emails_requested + stats.sms_requested
total_sent = sum(row.count for row in dao_fetch_todays_stats_for_service(service.id))
if total_sent + job.notification_count > service.message_limit:
job.status = 'sending limits exceeded'
@@ -212,7 +200,7 @@ def _save_notification(created_at, notification, notification_id, service_id, no
api_key_id=api_key_id,
key_type=key_type
)
dao_create_notification(notification_db_object, notification_type)
dao_create_notification(notification_db_object)
def service_allowed_to_send_to(recipient, service):

View File

@@ -16,19 +16,9 @@ from app.models import (
Service,
Notification,
NotificationHistory,
Job,
NotificationStatistics,
TemplateStatistics,
SMS_TYPE,
EMAIL_TYPE,
Template,
ProviderStatistics,
ProviderDetails)
from app.clients import (
STATISTICS_FAILURE,
STATISTICS_DELIVERED,
STATISTICS_REQUESTED
)
Template)
from app.dao.dao_utils import transactional
from app.statsd_decorators import statsd
@@ -124,40 +114,7 @@ def dao_get_last_template_usage(template_id):
@statsd(namespace="dao")
@transactional
def dao_create_notification(notification, notification_type):
if notification.job_id:
db.session.query(Job).filter_by(
id=notification.job_id
).update({
Job.notifications_sent: Job.notifications_sent + 1,
Job.updated_at: datetime.utcnow()
})
update_count = db.session.query(NotificationStatistics).filter_by(
day=notification.created_at.date(),
service_id=notification.service_id
).update(_update_notification_stats_query(notification_type, 'requested'))
if update_count == 0:
stats = NotificationStatistics(
day=notification.created_at.date(),
service_id=notification.service_id,
sms_requested=1 if notification_type == SMS_TYPE else 0,
emails_requested=1 if notification_type == EMAIL_TYPE else 0
)
db.session.add(stats)
update_count = db.session.query(TemplateStatistics).filter_by(
day=date.today(),
service_id=notification.service_id,
template_id=notification.template_id
).update({'usage_count': TemplateStatistics.usage_count + 1, 'updated_at': datetime.utcnow()})
if update_count == 0:
template_stats = TemplateStatistics(template_id=notification.template_id,
service_id=notification.service_id)
db.session.add(template_stats)
def dao_create_notification(notification):
if not notification.id:
# need to populate defaulted fields before we create the notification history object
notification.id = uuid.uuid4()
@@ -165,51 +122,10 @@ def dao_create_notification(notification, notification_type):
notification.status = 'created'
notification_history = NotificationHistory.from_notification(notification)
db.session.add(notification)
db.session.add(notification_history)
def _update_notification_stats_query(notification_type, status):
mapping = {
SMS_TYPE: {
STATISTICS_REQUESTED: NotificationStatistics.sms_requested,
STATISTICS_DELIVERED: NotificationStatistics.sms_delivered,
STATISTICS_FAILURE: NotificationStatistics.sms_failed
},
EMAIL_TYPE: {
STATISTICS_REQUESTED: NotificationStatistics.emails_requested,
STATISTICS_DELIVERED: NotificationStatistics.emails_delivered,
STATISTICS_FAILURE: NotificationStatistics.emails_failed
}
}
return {
mapping[notification_type][status]: mapping[notification_type][status] + 1
}
def _update_statistics(notification, notification_statistics_status):
if notification.job_id:
db.session.query(Job).filter_by(
id=notification.job_id
).update(_update_job_stats_query(notification_statistics_status))
db.session.query(NotificationStatistics).filter_by(
day=notification.created_at.date(),
service_id=notification.service_id
).update(
_update_notification_stats_query(notification.notification_type, notification_statistics_status)
)
def _update_job_stats_query(status):
mapping = {
STATISTICS_FAILURE: Job.notifications_failed,
STATISTICS_DELIVERED: Job.notifications_delivered
}
return {mapping[status]: mapping[status] + 1}
def _decide_permanent_temporary_failure(current_status, status):
# Firetext will send pending, then send either succes or fail.
# If we go from pending to delivered we need to set failure type as temporary-failure
@@ -219,12 +135,8 @@ def _decide_permanent_temporary_failure(current_status, status):
return status
def _update_notification_status(notification, status, notification_statistics_status):
def _update_notification_status(notification, status):
status = _decide_permanent_temporary_failure(current_status=notification.status, status=status)
if notification_statistics_status:
_update_statistics(notification, notification_statistics_status)
notification.status = status
dao_update_notification(notification)
return True
@@ -232,7 +144,7 @@ def _update_notification_status(notification, status, notification_statistics_st
@statsd(namespace="dao")
@transactional
def update_notification_status_by_id(notification_id, status, notification_statistics_status=None):
def update_notification_status_by_id(notification_id, status):
notification = Notification.query.with_lockmode("update").filter(
Notification.id == notification_id,
or_(Notification.status == 'created',
@@ -244,25 +156,24 @@ def update_notification_status_by_id(notification_id, status, notification_stati
return _update_notification_status(
notification=notification,
status=status,
notification_statistics_status=notification_statistics_status
status=status
)
@statsd(namespace="dao")
@transactional
def update_notification_status_by_reference(reference, status, notification_statistics_status):
notification = Notification.query.filter(Notification.reference == reference,
or_(Notification.status == 'sending',
Notification.status == 'pending')
).first()
def update_notification_status_by_reference(reference, status):
notification = Notification.query.filter(
Notification.reference == reference,
or_(Notification.status == 'sending',
Notification.status == 'pending')).first()
if not notification:
return False
return _update_notification_status(
notification=notification,
status=status,
notification_statistics_status=notification_statistics_status
status=status
)
@@ -275,33 +186,6 @@ def dao_update_notification(notification):
db.session.commit()
@statsd(namespace="dao")
@transactional
def update_provider_stats(
id_,
notification_type,
provider_name,
billable_units=1):
notification = Notification.query.filter(Notification.id == id_).one()
provider = ProviderDetails.query.filter_by(identifier=provider_name).one()
update_count = db.session.query(ProviderStatistics).filter_by(
day=date.today(),
service_id=notification.service_id,
provider_id=provider.id
).update({'unit_count': ProviderStatistics.unit_count + billable_units})
if update_count == 0:
provider_stats = ProviderStatistics(
day=notification.created_at.date(),
service_id=notification.service_id,
provider_id=provider.id,
unit_count=billable_units
)
db.session.add(provider_stats)
@statsd(namespace="dao")
def get_notification_for_job(service_id, job_id, notification_id):
return Notification.query.filter_by(service_id=service_id, job_id=job_id, id=notification_id).one()

View File

@@ -511,7 +511,9 @@ class NotificationHistory(db.Model):
@classmethod
def from_notification(cls, notification):
return cls(**{c.name: getattr(notification, c.name) for c in cls.__table__.columns})
history = cls(**{c.name: getattr(notification, c.name) for c in cls.__table__.columns})
history.template = notification.template
return history
def update_from_notification(self, notification):
for c in self.__table__.columns:

View File

@@ -6,9 +6,10 @@ from app.dao import notifications_dao
from app.clients.sms.firetext import get_firetext_responses
from app.clients.sms.mmg import get_mmg_responses
sms_response_mapper = {'MMG': get_mmg_responses,
'Firetext': get_firetext_responses
}
sms_response_mapper = {
'MMG': get_mmg_responses,
'Firetext': get_firetext_responses
}
def validate_callback_data(data, fields, client_name):
@@ -49,14 +50,11 @@ def process_sms_client_response(status, reference, client_name):
return success, msg
notification_status = response_dict['notification_status']
notification_statistics_status = response_dict['notification_statistics_status']
notification_status_message = response_dict['message']
notification_success = response_dict['success']
# record stats
if not notifications_dao.update_notification_status_by_id(reference,
notification_status,
notification_statistics_status):
if not notifications_dao.update_notification_status_by_id(reference, notification_status):
status_error = "{} callback failed: notification {} either not found or already updated " \
"from sending. Status {}".format(client_name,
reference,
@@ -69,6 +67,6 @@ def process_sms_client_response(status, reference, client_name):
reference,
notification_status_message))
statsd_client.incr('callback.{}.{}'.format(client_name.lower(), notification_statistics_status))
statsd_client.incr('callback.{}.{}'.format(client_name.lower(), notification_status))
success = "{} callback succeeded. reference {} updated".format(client_name, reference)
return success, errors

View File

@@ -14,6 +14,7 @@ from notifications_utils.template import Template
from notifications_utils.renderers import PassThrough
from app.clients.email.aws_ses import get_aws_responses
from app import api_user, encryption, create_uuid, DATETIME_FORMAT, DATE_FORMAT, statsd_client
from app.dao.services_dao import dao_fetch_todays_stats_for_service
from app.models import KEY_TYPE_TEAM
from app.dao import (
templates_dao,
@@ -74,7 +75,6 @@ def process_ses_response():
raise InvalidRequest(error, status_code=400)
notification_status = aws_response_dict['notification_status']
notification_statistics_status = aws_response_dict['notification_statistics_status']
try:
source = ses_message['mail']['source']
@@ -89,8 +89,7 @@ def process_ses_response():
reference = ses_message['mail']['messageId']
if not notifications_dao.update_notification_status_by_reference(
reference,
notification_status,
notification_statistics_status
notification_status
):
error = "SES callback failed: notification either not found or already updated " \
"from sending. Status {}".format(notification_status)
@@ -104,7 +103,7 @@ def process_ses_response():
)
)
statsd_client.incr('callback.ses.{}'.format(notification_statistics_status))
statsd_client.incr('callback.ses.{}'.format(notification_status))
return jsonify(
result="success", message="SES callback succeeded"
), 200
@@ -214,18 +213,11 @@ def send_notification(notification_type):
service_id = str(api_user.service_id)
service = services_dao.dao_fetch_service_by_id(service_id)
service_stats = notifications_dao.dao_get_notification_statistics_for_service_and_day(
service_id,
datetime.today().strftime(DATE_FORMAT)
)
service_stats = sum(row.count for row in dao_fetch_todays_stats_for_service(service.id))
if service_stats:
total_sms_count = service_stats.sms_requested
total_email_count = service_stats.emails_requested
if (total_email_count + total_sms_count >= service.message_limit):
error = 'Exceeded send limits ({}) for today'.format(service.message_limit)
raise InvalidRequest(error, status_code=429)
if service_stats >= service.message_limit:
error = 'Exceeded send limits ({}) for today'.format(service.message_limit)
raise InvalidRequest(error, status_code=429)
notification, errors = (
sms_template_notification_schema if notification_type == SMS_TYPE else email_notification_schema