add GET /service/<id>/notifications/weekly

Moved format_statistics to a new service/statistics.py file, and refactored it to share code. Moved the tests as well, to try to enforce separation between the RESTful endpoints of rest.py and the logic/data manipulation of statistics.py.
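
For illustration only (not part of the commit), the new endpoint could be exercised roughly like this with Flask's test client; the `client` and `sample_service` fixture names are placeholders, and the assertion assumes EMAIL_TYPE and SMS_TYPE serialise as 'email' and 'sms':

import json

def test_get_weekly_notification_stats(client, sample_service):
    # hypothetical fixtures; the URL matches the route added in this commit
    response = client.get('/service/{}/notifications/weekly'.format(sample_service.id))

    assert response.status_code == 200
    data = json.loads(response.get_data(as_text=True))['data']
    # one entry per week since the service was created, each holding
    # requested/delivered/failed counts per notification type
    for week_stats in data.values():
        assert set(week_stats.keys()) == {'email', 'sms'}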
Leo Hemsted
2016-07-28 13:34:24 +01:00
parent 444132ad66
commit 8ad47481d7
4 changed files with 196 additions and 64 deletions

app/service/rest.py

@@ -1,4 +1,4 @@
-from datetime import date
+from datetime import date, timedelta

 from flask import (
     jsonify,
@@ -8,7 +8,6 @@ from flask import (
 )
 from sqlalchemy.orm.exc import NoResultFound

-from app.models import EMAIL_TYPE, SMS_TYPE
 from app.dao.api_key_dao import (
     save_model_api_key,
     get_model_api_keys,
@@ -23,7 +22,8 @@ from app.dao.services_dao import (
     dao_add_user_to_service,
     dao_remove_user_from_service,
     dao_fetch_stats_for_service,
-    dao_fetch_todays_stats_for_service
+    dao_fetch_todays_stats_for_service,
+    dao_fetch_weekly_historical_stats_for_service
 )
 from app.dao import notifications_dao
 from app.dao.provider_statistics_dao import get_fragment_count
@@ -236,29 +236,19 @@ def get_all_notifications_for_service(service_id):
     ), 200


+@service.route('/<uuid:service_id>/notifications/weekly', methods=['GET'])
+def get_weekly_notification_stats(service_id):
+    service = dao_fetch_service_by_id(service_id)
+    preceeding_monday = service.created_at - timedelta(days=service.created_at.weekday())
+    stats = dao_fetch_weekly_historical_stats_for_service(service_id, service.created_at, preceeding_monday)
+    return jsonify(data=statistics.format_weekly_notification_stats(stats, service.created_at))
+
+
 def get_detailed_service(service_id, today_only=False):
     service = dao_fetch_service_by_id(service_id)
     stats_fn = dao_fetch_todays_stats_for_service if today_only else dao_fetch_stats_for_service
-    statistics = stats_fn(service_id)
-    service.statistics = format_statistics(statistics)
+    stats = stats_fn(service_id)
+    service.statistics = statistics.format_statistics(stats)

     data = detailed_service_schema.dump(service).data
     return jsonify(data=data)
-
-
-def format_statistics(statistics):
-    # statistics come in a named tuple with uniqueness from 'notification_type', 'status' - however missing
-    # statuses/notification types won't be represented and the status types need to be simplified/summed up
-    # so we can return emails/sms * created, sent, and failed
-    counts = {
-        template_type: {
-            status: 0 for status in ('requested', 'delivered', 'failed')
-        } for template_type in (EMAIL_TYPE, SMS_TYPE)
-    }
-    for row in statistics:
-        counts[row.notification_type]['requested'] += row.count
-        if row.status == 'delivered':
-            counts[row.notification_type]['delivered'] += row.count
-        elif row.status in ('failed', 'technical-failure', 'temporary-failure', 'permanent-failure'):
-            counts[row.notification_type]['failed'] += row.count
-
-    return counts
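
As a rough sketch of what the relocated helper returns (not part of the diff): StatsRow stands in for the named-tuple rows the dao query yields, rest.py is assumed to import the new module as `from app.service import statistics`, and EMAIL_TYPE/SMS_TYPE are assumed to be the strings 'email' and 'sms'.

from collections import namedtuple

from app.service import statistics

# stand-in for the dao's result rows
StatsRow = namedtuple('StatsRow', ['notification_type', 'status', 'count'])

rows = [
    StatsRow('email', 'delivered', 5),
    StatsRow('email', 'technical-failure', 1),
    StatsRow('sms', 'created', 2),
]

# 'requested' counts every row; 'delivered' and 'failed' only the matching statuses
print(statistics.format_statistics(rows))
# {'email': {'requested': 6, 'delivered': 5, 'failed': 1},
#  'sms': {'requested': 2, 'delivered': 0, 'failed': 0}}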

app/service/statistics.py Normal file

@@ -0,0 +1,51 @@
import itertools
from datetime import datetime, timedelta

from app.models import EMAIL_TYPE, SMS_TYPE


def format_statistics(statistics):
    # statistics come in a named tuple with uniqueness from 'notification_type', 'status' - however missing
    # statuses/notification types won't be represented and the status types need to be simplified/summed up
    # so we can return emails/sms * created, sent, and failed
    counts = _create_zeroed_stats_dicts()
    for row in statistics:
        _update_statuses_from_row(counts[row.notification_type], row)

    return counts


def format_weekly_notification_stats(statistics, service_created_at):
    preceeding_monday = service_created_at - timedelta(days=service_created_at.weekday())

    week_dict = {
        week: _create_zeroed_stats_dicts()
        for week in _weeks_for_range(preceeding_monday, datetime.utcnow())
    }

    for row in statistics:
        _update_statuses_from_row(week_dict[row.week_start][row.notification_type], row)

    return week_dict


def _create_zeroed_stats_dicts():
    return {
        template_type: {
            status: 0 for status in ('requested', 'delivered', 'failed')
        } for template_type in (EMAIL_TYPE, SMS_TYPE)
    }


def _update_statuses_from_row(update_dict, row):
    update_dict['requested'] += row.count
    if row.status == 'delivered':
        update_dict['delivered'] += row.count
    elif row.status in ('failed', 'technical-failure', 'temporary-failure', 'permanent-failure'):
        update_dict['failed'] += row.count


def _weeks_for_range(start, end):
    """
    Generator that yields dates from `start` to `end`, in 7 day intervals. End is inclusive.
    """
    infinite_date_generator = (start + timedelta(days=i) for i in itertools.count(step=7))
    return itertools.takewhile(lambda x: x <= end, infinite_date_generator)
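
To make the week bucketing concrete, here is a hedged sketch (again, not part of the commit) of driving the weekly formatter with fabricated rows; WeeklyRow is a hypothetical stand-in for the dao's result rows, and the notification type strings are assumed as above:

from collections import namedtuple
from datetime import datetime

from app.service.statistics import format_weekly_notification_stats, _weeks_for_range

# _weeks_for_range steps from start in 7-day increments, end inclusive
print(list(_weeks_for_range(datetime(2016, 7, 4), datetime(2016, 7, 20))))
# [datetime(2016, 7, 4, 0, 0), datetime(2016, 7, 11, 0, 0), datetime(2016, 7, 18, 0, 0)]

# hypothetical stand-in for the rows dao_fetch_weekly_historical_stats_for_service returns
WeeklyRow = namedtuple('WeeklyRow', ['week_start', 'notification_type', 'status', 'count'])

rows = [WeeklyRow(datetime(2016, 7, 11), 'sms', 'delivered', 3)]

# every week from the Monday preceding the service's creation up to now starts zeroed;
# the single row above is then summed into its week's 'sms' bucket
stats = format_weekly_notification_stats(rows, service_created_at=datetime(2016, 7, 6))
assert stats[datetime(2016, 7, 11)]['sms'] == {'requested': 3, 'delivered': 3, 'failed': 0}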