track message costs

This commit is contained in:
Kenneth Kehl
2025-02-28 08:39:13 -08:00
parent e9e69777de
commit d6bb2d8fb0
7 changed files with 99 additions and 33 deletions

View File

@@ -266,7 +266,7 @@ def process_delivery_receipts(self):
cloudwatch = AwsCloudwatchClient() cloudwatch = AwsCloudwatchClient()
cloudwatch.init_app(current_app) cloudwatch.init_app(current_app)
start_time = aware_utcnow() - timedelta(minutes=3) start_time = aware_utcnow() - timedelta(minutes=30)
end_time = aware_utcnow() end_time = aware_utcnow()
delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts( delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts(
start_time, end_time start_time, end_time

View File

@@ -7,6 +7,7 @@ from flask import current_app
from app.clients import AWS_CLIENT_CONFIG, Client from app.clients import AWS_CLIENT_CONFIG, Client
from app.cloudfoundry_config import cloud_config from app.cloudfoundry_config import cloud_config
from app.utils import hilite
class AwsCloudwatchClient(Client): class AwsCloudwatchClient(Client):
@@ -107,6 +108,13 @@ class AwsCloudwatchClient(Client):
provider_response = self._aws_value_or_default( provider_response = self._aws_value_or_default(
event, "delivery", "providerResponse" event, "delivery", "providerResponse"
) )
message_cost = self._aws_value_or_default(event, "delivery", "priceInUSD")
if message_cost is None or message_cost == "":
message_cost = 0.0
else:
message_cost = float(message_cost)
current_app.logger.info(hilite(f"EVENT {event} message_cost = {message_cost}"))
my_timestamp = self._aws_value_or_default(event, "notification", "timestamp") my_timestamp = self._aws_value_or_default(event, "notification", "timestamp")
return { return {
"notification.messageId": event["notification"]["messageId"], "notification.messageId": event["notification"]["messageId"],
@@ -114,6 +122,7 @@ class AwsCloudwatchClient(Client):
"delivery.phoneCarrier": phone_carrier, "delivery.phoneCarrier": phone_carrier,
"delivery.providerResponse": provider_response, "delivery.providerResponse": provider_response,
"@timestamp": my_timestamp, "@timestamp": my_timestamp,
"delivery.priceInUSD": message_cost,
} }
# Here is an example of how to get the events with log insights # Here is an example of how to get the events with log insights

View File

@@ -507,7 +507,7 @@ def insert_notification_history_delete_notifications(
SELECT id, job_id, job_row_number, service_id, template_id, template_version, api_key_id, SELECT id, job_id, job_row_number, service_id, template_id, template_version, api_key_id,
key_type, notification_type, created_at, sent_at, sent_by, updated_at, reference, billable_units, key_type, notification_type, created_at, sent_at, sent_by, updated_at, reference, billable_units,
client_reference, international, phone_prefix, rate_multiplier, notification_status, client_reference, international, phone_prefix, rate_multiplier, notification_status,
created_by_id, document_download_count created_by_id, document_download_count, message_cost
FROM notifications FROM notifications
WHERE service_id = :service_id WHERE service_id = :service_id
AND notification_type = :notification_type AND notification_type = :notification_type
@@ -842,7 +842,6 @@ def dao_update_delivery_receipts(receipts, delivered):
new_receipts.append(r) new_receipts.append(r)
receipts = new_receipts receipts = new_receipts
id_to_carrier = { id_to_carrier = {
r["notification.messageId"]: r["delivery.phoneCarrier"] for r in receipts r["notification.messageId"]: r["delivery.phoneCarrier"] for r in receipts
} }
@@ -851,9 +850,13 @@ def dao_update_delivery_receipts(receipts, delivered):
} }
id_to_timestamp = {r["notification.messageId"]: r["@timestamp"] for r in receipts} id_to_timestamp = {r["notification.messageId"]: r["@timestamp"] for r in receipts}
id_to_message_cost = {
r["notification.messageId"]: r["delivery.priceInUSD"] for r in receipts
}
status_to_update_with = NotificationStatus.DELIVERED status_to_update_with = NotificationStatus.DELIVERED
if not delivered: if not delivered:
status_to_update_with = NotificationStatus.FAILED status_to_update_with = NotificationStatus.FAILED
stmt = ( stmt = (
update(Notification) update(Notification)
.where(Notification.message_id.in_(id_to_carrier.keys())) .where(Notification.message_id.in_(id_to_carrier.keys()))
@@ -877,6 +880,12 @@ def dao_update_delivery_receipts(receipts, delivered):
for key, value in id_to_provider_response.items() for key, value in id_to_provider_response.items()
] ]
), ),
message_cost=case(
*[
(Notification.message_id == key, value)
for key, value in id_to_message_cost.items()
]
),
) )
) )
db.session.execute(stmt) db.session.execute(stmt)
@@ -908,7 +917,7 @@ def dao_close_out_delivery_receipts():
def dao_batch_insert_notifications(batch): def dao_batch_insert_notifications(batch):
current_app.logger.info(f"ENTER DAO_BATCH_INSERT with batch {batch}")
db.session.bulk_save_objects(batch) db.session.bulk_save_objects(batch)
db.session.commit() db.session.commit()
current_app.logger.info(f"Batch inserted notifications: {len(batch)}") current_app.logger.info(f"Batch inserted notifications: {len(batch)}")

View File

@@ -534,7 +534,9 @@ def dao_fetch_stats_for_service_from_hours(service_id, start_date, end_date):
# Update to group by HOUR instead of DAY # Update to group by HOUR instead of DAY
total_substmt = ( total_substmt = (
select( select(
func.date_trunc("hour", NotificationAllTimeView.created_at).label("hour"), # UPDATED func.date_trunc("hour", NotificationAllTimeView.created_at).label(
"hour"
), # UPDATED
Job.notification_count.label("notification_count"), Job.notification_count.label("notification_count"),
) )
.join(Job, NotificationAllTimeView.job_id == Job.id) .join(Job, NotificationAllTimeView.job_id == Job.id)
@@ -556,11 +558,14 @@ def dao_fetch_stats_for_service_from_hours(service_id, start_date, end_date):
total_stmt = select( total_stmt = select(
total_substmt.c.hour, # UPDATED total_substmt.c.hour, # UPDATED
func.sum(total_substmt.c.notification_count).label("total_notifications"), func.sum(total_substmt.c.notification_count).label("total_notifications"),
).group_by(total_substmt.c.hour) # UPDATED ).group_by(
total_substmt.c.hour
) # UPDATED
# Ensure we're using hourly timestamps in the response # Ensure we're using hourly timestamps in the response
total_notifications = { total_notifications = {
row.hour: row.total_notifications for row in db.session.execute(total_stmt).all() row.hour: row.total_notifications
for row in db.session.execute(total_stmt).all()
} }
# Update the second query to also use "hour" # Update the second query to also use "hour"
@@ -568,7 +573,9 @@ def dao_fetch_stats_for_service_from_hours(service_id, start_date, end_date):
select( select(
NotificationAllTimeView.notification_type, NotificationAllTimeView.notification_type,
NotificationAllTimeView.status, NotificationAllTimeView.status,
func.date_trunc("hour", NotificationAllTimeView.created_at).label("hour"), # UPDATED func.date_trunc("hour", NotificationAllTimeView.created_at).label(
"hour"
), # UPDATED
func.count(NotificationAllTimeView.id).label("count"), func.count(NotificationAllTimeView.id).label("count"),
) )
.where( .where(
@@ -895,7 +902,9 @@ def get_specific_days_stats(
return stats return stats
def get_specific_hours_stats(data, start_date, hours=None, end_date=None, total_notifications=None): def get_specific_hours_stats(
data, start_date, hours=None, end_date=None, total_notifications=None
):
if hours is not None and end_date is not None: if hours is not None and end_date is not None:
raise ValueError("Only set hours OR set end_date, not both.") raise ValueError("Only set hours OR set end_date, not both.")
elif hours is not None: elif hours is not None:
@@ -919,10 +928,10 @@ def get_specific_hours_stats(data, start_date, hours=None, end_date=None, total_
# Format statistics, returning only hours with results # Format statistics, returning only hours with results
stats = { stats = {
hour.strftime("%Y-%m-%dT%H:00:00Z"): statistics.format_statistics( hour.strftime("%Y-%m-%dT%H:00:00Z"): statistics.format_statistics(
rows, rows, total_notifications.get(hour, 0) if total_notifications else None
total_notifications.get(hour, 0) if total_notifications else None
) )
for hour, rows in grouped_data.items() if rows for hour, rows in grouped_data.items()
if rows
} }
return stats return stats

View File

@@ -1508,6 +1508,7 @@ class Notification(db.Model):
created_at = db.Column(db.DateTime, index=True, unique=False, nullable=False) created_at = db.Column(db.DateTime, index=True, unique=False, nullable=False)
sent_at = db.Column(db.DateTime, index=False, unique=False, nullable=True) sent_at = db.Column(db.DateTime, index=False, unique=False, nullable=True)
sent_by = db.Column(db.String, nullable=True) sent_by = db.Column(db.String, nullable=True)
message_cost = db.Column(db.Float, nullable=True, default=0.0)
updated_at = db.Column( updated_at = db.Column(
db.DateTime, db.DateTime,
index=False, index=False,
@@ -1813,6 +1814,7 @@ class NotificationHistory(db.Model, HistoryModel):
created_at = db.Column(db.DateTime, unique=False, nullable=False) created_at = db.Column(db.DateTime, unique=False, nullable=False)
sent_at = db.Column(db.DateTime, index=False, unique=False, nullable=True) sent_at = db.Column(db.DateTime, index=False, unique=False, nullable=True)
sent_by = db.Column(db.String, nullable=True) sent_by = db.Column(db.String, nullable=True)
message_cost = db.Column(db.Float, nullable=True, default=0.0)
updated_at = db.Column( updated_at = db.Column(
db.DateTime, db.DateTime,
index=False, index=False,

View File

@@ -0,0 +1,23 @@
"""
Revision ID: 0415_add_message_cost
Revises: 0414_change_total_message_limit
Create Date: 2025-02-28 11:35:22.873930
"""
import sqlalchemy as sa
from alembic import op
down_revision = "0414_change_total_message_limit"
revision = "0415_add_message_cost"
def upgrade():
op.add_column("notifications", sa.Column("message_cost", sa.Float))
op.add_column("notification_history", sa.Column("message_cost", sa.Float))
def downgrade():
op.drop_column("notifications", "message_cost")
op.add_column("notification_history", sa.Column("message_cost", sa.Float))

View File

@@ -7,7 +7,9 @@ from app.dao.services_dao import get_specific_hours_stats
from app.enums import StatisticsType from app.enums import StatisticsType
from app.models import TemplateType from app.models import TemplateType
NotificationRow = namedtuple("NotificationRow", ["notification_type", "status", "timestamp", "count"]) NotificationRow = namedtuple(
"NotificationRow", ["notification_type", "status", "timestamp", "count"]
)
def generate_expected_hourly_output(requested_sms_hours): def generate_expected_hourly_output(requested_sms_hours):
@@ -38,27 +40,31 @@ def create_mock_notification(notification_type, status, timestamp, count=1):
notification_type=notification_type, notification_type=notification_type,
status=status, status=status,
timestamp=timestamp.replace(minute=0, second=0, microsecond=0), timestamp=timestamp.replace(minute=0, second=0, microsecond=0),
count=count count=count,
) )
test_cases = [ test_cases = [
( (
[create_mock_notification( [
TemplateType.SMS, create_mock_notification(
StatisticsType.REQUESTED, TemplateType.SMS,
datetime(2025, 2, 18, 14, 15, 0), StatisticsType.REQUESTED,
)], datetime(2025, 2, 18, 14, 15, 0),
)
],
datetime(2025, 2, 18, 12, 0), datetime(2025, 2, 18, 12, 0),
6, 6,
generate_expected_hourly_output(["2025-02-18T14:00:00Z"]), generate_expected_hourly_output(["2025-02-18T14:00:00Z"]),
), ),
( (
[create_mock_notification( [
TemplateType.SMS, create_mock_notification(
StatisticsType.REQUESTED, TemplateType.SMS,
datetime(2025, 2, 18, 17, 59, 59), StatisticsType.REQUESTED,
)], datetime(2025, 2, 18, 17, 59, 59),
)
],
datetime(2025, 2, 18, 15, 0), datetime(2025, 2, 18, 15, 0),
3, 3,
generate_expected_hourly_output(["2025-02-18T17:00:00Z"]), generate_expected_hourly_output(["2025-02-18T17:00:00Z"]),
@@ -66,21 +72,29 @@ test_cases = [
([], datetime(2025, 2, 18, 10, 0), 4, {}), ([], datetime(2025, 2, 18, 10, 0), 4, {}),
( (
[ [
create_mock_notification(TemplateType.SMS, StatisticsType.REQUESTED, datetime(2025, 2, 18, 9, 30, 0)), create_mock_notification(
create_mock_notification(TemplateType.SMS, StatisticsType.REQUESTED, datetime(2025, 2, 18, 11, 45, 0)), TemplateType.SMS,
StatisticsType.REQUESTED,
datetime(2025, 2, 18, 9, 30, 0),
),
create_mock_notification(
TemplateType.SMS,
StatisticsType.REQUESTED,
datetime(2025, 2, 18, 11, 45, 0),
),
], ],
datetime(2025, 2, 18, 8, 0), datetime(2025, 2, 18, 8, 0),
5, 5,
generate_expected_hourly_output(["2025-02-18T09:00:00Z", "2025-02-18T11:00:00Z"]), generate_expected_hourly_output(
["2025-02-18T09:00:00Z", "2025-02-18T11:00:00Z"]
),
), ),
] ]
@pytest.mark.parametrize("mocked_notifications, start_date, hours, expected_output", test_cases) @pytest.mark.parametrize(
"mocked_notifications, start_date, hours, expected_output", test_cases
)
def test_get_specific_hours(mocked_notifications, start_date, hours, expected_output): def test_get_specific_hours(mocked_notifications, start_date, hours, expected_output):
results = get_specific_hours_stats( results = get_specific_hours_stats(mocked_notifications, start_date, hours=hours)
mocked_notifications,
start_date,
hours=hours
)
assert results == expected_output, f"Expected {expected_output}, but got {results}" assert results == expected_output, f"Expected {expected_output}, but got {results}"