2017-05-11 15:22:43 +01:00
|
|
|
from datetime import datetime, timedelta
|
2017-05-12 12:19:27 +01:00
|
|
|
from itertools import groupby
|
2017-05-09 18:16:44 +01:00
|
|
|
|
2017-05-09 14:06:27 +01:00
|
|
|
from flask import current_app
|
2017-05-12 12:19:27 +01:00
|
|
|
from sqlalchemy import func
|
2017-05-09 14:41:29 +01:00
|
|
|
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
|
2017-05-09 12:30:31 +01:00
|
|
|
|
2017-05-09 14:06:27 +01:00
|
|
|
from app import db
|
|
|
|
|
from app.dao.dao_utils import transactional
|
|
|
|
|
from app.models import (
|
|
|
|
|
JobStatistics,
|
2017-05-12 12:19:27 +01:00
|
|
|
Notification,
|
2017-05-09 14:06:27 +01:00
|
|
|
EMAIL_TYPE,
|
|
|
|
|
SMS_TYPE,
|
|
|
|
|
LETTER_TYPE,
|
|
|
|
|
NOTIFICATION_STATUS_TYPES_FAILED,
|
2017-05-16 12:49:20 +01:00
|
|
|
NOTIFICATION_STATUS_SUCCESS,
|
2017-05-11 12:09:57 +01:00
|
|
|
NOTIFICATION_DELIVERED,
|
|
|
|
|
NOTIFICATION_SENT)
|
2017-05-09 14:06:27 +01:00
|
|
|
from app.statsd_decorators import statsd
|
2017-05-09 12:30:31 +01:00
|
|
|
|
2017-05-09 14:06:27 +01:00
|
|
|
|
2017-05-11 15:22:43 +01:00
|
|
|
@transactional
def timeout_job_counts(notifications_type, timeout_start):
    """Recalculate the counters of unbalanced job statistics rows.

    Looks at job statistics rows created before ``timeout_start`` whose
    counters for ``notifications_type`` satisfy ``sent != failed + delivered``,
    counts the notifications of that type per job and status, and overwrites
    the sent / failed / delivered counters with the recounted values.

    Statuses in NOTIFICATION_STATUS_SUCCESS are counted as delivered; every
    other status is counted as failed. The sent counter becomes the total.

    NOTE: columns() defines no 'delivered' column for LETTER_TYPE, so this
    is only meaningful for types that have all three counters.

    :param notifications_type: notification type whose counters are rebuilt
    :param timeout_start: only jobs with statistics created before this
        datetime are considered
    :return: sum of the values returned by each per-job ``update()`` call
    """
    sent_column = columns(notifications_type, 'sent')
    delivered_column = columns(notifications_type, 'delivered')
    failed_column = columns(notifications_type, 'failed')

    status_counts = db.session.query(
        JobStatistics.job_id.label('job_id'),
        func.count(Notification.status).label('count'),
        Notification.status.label('status')
    ).filter(
        Notification.notification_type == notifications_type,
        JobStatistics.job_id == Notification.job_id,
        JobStatistics.created_at < timeout_start,
        sent_column != failed_column + delivered_column
    ).group_by(
        Notification.status, JobStatistics.job_id
    ).order_by(
        JobStatistics.job_id
    ).all()

    def job_id_of(row):
        return row.job_id

    # groupby only merges adjacent rows, so the rows are sorted by the same key first.
    rows_by_job = groupby(sorted(status_counts, key=job_id_of), key=job_id_of)

    rows_updated = 0
    for job_id, job_rows in rows_by_job:
        delivered_total = 0
        failed_total = 0
        for row in job_rows:
            if row.status in NOTIFICATION_STATUS_SUCCESS:
                delivered_total += row.count
            else:
                failed_total += row.count

        # Every counted notification is either delivered or failed, so the
        # sent counter is simply the sum of the two.
        rows_updated += JobStatistics.query.filter_by(
            job_id=job_id
        ).update({
            sent_column: delivered_total + failed_total,
            failed_column: failed_total,
            delivered_column: delivered_total
        }, synchronize_session=False)

    return rows_updated
|
2017-05-11 15:22:43 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@statsd(namespace="dao")
def dao_timeout_job_statistics(timeout_period):
    """Rebuild SMS and email job counters for jobs older than the timeout.

    :param timeout_period: age in seconds; the cut-off is "now (UTC) minus
        this many seconds"
    :return: combined result of timeout_job_counts() for SMS_TYPE and
        EMAIL_TYPE (letters are not processed here)
    """
    cutoff = datetime.utcnow() - timedelta(seconds=timeout_period)
    # SMS first, then email, matching the order the counts are added up in.
    return sum(
        timeout_job_counts(notification_type, cutoff)
        for notification_type in (SMS_TYPE, EMAIL_TYPE)
    )
|
|
|
|
|
|
|
|
|
|
|
2017-05-09 14:06:27 +01:00
|
|
|
@statsd(namespace="dao")
def create_or_update_job_sending_statistics(notification):
    """Record one more sent notification against the notification's job.

    First tries to increment the existing statistics row. If no row was
    updated, a new row is inserted. If that insert hits an IntegrityError,
    the error is logged and the increment is retried once.

    :param notification: notification providing ``job_id`` and
        ``notification_type``
    :raises SQLAlchemyError: when the insert failed with an IntegrityError
        and the retried increment still updated no rows
    """
    if __update_job_stats_sent_count(notification) != 0:
        # An existing statistics row was incremented; nothing more to do.
        return

    try:
        __insert_job_stats(notification)
    except IntegrityError as e:
        current_app.logger.exception(e)
        still_missing = __update_job_stats_sent_count(notification) == 0
        if still_missing:
            raise SQLAlchemyError("Failed to create job statistics for {}".format(notification.job_id))
|
2017-05-09 14:06:27 +01:00
|
|
|
|
|
|
|
|
|
2017-05-09 18:16:44 +01:00
|
|
|
@transactional
def __update_job_stats_sent_count(notification):
    """Add one to the 'sent' counter of the notification's job statistics.

    The counter column is chosen by columns() from the notification type.

    :param notification: notification providing ``job_id`` and
        ``notification_type``
    :return: the value returned by ``Query.update()`` for the statistics
        row(s) of ``notification.job_id``; callers treat 0 as "no row exists"
    """
    sent_column = columns(notification.notification_type, 'sent')
    job_statistics = db.session.query(JobStatistics).filter_by(
        job_id=notification.job_id,
    )
    # Increment in SQL (column = column + 1) rather than read-modify-write in Python.
    return job_statistics.update({sent_column: sent_column + 1})
|
2017-05-09 14:06:27 +01:00
|
|
|
|
|
|
|
|
|
2017-05-09 14:41:29 +01:00
|
|
|
@transactional
def __insert_job_stats(notification):
    """Add a new JobStatistics row for the notification's job to the session.

    The 'sent' counter matching the notification's type starts at 1 and the
    other two 'sent' counters start at 0; ``updated_at`` is set to the
    current UTC time.

    :param notification: notification providing ``job_id`` and
        ``notification_type``
    """
    notification_type = notification.notification_type
    db.session.add(JobStatistics(
        job_id=notification.job_id,
        # int(True) == 1 and int(False) == 0: only the matching type counts.
        emails_sent=int(notification_type == EMAIL_TYPE),
        sms_sent=int(notification_type == SMS_TYPE),
        letters_sent=int(notification_type == LETTER_TYPE),
        updated_at=datetime.utcnow()
    ))
|
|
|
|
|
|
|
|
|
|
|
2017-05-11 12:09:57 +01:00
|
|
|
def columns(notification_type, status):
    """Return the JobStatistics counter column for a type/status pair.

    :param notification_type: EMAIL_TYPE, SMS_TYPE or LETTER_TYPE
    :param status: one of 'sent', 'delivered' or 'failed'
    :return: the matching JobStatistics attribute, or None when the type has
        no counter for that status (letters have no 'delivered' counter)

    NOTE: a notification type that is not one of the three above is not
    handled; the nested lookup then fails on the missing per-type mapping.
    """
    email_columns = {
        'failed': JobStatistics.emails_failed,
        'delivered': JobStatistics.emails_delivered,
        'sent': JobStatistics.emails_sent
    }
    sms_columns = {
        'failed': JobStatistics.sms_failed,
        'delivered': JobStatistics.sms_delivered,
        'sent': JobStatistics.sms_sent
    }
    letter_columns = {
        'failed': JobStatistics.letters_failed,
        'sent': JobStatistics.letters_sent
    }
    columns_by_type = {
        EMAIL_TYPE: email_columns,
        SMS_TYPE: sms_columns,
        LETTER_TYPE: letter_columns
    }

    columns_for_type = columns_by_type.get(notification_type)
    return columns_for_type.get(status)
|
|
|
|
|
|
|
|
|
|
|
2017-05-09 18:16:44 +01:00
|
|
|
@transactional
def update_job_stats_outcome_count(notification):
    """Increment the outcome counter that matches the notification's status.

    * A status in NOTIFICATION_STATUS_TYPES_FAILED increments the 'failed'
      counter for the notification's type.
    * NOTIFICATION_DELIVERED or NOTIFICATION_SENT increments the 'delivered'
      counter, except for letters.
    * Any other status changes nothing.

    :param notification: notification providing ``status``,
        ``notification_type`` and ``job_id``
    :return: the value returned by ``Query.update()`` for the statistics
        row(s) of ``notification.job_id``, or 0 when no counter applies
    """
    if notification.status in NOTIFICATION_STATUS_TYPES_FAILED:
        column = columns(notification.notification_type, 'failed')
    elif (
        notification.status in (NOTIFICATION_DELIVERED, NOTIFICATION_SENT)
        and notification.notification_type != LETTER_TYPE
    ):
        column = columns(notification.notification_type, 'delivered')
    else:
        column = None

    # Compare against None explicitly: `column` is otherwise a SQLAlchemy
    # attribute object, and relying on its truthiness to mean "a counter
    # applies" is fragile.
    if column is None:
        return 0

    # Increment in SQL (column = column + 1) rather than read-modify-write in Python.
    return db.session.query(JobStatistics).filter_by(
        job_id=notification.job_id,
    ).update({
        column: column + 1
    })
|