From f84694fb29d7a2901a4616c0996a6afef9e8b0cb Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Fri, 12 May 2017 12:19:27 +0100 Subject: [PATCH] updated the timeout query to base outcome on notifications Previous: assumed discrepancy in stats counts to be related to timeouts Now: If discrepancy exists do the math on the notifications for that job to work out counts based on statuses to redo stats. --- app/dao/statistics_dao.py | 44 +++++- tests/app/dao/test_statistics_dao.py | 215 ++++++++++++++++++--------- 2 files changed, 183 insertions(+), 76 deletions(-) diff --git a/app/dao/statistics_dao.py b/app/dao/statistics_dao.py index ca5e134a9..20c75112e 100644 --- a/app/dao/statistics_dao.py +++ b/app/dao/statistics_dao.py @@ -1,12 +1,15 @@ from datetime import datetime, timedelta +from itertools import groupby from flask import current_app +from sqlalchemy import func from sqlalchemy.exc import IntegrityError, SQLAlchemyError from app import db from app.dao.dao_utils import transactional from app.models import ( JobStatistics, + Notification, EMAIL_TYPE, SMS_TYPE, LETTER_TYPE, @@ -18,21 +21,50 @@ from app.statsd_decorators import statsd @transactional def timeout_job_counts(notifications_type, timeout_start): + total_updated = 0 + sent = columns(notifications_type, 'sent') delivered = columns(notifications_type, 'delivered') failed = columns(notifications_type, 'failed') - query = JobStatistics.query.filter( + results = db.session.query( + JobStatistics.job_id.label('job_id'), + func.count(Notification.status).label('count'), + Notification.status.label('status') + ).filter( + JobStatistics.job_id == Notification.job_id, JobStatistics.created_at < timeout_start, sent != failed + delivered - ) - return query.update( - {failed: sent - delivered}, synchronize_session=False - ) + ).group_by(Notification.status, JobStatistics.job_id).order_by(JobStatistics.job_id).all() + + sort = sorted(results, key=lambda result: result.job_id) + groups = [] + for k, g in groupby(sort, key=lambda result: result.job_id): + groups.append(list(g)) + + for job in groups: + sent_count = 0 + delivered_count = 0 + failed_count = 0 + for notification_status in job: + if notification_status.status in [NOTIFICATION_DELIVERED, NOTIFICATION_SENT]: + delivered_count += notification_status.count + else: + failed_count += notification_status.count + sent_count += notification_status.count + + total_updated += JobStatistics.query.filter_by( + job_id=notification_status.job_id + ).update({ + sent: sent_count, + failed: failed_count, + delivered: delivered_count + }, synchronize_session=False) + return total_updated @statsd(namespace="dao") -def timeout_job_statistics(timeout_period): +def dao_timeout_job_statistics(timeout_period): timeout_start = datetime.utcnow() - timedelta(seconds=timeout_period) sms_count = timeout_job_counts(SMS_TYPE, timeout_start) email_count = timeout_job_counts(EMAIL_TYPE, timeout_start) diff --git a/tests/app/dao/test_statistics_dao.py b/tests/app/dao/test_statistics_dao.py index 7aa0b0fd8..128c595d3 100644 --- a/tests/app/dao/test_statistics_dao.py +++ b/tests/app/dao/test_statistics_dao.py @@ -7,7 +7,7 @@ from sqlalchemy.exc import IntegrityError, SQLAlchemyError from app.dao.statistics_dao import ( create_or_update_job_sending_statistics, update_job_stats_outcome_count, - timeout_job_statistics) + dao_timeout_job_statistics) from app.models import ( JobStatistics, SMS_TYPE, @@ -17,8 +17,9 @@ from app.models import ( NOTIFICATION_TECHNICAL_FAILURE, NOTIFICATION_TEMPORARY_FAILURE, NOTIFICATION_PERMANENT_FAILURE, - NOTIFICATION_PENDING, NOTIFICATION_CREATED, NOTIFICATION_FAILED, NOTIFICATION_SENT, NOTIFICATION_SENDING) -from tests.app.conftest import sample_notification, sample_email_template, sample_template + NOTIFICATION_PENDING, NOTIFICATION_CREATED, NOTIFICATION_FAILED, NOTIFICATION_SENT, NOTIFICATION_SENDING, + NOTIFICATION_STATUS_TYPES_COMPLETED, Notification, NOTIFICATION_STATUS_TYPES, NOTIFICATION_STATUS_SUCCESS) +from tests.app.conftest import sample_notification, sample_email_template, sample_template, sample_job @pytest.mark.parametrize('notification_type, sms_count, email_count, letter_count', [ @@ -36,8 +37,6 @@ def test_should_create_a_stats_entry_for_a_job( email_count, letter_count ): - assert not len(JobStatistics.query.all()) - template = None if notification_type == SMS_TYPE: @@ -88,8 +87,6 @@ def test_should_update_a_stats_entry_for_a_job( email_count, letter_count ): - assert not len(JobStatistics.query.all()) - template = None if notification_type == SMS_TYPE: @@ -160,8 +157,6 @@ def test_should_update_a_stats_entry_with_its_success_outcome_for_a_job( email_count, letter_count ): - assert not len(JobStatistics.query.all()) - template = None if notification_type == SMS_TYPE: @@ -227,8 +222,6 @@ def test_should_update_a_stats_entry_with_its_error_outcomes_for_a_job( letter_count, status ): - assert not len(JobStatistics.query.all()) - template = None if notification_type == SMS_TYPE: @@ -291,8 +284,6 @@ def test_should_update_a_stats_entry_with_its_success_outcomes_for_a_job( letter_count, status ): - assert not len(JobStatistics.query.all()) - template = None if notification_type == SMS_TYPE: @@ -361,8 +352,6 @@ def test_should_not_update_job_stats_if_irrelevant_status( letter_count, status ): - assert not len(JobStatistics.query.all()) - template = None if notification_type == SMS_TYPE: @@ -421,8 +410,6 @@ def test_inserting_one_type_of_notification_maintains_other_counts( email_count, letter_count ): - assert not len(JobStatistics.query.all()) - sms_template = sample_template(notify_db, notify_db_session, service=sample_job.service) email_template = sample_email_template(notify_db, notify_db_session, service=sample_job.service) letter_template = sample_letter_template @@ -500,8 +487,6 @@ def test_updating_one_type_of_notification_to_success_maintains_other_counts( sample_job, sample_letter_template ): - assert not len(JobStatistics.query.all()) - sms_template = sample_template(notify_db, notify_db_session, service=sample_job.service) email_template = sample_email_template(notify_db, notify_db_session, service=sample_job.service) letter_template = sample_letter_template @@ -560,8 +545,6 @@ def test_updating_one_type_of_notification_to_error_maintains_other_counts( sample_job, sample_letter_template ): - assert not len(JobStatistics.query.all()) - sms_template = sample_template(notify_db, notify_db_session, service=sample_job.service) email_template = sample_email_template(notify_db, notify_db_session, service=sample_job.service) letter_template = sample_letter_template @@ -616,54 +599,6 @@ def test_updating_one_type_of_notification_to_error_maintains_other_counts( assert stats[0].emails_failed == 1 -def test_will_timeout_job_counts_after_notification_timeouts(notify_db, notify_db_session, sample_job): - sms_template = sample_template(notify_db, notify_db_session, service=sample_job.service) - email_template = sample_email_template(notify_db, notify_db_session, service=sample_job.service) - - one_minute_ago = datetime.utcnow() - timedelta(minutes=1) - - sms = sample_notification( - notify_db, - notify_db_session, - service=sample_job.service, - template=sms_template, - job=sample_job, - status=NOTIFICATION_CREATED - ) - - email = sample_notification( - notify_db, - notify_db_session, - service=sample_job.service, - template=email_template, - job=sample_job, - status=NOTIFICATION_CREATED - ) - - create_or_update_job_sending_statistics(email) - create_or_update_job_sending_statistics(sms) - - JobStatistics.query.update({JobStatistics.created_at: one_minute_ago}) - - intial_stats = JobStatistics.query.all() - - assert intial_stats[0].emails_sent == 1 - assert intial_stats[0].sms_sent == 1 - assert intial_stats[0].emails_delivered == 0 - assert intial_stats[0].sms_delivered == 0 - assert intial_stats[0].sms_failed == 0 - assert intial_stats[0].emails_failed == 0 - - timeout_job_statistics(59) - updated_stats = JobStatistics.query.all() - assert updated_stats[0].emails_sent == 1 - assert updated_stats[0].sms_sent == 1 - assert updated_stats[0].emails_delivered == 0 - assert updated_stats[0].sms_delivered == 0 - assert updated_stats[0].sms_failed == 1 - assert updated_stats[0].emails_failed == 1 - - def test_will_not_timeout_job_counts_before_notification_timeouts(notify_db, notify_db_session, sample_job): sms_template = sample_template(notify_db, notify_db_session, service=sample_job.service) email_template = sample_email_template(notify_db, notify_db_session, service=sample_job.service) @@ -702,7 +637,7 @@ def test_will_not_timeout_job_counts_before_notification_timeouts(notify_db, not assert intial_stats[0].sms_failed == 0 assert intial_stats[0].emails_failed == 0 - timeout_job_statistics(61) + dao_timeout_job_statistics(61) updated_stats = JobStatistics.query.all() assert updated_stats[0].emails_sent == 1 assert updated_stats[0].sms_sent == 1 @@ -710,3 +645,143 @@ def test_will_not_timeout_job_counts_before_notification_timeouts(notify_db, not assert updated_stats[0].sms_delivered == 0 assert updated_stats[0].sms_failed == 0 assert updated_stats[0].emails_failed == 0 + + +@pytest.mark.parametrize('notification_type, sms_count, email_count', [ + (SMS_TYPE, 3, 0), + (EMAIL_TYPE, 0, 3), +]) +def test_timeout_job_counts_timesout_multiple_jobs( + notify_db, notify_db_session, notification_type, sms_count, email_count +): + one_minute_ago = datetime.utcnow() - timedelta(minutes=1) + + job_1 = sample_job(notify_db, notify_db_session) + job_2 = sample_job(notify_db, notify_db_session) + job_3 = sample_job(notify_db, notify_db_session) + + jobs = [job_1, job_2, job_3] + + for job in jobs: + if notification_type == EMAIL_TYPE: + template = sample_email_template(notify_db, notify_db_session, service=job.service) + else: + template = sample_template(notify_db, notify_db_session, service=job.service) + + for i in range(3): + n = sample_notification( + notify_db, + notify_db_session, + service=job.service, + template=template, + job=job, + status=NOTIFICATION_CREATED + ) + create_or_update_job_sending_statistics(n) + + JobStatistics.query.update({JobStatistics.created_at: one_minute_ago}) + initial_stats = JobStatistics.query.all() + for stats in initial_stats: + assert stats.emails_sent == email_count + assert stats.sms_sent == sms_count + assert stats.emails_delivered == 0 + assert stats.sms_delivered == 0 + assert stats.sms_failed == 0 + assert stats.emails_failed == 0 + + dao_timeout_job_statistics(1) + updated_stats = JobStatistics.query.all() + for stats in updated_stats: + assert stats.emails_sent == email_count + assert stats.sms_sent == sms_count + assert stats.emails_delivered == 0 + assert stats.sms_delivered == 0 + assert stats.sms_failed == sms_count + assert stats.emails_failed == email_count + +count_notifications = len(NOTIFICATION_STATUS_TYPES) +count_success_notifications = len(NOTIFICATION_STATUS_SUCCESS) +count_error_notifications = len(NOTIFICATION_STATUS_TYPES) - len(NOTIFICATION_STATUS_SUCCESS) + + +def test_timeout_job_sets_all_non_delivered_emails_to_error( + notify_db, + notify_db_session, + sample_job +): + # Make a notification in every state + for i in range(len(NOTIFICATION_STATUS_TYPES)): + n = sample_notification( + notify_db, + notify_db_session, + service=sample_job.service, + template=sample_email_template(notify_db, notify_db_session, service=sample_job.service), + job=sample_job, + status=NOTIFICATION_STATUS_TYPES[i] + ) + create_or_update_job_sending_statistics(n) + + # fudge the created at time on the job stats table to make the eligible for timeout query + JobStatistics.query.update({JobStatistics.created_at: datetime.utcnow() - timedelta(minutes=1)}) + + # should have sent an email for every state (len(NOTIFICATION_STATUS_TYPES)) + initial_stats = JobStatistics.query.all() + for stats in initial_stats: + assert stats.emails_sent == count_notifications + assert stats.sms_sent == 0 + assert stats.emails_delivered == 0 + assert stats.sms_delivered == 0 + assert stats.sms_failed == 0 + assert stats.emails_failed == 0 + + # timeout the notifications + dao_timeout_job_statistics(1) + + # after timeout all delivered states are success and ALL other states are failed + updated_stats = JobStatistics.query.all() + for stats in updated_stats: + assert stats.emails_sent == count_notifications + assert stats.sms_sent == 0 + assert stats.emails_delivered == count_success_notifications + assert stats.sms_delivered == 0 + assert stats.sms_failed == 0 + assert stats.emails_failed == count_error_notifications + + +# this test is as above, but for SMS not email +def test_timeout_job_sets_all_non_delivered_states_to_error( + notify_db, + notify_db_session, + sample_job +): + for i in range(len(NOTIFICATION_STATUS_TYPES)): + n = sample_notification( + notify_db, + notify_db_session, + service=sample_job.service, + template=sample_template(notify_db, notify_db_session, service=sample_job.service), + job=sample_job, + status=NOTIFICATION_STATUS_TYPES[i] + ) + create_or_update_job_sending_statistics(n) + + JobStatistics.query.update({JobStatistics.created_at: datetime.utcnow() - timedelta(minutes=1)}) + initial_stats = JobStatistics.query.all() + for stats in initial_stats: + assert stats.emails_sent == 0 + assert stats.sms_sent == count_notifications + assert stats.emails_delivered == 0 + assert stats.sms_delivered == 0 + assert stats.sms_failed == 0 + assert stats.emails_failed == 0 + + dao_timeout_job_statistics(1) + updated_stats = JobStatistics.query.all() + + for stats in updated_stats: + assert stats.emails_sent == 0 + assert stats.sms_sent == count_notifications + assert stats.emails_delivered == 0 + assert stats.sms_delivered == count_success_notifications + assert stats.sms_failed == count_error_notifications + assert stats.emails_failed == 0