from datetime import datetime from sqlalchemy import Float, Integer from sqlalchemy import func, case, cast from sqlalchemy import literal_column from app import db from app.dao.date_util import get_financial_year from app.models import (NotificationHistory, Rate, NOTIFICATION_STATUS_TYPES_BILLABLE, KEY_TYPE_TEST, SMS_TYPE, EMAIL_TYPE) from app.statsd_decorators import statsd from app.utils import get_london_month_from_utc_column @statsd(namespace="dao") def get_yearly_billing_data(service_id, year): start_date, end_date = get_financial_year(year) rates = get_rates_for_year(start_date, end_date, SMS_TYPE) result = [] for r, n in zip(rates, rates[1:]): result.append( sms_yearly_billing_data_query(r.rate, service_id, r.valid_from, n.valid_from)) result.append(sms_yearly_billing_data_query(rates[-1].rate, service_id, rates[-1].valid_from, end_date)) result.append(email_yearly_billing_data_query(service_id, start_date, end_date)) return sum(result, []) @statsd(namespace="dao") def get_monthly_billing_data(service_id, year): start_date, end_date = get_financial_year(year) rates = get_rates_for_year(start_date, end_date, SMS_TYPE) result = [] for r, n in zip(rates, rates[1:]): result.extend(sms_billing_data_per_month_query(r.rate, service_id, r.valid_from, n.valid_from)) result.extend(sms_billing_data_per_month_query(rates[-1].rate, service_id, rates[-1].valid_from, end_date)) return [(datetime.strftime(x[0], "%B"), x[1], x[2], x[3], x[4], x[5]) for x in result] def billing_data_filter(notification_type, start_date, end_date, service_id): return [ NotificationHistory.notification_type == notification_type, NotificationHistory.created_at >= start_date, NotificationHistory.created_at < end_date, NotificationHistory.service_id == service_id, NotificationHistory.status.in_(NOTIFICATION_STATUS_TYPES_BILLABLE), NotificationHistory.key_type != KEY_TYPE_TEST ] def email_yearly_billing_data_query(service_id, start_date, end_date, rate=0): result = db.session.query( func.count(NotificationHistory.id), func.count(NotificationHistory.id), rate_multiplier(), NotificationHistory.notification_type, NotificationHistory.international, cast(rate, Integer()) ).filter( *billing_data_filter(EMAIL_TYPE, start_date, end_date, service_id) ).group_by( NotificationHistory.notification_type, rate_multiplier(), NotificationHistory.international ).first() if not result: return [(0, 0, 1, EMAIL_TYPE, False, 0)] else: return [result] def sms_yearly_billing_data_query(rate, service_id, start_date, end_date): result = db.session.query( cast(func.sum(NotificationHistory.billable_units * rate_multiplier()), Integer()), func.sum(NotificationHistory.billable_units), rate_multiplier(), NotificationHistory.notification_type, NotificationHistory.international, cast(rate, Float()) ).filter( *billing_data_filter(SMS_TYPE, start_date, end_date, service_id) ).group_by( NotificationHistory.notification_type, NotificationHistory.international, rate_multiplier() ).order_by( rate_multiplier() ).all() if not result: return [(0, 0, 1, SMS_TYPE, False, rate)] else: return result def get_rates_for_year(start_date, end_date, notification_type): return Rate.query.filter(Rate.valid_from >= start_date, Rate.valid_from < end_date, Rate.notification_type == notification_type).order_by(Rate.valid_from).all() def sms_billing_data_per_month_query(rate, service_id, start_date, end_date): month = get_london_month_from_utc_column(NotificationHistory.created_at) result = db.session.query( month, func.sum(NotificationHistory.billable_units), rate_multiplier(), NotificationHistory.international, NotificationHistory.notification_type, cast(rate, Float()) ).filter( *billing_data_filter(SMS_TYPE, start_date, end_date, service_id) ).group_by( NotificationHistory.notification_type, month, NotificationHistory.rate_multiplier, NotificationHistory.international ).order_by( month, rate_multiplier() ).all() return result def rate_multiplier(): return cast(case([ (NotificationHistory.rate_multiplier == None, literal_column("'1'")), # noqa (NotificationHistory.rate_multiplier != None, NotificationHistory.rate_multiplier), # noqa ]), Integer())