Mirror of https://github.com/GSA/notifications-api.git, synced 2026-02-01 15:46:07 -05:00.
Merge pull request #3157 from alphagov/persist-performance-data
Persist the processing time statistics to the database.
@@ -30,6 +30,7 @@ from app.dao.fact_billing_dao import (
     get_service_ids_that_need_billing_populated,
     update_fact_billing,
 )
+from app.dao.fact_processing_time_dao import insert_update_processing_time
 from app.dao.jobs_dao import dao_get_job_by_id
 from app.dao.organisation_dao import dao_get_organisation_by_email_address, dao_add_service_to_organisation

@@ -57,7 +58,7 @@ from app.models import (
     Domain,
     Service,
     EmailBranding,
-    LetterBranding,
+    LetterBranding, FactProcessingTime,
 )
 from app.performance_platform.processing_time import send_processing_time_for_start_and_end
 from app.utils import DATETIME_FORMAT, get_london_midnight_in_utc, get_midnight_for_day_before
@@ -269,7 +270,7 @@ def backfill_processing_time(start_date, end_date):
         process_start_date.isoformat(),
         process_end_date.isoformat()
     ))
-    send_processing_time_for_start_and_end(process_start_date, process_end_date)
+    send_processing_time_for_start_and_end(process_start_date, process_end_date, process_date)


 @notify_command(name='populate-annual-billing')
@@ -927,3 +928,78 @@ def process_row_from_job(job_id, job_row_number):
     notification_id = process_row(row, template, job, job.service)
     current_app.logger.info("Process row {} for job {} created notification_id: {}".format(
         job_row_number, job_id, notification_id))
+
+
+@notify_command(name='load-processing-time-data')
+@click.option('-f', '--file_name', required=True, help='Text file containing json data for processing time')
+def load_processing_time_data(file_name):
+    # This method loads the data from a text file that was downloaded from
+    # https://www.performance.service.gov.uk/data/govuk-notify/processing-time?flatten=true&duration=30&group_by=status&period=day&collect=count%3Asum&format=json ## noqa
+    # The data is formatted as JSON:
+    # {"data": [
+    #     {
+    #         "_count": 1.0,
+    #         "_end_at": "2021-01-27T00:00:00+00:00",
+    #         "_start_at": "2021-01-26T00:00:00+00:00",
+    #         "count:sum": 4024207.0,
+    #         "status": "messages-within-10-secs"
+    #     },
+    #     {
+    #         "_count": 1.0,
+    #         "_end_at": "2021-01-27T00:00:00+00:00",
+    #         "_start_at": "2021-01-26T00:00:00+00:00",
+    #         "count:sum": 4243204.0,
+    #         "status": "messages-total"
+    #     },
+    # ]}
+    #
+    # Using fact_processing_time_dao.insert_update_processing_time means that if this method is run more than
+    # once it will not throw an exception.
+
+    file = open(file_name)
+
+    file_contents = ""
+    for line in file:
+        file_contents += line
+    data = json.loads(file_contents)
+    normalised = []
+
+    class ProcesingTimeData:
+        bst_date = datetime(1990, 1, 1).date()
+        messages_total = 0
+        messages_within_10_secs = 0
+
+        def __eq__(self, obj):
+            return isinstance(obj, ProcesingTimeData) and obj.bst_date == self.bst_date
+
+        def set_bst_date(self, value):
+            self.bst_date = value
+
+        def set_m(self, status, value):
+            if status == 'messages-total':
+                self.messages_total = value
+            elif status == 'messages-within-10-secs':
+                self.messages_within_10_secs = value
+
+    for entry in data['data']:
+        bst_date = datetime.strptime(entry['_start_at'][0:10], "%Y-%m-%d").date()
+        status = entry['status']
+        value = entry['count:sum']
+        obj = ProcesingTimeData()
+        obj.set_bst_date(bst_date)
+        if obj in normalised:
+            normalised[normalised.index(obj)].set_m(status, value)
+        else:
+            d = ProcesingTimeData()
+            d.set_bst_date(bst_date)
+            d.set_m(status, value)
+            normalised.append(d)
+    for n in normalised:
+        print(n.bst_date, n.messages_total, n.messages_within_10_secs)
+        fact_processing_time = FactProcessingTime(bst_date=n.bst_date,
+                                                  messages_total=n.messages_total,
+                                                  messages_within_10_secs=n.messages_within_10_secs
+                                                  )
+        insert_update_processing_time(fact_processing_time)
+
+    print("Done loading processing time data.")
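As an aside, the per-date grouping in the command above could also be done with a plain dict keyed on the date, which avoids the helper class and the linear normalised.index scan. A minimal sketch of that alternative, illustrative only and not part of this commit:

    from collections import defaultdict
    from datetime import datetime

    def normalise_processing_time(entries):
        # Group the flat performance-platform rows by their BST date;
        # each value holds the two counters the fact table needs.
        by_date = defaultdict(lambda: {'messages-total': 0, 'messages-within-10-secs': 0})
        for entry in entries:
            bst_date = datetime.strptime(entry['_start_at'][0:10], "%Y-%m-%d").date()
            by_date[bst_date][entry['status']] = entry['count:sum']
        return by_date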
app/dao/fact_processing_time_dao.py (new file, 32 lines)
@@ -0,0 +1,32 @@
+from datetime import datetime
+
+from sqlalchemy.dialects.postgresql import insert
+
+from app import db
+from app.dao.dao_utils import transactional
+from app.models import FactProcessingTime
+
+
+@transactional
+def insert_update_processing_time(processing_time):
+    '''
+    This uses the Postgres upsert to avoid race conditions when two threads try to insert
+    the same row. The excluded object refers to the values that we tried to insert but were
+    rejected.
+    http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#insert-on-conflict-upsert
+    '''
+    table = FactProcessingTime.__table__
+    stmt = insert(table).values(
+        bst_date=processing_time.bst_date,
+        messages_total=processing_time.messages_total,
+        messages_within_10_secs=processing_time.messages_within_10_secs
+    )
+    stmt = stmt.on_conflict_do_update(
+        index_elements=[table.c.bst_date],
+        set_={
+            'messages_total': stmt.excluded.messages_total,
+            'messages_within_10_secs': stmt.excluded.messages_within_10_secs,
+            'updated_at': datetime.utcnow()
+        }
+    )
+    db.session.connection().execute(stmt)
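A minimal usage sketch of the upsert semantics, mirroring the new DAO test further down; it assumes an app context and database session are already set up as in the tests:

    from datetime import date

    from app.dao.fact_processing_time_dao import insert_update_processing_time
    from app.models import FactProcessingTime

    # The first call inserts the row; the second call with the same
    # bst_date hits the ON CONFLICT branch and overwrites the counts
    # instead of raising an IntegrityError on the primary key.
    insert_update_processing_time(FactProcessingTime(
        bst_date=date(2021, 2, 22), messages_total=3, messages_within_10_secs=2))
    insert_update_processing_time(FactProcessingTime(
        bst_date=date(2021, 2, 22), messages_total=4, messages_within_10_secs=3))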
@@ -683,9 +683,9 @@ def dao_get_notifications_by_references(references):
 def dao_get_total_notifications_sent_per_day_for_performance_platform(start_date, end_date):
     """
     SELECT
-        count(notification_history),
+        count(notifications),
         coalesce(sum(CASE WHEN sent_at - created_at <= interval '10 seconds' THEN 1 ELSE 0 END), 0)
-    FROM notification_history
+    FROM notifications
     WHERE
         created_at > 'START DATE' AND
         created_at < 'END DATE' AND

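For orientation, a hypothetical SQLAlchemy rendering of the docstring's SQL could look like the sketch below. The actual body of this DAO function is not shown in the diff, so the names used here (Notification, db, the helper name itself) are assumptions based on the rest of the codebase:

    from datetime import timedelta

    from sqlalchemy import case, func

    from app import db
    from app.models import Notification

    def total_notifications_sent_per_day(start_date, end_date):
        # Count all notifications in the window, plus those whose
        # sent_at - created_at gap is within 10 seconds.
        under_10_secs = Notification.sent_at - Notification.created_at <= timedelta(seconds=10)
        return db.session.query(
            func.count(Notification.id).label('messages_total'),
            func.coalesce(func.sum(case([(under_10_secs, 1)], else_=0)), 0).label('messages_within_10_secs'),
        ).filter(
            Notification.created_at > start_date,
            Notification.created_at < end_date,
        ).one()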
@@ -2068,6 +2068,16 @@ class FactNotificationStatus(db.Model):
     updated_at = db.Column(db.DateTime, nullable=True, onupdate=datetime.datetime.utcnow)


+class FactProcessingTime(db.Model):
+    __tablename__ = "ft_processing_time"
+
+    bst_date = db.Column(db.Date, index=True, primary_key=True, nullable=False)
+    messages_total = db.Column(db.Integer(), nullable=False)
+    messages_within_10_secs = db.Column(db.Integer(), nullable=False)
+    created_at = db.Column(db.DateTime, nullable=False, default=datetime.datetime.utcnow)
+    updated_at = db.Column(db.DateTime, nullable=True, onupdate=datetime.datetime.utcnow)
+
+
 class Complaint(db.Model):
     __tablename__ = 'complaints'

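Assuming the Flask-SQLAlchemy setup used elsewhere in this repo, the new fact table can be read back per reporting window with a query along these lines (illustrative only; the dates are placeholders):

    from datetime import date

    from app.models import FactProcessingTime

    # Fetch one month of processing-time facts, oldest first.
    rows = FactProcessingTime.query.filter(
        FactProcessingTime.bst_date >= date(2021, 2, 1),
        FactProcessingTime.bst_date < date(2021, 3, 1),
    ).order_by(FactProcessingTime.bst_date).all()
    for row in rows:
        print(row.bst_date, row.messages_total, row.messages_within_10_secs)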
@@ -2,6 +2,8 @@ from datetime import timedelta

 from flask import current_app

+from app.dao.fact_processing_time_dao import insert_update_processing_time
+from app.models import FactProcessingTime
 from app.utils import get_london_midnight_in_utc
 from app.dao.notifications_dao import dao_get_total_notifications_sent_per_day_for_performance_platform
 from app import performance_platform_client
@@ -11,10 +13,10 @@ def send_processing_time_to_performance_platform(bst_date):
     start_time = get_london_midnight_in_utc(bst_date)
     end_time = get_london_midnight_in_utc(bst_date + timedelta(days=1))

-    send_processing_time_for_start_and_end(start_time, end_time)
+    send_processing_time_for_start_and_end(start_time, end_time, bst_date)


-def send_processing_time_for_start_and_end(start_time, end_time):
+def send_processing_time_for_start_and_end(start_time, end_time, bst_date):
     result = dao_get_total_notifications_sent_per_day_for_performance_platform(start_time, end_time)

     current_app.logger.info(
@@ -25,6 +27,11 @@ def send_processing_time_for_start_and_end(start_time, end_time):

     send_processing_time_data(start_time, 'messages-total', result.messages_total)
     send_processing_time_data(start_time, 'messages-within-10-secs', result.messages_within_10_secs)
+    insert_update_processing_time(FactProcessingTime(
+        bst_date=bst_date,
+        messages_total=result.messages_total,
+        messages_within_10_secs=result.messages_within_10_secs)
+    )


 def send_processing_time_data(start_time, status, count):
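Taken together, one nightly call now both posts the datapoints to the performance platform and upserts the same numbers into ft_processing_time. A sketch of the entry point as exercised by the tests further down:

    from datetime import date

    from app.performance_platform.processing_time import (
        send_processing_time_to_performance_platform,
    )

    # For 17 October 2016 (BST) this resolves the UTC midnight bounds,
    # sends the 'messages-total' and 'messages-within-10-secs' datapoints,
    # and upserts a single row keyed on bst_date=2016-10-17.
    send_processing_time_to_performance_platform(date(2016, 10, 17))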
migrations/versions/0349_add_ft_processing_time.py (new file, 30 lines)
@@ -0,0 +1,30 @@
+"""
+
+Revision ID: 0349_add_ft_processing_time
+Revises: 0348_migrate_broadcast_settings
+Create Date: 2021-02-22 14:05:24.775338
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+revision = '0349_add_ft_processing_time'
+down_revision = '0348_migrate_broadcast_settings'
+
+
+def upgrade():
+    op.create_table('ft_processing_time',
+                    sa.Column('bst_date', sa.Date(), nullable=False),
+                    sa.Column('messages_total', sa.Integer(), nullable=False),
+                    sa.Column('messages_within_10_secs', sa.Integer(), nullable=False),
+                    sa.Column('created_at', sa.DateTime(), nullable=False),
+                    sa.Column('updated_at', sa.DateTime(), nullable=True),
+                    sa.PrimaryKeyConstraint('bst_date')
+                    )
+    op.create_index(op.f('ix_ft_processing_time_bst_date'), 'ft_processing_time', ['bst_date'], unique=False)
+
+
+def downgrade():
+    op.drop_index(op.f('ix_ft_processing_time_bst_date'), table_name='ft_processing_time')
+    op.drop_table('ft_processing_time')
@@ -11,9 +11,9 @@ def test_backfill_processing_time_works_for_correct_dates(mocker, notify_api):
     backfill_processing_time.callback.__wrapped__(datetime(2017, 8, 1), datetime(2017, 8, 3))

     assert send_mock.call_count == 3
-    send_mock.assert_any_call(datetime(2017, 7, 31, 23, 0), datetime(2017, 8, 1, 23, 0))
-    send_mock.assert_any_call(datetime(2017, 8, 1, 23, 0), datetime(2017, 8, 2, 23, 0))
-    send_mock.assert_any_call(datetime(2017, 8, 2, 23, 0), datetime(2017, 8, 3, 23, 0))
+    send_mock.assert_any_call(datetime(2017, 7, 31, 23, 0), datetime(2017, 8, 1, 23, 0), datetime(2017, 8, 2, 0, 0))
+    send_mock.assert_any_call(datetime(2017, 8, 1, 23, 0), datetime(2017, 8, 2, 23, 0), datetime(2017, 8, 3, 0, 0))
+    send_mock.assert_any_call(datetime(2017, 8, 2, 23, 0), datetime(2017, 8, 3, 23, 0), datetime(2017, 8, 4, 0, 0))


 def test_backfill_totals_works_for_correct_dates(mocker, notify_api):
tests/app/dao/test_fact_processing_time_dao.py (new file, 42 lines)
@@ -0,0 +1,42 @@
+from datetime import datetime
+
+from freezegun import freeze_time
+
+from app.dao import fact_processing_time_dao
+from app.models import FactProcessingTime
+
+
+def test_insert_update_processing_time(notify_db_session):
+    data = FactProcessingTime(
+        bst_date=datetime(2021, 2, 22).date(),
+        messages_total=3,
+        messages_within_10_secs=2
+    )
+
+    fact_processing_time_dao.insert_update_processing_time(data)
+
+    result = FactProcessingTime.query.all()
+
+    assert len(result) == 1
+    assert result[0].bst_date == datetime(2021, 2, 22).date()
+    assert result[0].messages_total == 3
+    assert result[0].messages_within_10_secs == 2
+    assert result[0].created_at
+    assert not result[0].updated_at
+
+    data = FactProcessingTime(
+        bst_date=datetime(2021, 2, 22).date(),
+        messages_total=4,
+        messages_within_10_secs=3
+    )
+    with freeze_time("2021-02-23 13:23:33"):
+        fact_processing_time_dao.insert_update_processing_time(data)
+
+    result = FactProcessingTime.query.all()
+
+    assert len(result) == 1
+    assert result[0].bst_date == datetime(2021, 2, 22).date()
+    assert result[0].messages_total == 4
+    assert result[0].messages_within_10_secs == 3
+    assert result[0].created_at
+    assert result[0].updated_at == datetime(2021, 2, 23, 13, 23, 33)
@@ -2,6 +2,7 @@ from datetime import datetime, timedelta, date

 from freezegun import freeze_time

+from app.models import FactProcessingTime
 from tests.app.db import create_notification
 from app.performance_platform.processing_time import (
     send_processing_time_to_performance_platform,
@@ -23,6 +24,11 @@ def test_send_processing_time_to_performance_platform_generates_correct_calls(mo

     send_mock.assert_any_call(datetime(2016, 10, 16, 23, 0), 'messages-total', 2)
     send_mock.assert_any_call(datetime(2016, 10, 16, 23, 0), 'messages-within-10-secs', 1)
+    persisted_to_db = FactProcessingTime.query.all()
+    assert len(persisted_to_db) == 1
+    assert persisted_to_db[0].bst_date == date(2016, 10, 17)
+    assert persisted_to_db[0].messages_total == 2
+    assert persisted_to_db[0].messages_within_10_secs == 1


 def test_send_processing_time_to_performance_platform_creates_correct_call_to_perf_platform(mocker):