from datetime import datetime, timedelta, date import uuid from functools import partial import pytest from freezegun import freeze_time from sqlalchemy.exc import SQLAlchemyError, IntegrityError from app import db from app.models import ( Notification, Job, NotificationStatistics, TemplateStatistics, NOTIFICATION_STATUS_TYPES ) from app.dao.notifications_dao import ( dao_create_notification, dao_update_notification, get_notification, get_notification_for_job, get_notifications_for_job, dao_get_notification_statistics_for_service, delete_notifications_created_more_than_a_week_ago, dao_get_notification_statistics_for_service_and_day, update_notification_status_by_id, update_provider_stats, update_notification_status_by_reference, dao_get_template_statistics_for_service, get_notifications_for_service ) from notifications_utils.template import get_sms_fragment_count from tests.app.conftest import (sample_notification) def test_should_by_able_to_update_status_by_reference(sample_email_template, ses_provider): data = _notification_json(sample_email_template, status='sending') notification = Notification(**data) dao_create_notification( notification, sample_email_template.template_type) assert Notification.query.get(notification.id).status == "sending" notification.reference = 'reference' dao_update_notification(notification) update_notification_status_by_reference('reference', 'delivered', 'delivered') assert Notification.query.get(notification.id).status == 'delivered' _assert_notification_stats(notification.service_id, emails_delivered=1, emails_requested=1, emails_failed=0) def test_should_by_able_to_update_status_by_id(sample_template, sample_job, mmg_provider): data = _notification_json(sample_template, job_id=sample_job.id, status='sending') notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert Notification.query.get(notification.id).status == 'sending' assert update_notification_status_by_id(notification.id, 'delivered', 'delivered') assert Notification.query.get(notification.id).status == 'delivered' _assert_notification_stats(notification.service_id, sms_delivered=1, sms_requested=1, sms_failed=0) _assert_job_stats(notification.job_id, sent=1, count=1, delivered=1, failed=0) def test_should_not_update_status_by_id_if_not_sending_and_does_not_update_job(notify_db, notify_db_session): notification = sample_notification(notify_db, notify_db_session, status='delivered') job = Job.query.get(notification.job_id) assert Notification.query.get(notification.id).status == 'delivered' assert not update_notification_status_by_id(notification.id, 'failed', 'failure') assert Notification.query.get(notification.id).status == 'delivered' assert job == Job.query.get(notification.job_id) def test_should_by_able_to_update_status_by_id_from_pending_to_delivered(sample_template, sample_job): data = _notification_json(sample_template, job_id=sample_job.id, status='sending') notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert Notification.query.get(notification.id).status == 'sending' assert update_notification_status_by_id(notification_id=notification.id, status='pending') assert Notification.query.get(notification.id).status == 'pending' _assert_notification_stats(notification.service_id, sms_requested=1, sms_delivered=0, sms_failed=0) _assert_job_stats(sample_job.id, sent=1, count=1, delivered=0, failed=0) assert update_notification_status_by_id(notification.id, 'delivered', 'delivered') assert Notification.query.get(notification.id).status == 'delivered' _assert_notification_stats(notification.service_id, sms_requested=1, sms_delivered=1, sms_failed=0) _assert_job_stats(notification.job_id, sent=1, count=1, delivered=1, failed=0) def test_should_by_able_to_update_status_by_id_from_pending_to_temporary_failure(sample_template, sample_job): data = _notification_json(sample_template, job_id=sample_job.id, status='sending') notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert Notification.query.get(notification.id).status == 'sending' assert update_notification_status_by_id(notification_id=notification.id, status='pending') assert Notification.query.get(notification.id).status == 'pending' _assert_notification_stats(notification.service_id, sms_requested=1, sms_delivered=0, sms_failed=0) _assert_job_stats(notification.job_id, sent=1, count=1, delivered=0, failed=0) assert update_notification_status_by_id( notification.id, status='permanent-failure', notification_statistics_status='failure') assert Notification.query.get(notification.id).status == 'temporary-failure' _assert_notification_stats(notification.service_id, sms_delivered=0, sms_requested=1, sms_failed=1) _assert_job_stats(sample_job.id, sent=1, count=1, delivered=0, failed=1) def test_should_by_able_to_update_status_by_id_from_sending_to_permanent_failure(sample_template, sample_job): data = _notification_json(sample_template, job_id=sample_job.id, status='sending') notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert Notification.query.get(notification.id).status == 'sending' assert update_notification_status_by_id( notification.id, status='permanent-failure', notification_statistics_status='failure' ) assert Notification.query.get(notification.id).status == 'permanent-failure' _assert_notification_stats(notification.service_id, sms_requested=1, sms_delivered=0, sms_failed=1) _assert_job_stats(sample_job.id, sent=1, count=1, delivered=0, failed=1) def test_should_not_update_status_one_notification_status_is_delivered(notify_db, notify_db_session, sample_email_template, ses_provider): notification = sample_notification(notify_db=notify_db, notify_db_session=notify_db_session, template=sample_email_template, status='sending') assert Notification.query.get(notification.id).status == "sending" update_provider_stats( notification.id, 'email', ses_provider.identifier) notification.reference = 'reference' dao_update_notification(notification) update_notification_status_by_reference('reference', 'delivered', 'delivered') assert Notification.query.get(notification.id).status == 'delivered' update_notification_status_by_reference('reference', 'failed', 'temporary-failure') assert Notification.query.get(notification.id).status == 'delivered' _assert_notification_stats(notification.service_id, emails_requested=1, emails_delivered=1, emails_failed=0) _assert_job_stats(notification.job_id, sent=1, count=1, delivered=1, failed=0) def test_should_be_able_to_record_statistics_failure_for_sms(notify_db, notify_db_session,): notification = sample_notification(notify_db=notify_db, notify_db_session=notify_db_session, status='sending') assert Notification.query.get(notification.id).status == 'sending' assert update_notification_status_by_id(notification.id, 'permanent-failure', 'failure') assert Notification.query.get(notification.id).status == 'permanent-failure' _assert_notification_stats(notification.service_id, sms_requested=1, sms_delivered=0, sms_failed=1) _assert_job_stats(notification.job_id, sent=1, count=1, delivered=0, failed=1) def test_should_be_able_to_record_statistics_failure_for_email(sample_email_template, sample_job, ses_provider): data = _notification_json(sample_email_template, job_id=sample_job.id, status='sending') notification = Notification(**data) dao_create_notification(notification, sample_email_template.template_type) update_provider_stats( notification.id, 'email', ses_provider.identifier) notification.reference = 'reference' dao_update_notification(notification) count = update_notification_status_by_reference('reference', 'failed', 'failure') assert count == 1 assert Notification.query.get(notification.id).status == 'failed' _assert_notification_stats(notification.service_id, emails_requested=1, emails_delivered=0, emails_failed=1) _assert_job_stats(notification.job_id, sent=1, count=1, delivered=0, failed=1) def test_should_return_zero_count_if_no_notification_with_id(): assert not update_notification_status_by_id(str(uuid.uuid4()), 'delivered', 'delivered') def test_should_return_zero_count_if_no_notification_with_reference(): assert not update_notification_status_by_reference('something', 'delivered', 'delivered') def test_should_be_able_to_get_statistics_for_a_service(sample_template, mmg_provider): data = _notification_json(sample_template) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) _assert_notification_stats(notification.service_id, sms_requested=1, notification_created_at=notification.created_at.date()) def test_should_be_able_to_get_statistics_for_a_service_for_a_day(sample_template, mmg_provider): now = datetime.utcnow() data = _notification_json(sample_template) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) stat = dao_get_notification_statistics_for_service_and_day( sample_template.service.id, now.date() ) assert stat.emails_requested == 0 assert stat.emails_failed == 0 assert stat.emails_delivered == 0 assert stat.sms_requested == 1 assert stat.sms_failed == 0 assert stat.sms_delivered == 0 assert stat.day == notification.created_at.date() assert stat.service_id == notification.service_id def test_should_return_none_if_no_statistics_for_a_service_for_a_day(sample_template, mmg_provider): data = _notification_json(sample_template) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert not dao_get_notification_statistics_for_service_and_day( sample_template.service.id, (datetime.utcnow() - timedelta(days=1)).date()) def test_should_be_able_to_get_all_statistics_for_a_service(sample_template, mmg_provider): data = _notification_json(sample_template) notification_1 = Notification(**data) notification_2 = Notification(**data) notification_3 = Notification(**data) dao_create_notification(notification_1, sample_template.template_type) dao_create_notification(notification_2, sample_template.template_type) dao_create_notification(notification_3, sample_template.template_type) _assert_notification_stats(sample_template.service.id, sms_requested=3) def test_should_be_able_to_get_all_statistics_for_a_service_for_several_days(sample_template, mmg_provider): data = _notification_json(sample_template) today = datetime.utcnow() yesterday = datetime.utcnow() - timedelta(days=1) two_days_ago = datetime.utcnow() - timedelta(days=2) data.update({'created_at': today}) notification_1 = Notification(**data) data.update({'created_at': yesterday}) notification_2 = Notification(**data) data.update({'created_at': two_days_ago}) notification_3 = Notification(**data) dao_create_notification(notification_1, sample_template.template_type) dao_create_notification(notification_2, sample_template.template_type) dao_create_notification(notification_3, sample_template.template_type) stats = dao_get_notification_statistics_for_service(sample_template.service.id) assert len(stats) == 3 assert stats[0].emails_requested == 0 assert stats[0].sms_requested == 1 assert stats[0].day == today.date() assert stats[1].emails_requested == 0 assert stats[1].sms_requested == 1 assert stats[1].day == yesterday.date() assert stats[2].emails_requested == 0 assert stats[2].sms_requested == 1 assert stats[2].day == two_days_ago.date() def test_should_be_empty_list_if_no_statistics_for_a_service(sample_service): assert len(dao_get_notification_statistics_for_service(sample_service.id)) == 0 def test_should_be_able_to_get_all_statistics_for_a_service_for_several_days_previous(sample_template, mmg_provider): data = _notification_json(sample_template) today = datetime.utcnow() seven_days_ago = datetime.utcnow() - timedelta(days=7) eight_days_ago = datetime.utcnow() - timedelta(days=8) data.update({'created_at': today}) notification_1 = Notification(**data) data.update({'created_at': seven_days_ago}) notification_2 = Notification(**data) data.update({'created_at': eight_days_ago}) notification_3 = Notification(**data) dao_create_notification(notification_1, sample_template.template_type) dao_create_notification(notification_2, sample_template.template_type) dao_create_notification(notification_3, sample_template.template_type) stats = dao_get_notification_statistics_for_service(sample_template.service.id, 7) assert len(stats) == 2 assert stats[0].emails_requested == 0 assert stats[0].sms_requested == 1 assert stats[0].day == today.date() assert stats[1].emails_requested == 0 assert stats[1].sms_requested == 1 assert stats[1].day == seven_days_ago.date() def test_create_notification_creates_notification_with_personalisation(notify_db, notify_db_session, sample_template_with_placeholders, sample_job, mmg_provider): assert Notification.query.count() == 0 assert NotificationStatistics.query.count() == 0 assert TemplateStatistics.query.count() == 0 data = sample_notification(notify_db=notify_db, notify_db_session=notify_db_session, template=sample_template_with_placeholders, job=sample_job, personalisation={'name': 'Jo'}, status='created') assert Notification.query.count() == 1 notification_from_db = Notification.query.all()[0] assert notification_from_db.id assert data.to == notification_from_db.to assert data.job_id == notification_from_db.job_id assert data.service == notification_from_db.service assert data.template == notification_from_db.template assert data.template_version == notification_from_db.template_version assert data.created_at == notification_from_db.created_at assert 'created' == notification_from_db.status assert {'name': 'Jo'} == notification_from_db.personalisation _assert_job_stats(sample_job.id, sent=1, count=1, delivered=0, failed=0) stats = NotificationStatistics.query.filter( NotificationStatistics.service_id == sample_template_with_placeholders.service.id).first() assert stats.emails_requested == 0 assert stats.sms_requested == 1 template_stats = TemplateStatistics.query.filter( TemplateStatistics.service_id == sample_template_with_placeholders.service.id, TemplateStatistics.template_id == sample_template_with_placeholders.id).first() assert template_stats.service_id == sample_template_with_placeholders.service.id assert template_stats.template_id == sample_template_with_placeholders.id assert template_stats.usage_count == 1 def test_save_notification_creates_sms_and_template_stats(sample_template, sample_job, mmg_provider): assert Notification.query.count() == 0 assert NotificationStatistics.query.count() == 0 assert TemplateStatistics.query.count() == 0 data = _notification_json(sample_template, job_id=sample_job.id) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert Notification.query.count() == 1 notification_from_db = Notification.query.all()[0] assert notification_from_db.id assert data['to'] == notification_from_db.to assert data['job_id'] == notification_from_db.job_id assert data['service'] == notification_from_db.service assert data['template'] == notification_from_db.template assert data['template_version'] == notification_from_db.template_version assert data['created_at'] == notification_from_db.created_at assert 'created' == notification_from_db.status _assert_job_stats(sample_job.id, sent=1, count=1, delivered=0, failed=0) stats = NotificationStatistics.query.filter(NotificationStatistics.service_id == sample_template.service.id).first() assert stats.emails_requested == 0 assert stats.sms_requested == 1 template_stats = TemplateStatistics.query.filter(TemplateStatistics.service_id == sample_template.service.id, TemplateStatistics.template_id == sample_template.id).first() assert template_stats.service_id == sample_template.service.id assert template_stats.template_id == sample_template.id assert template_stats.usage_count == 1 def test_save_notification_and_create_email_and_template_stats(sample_email_template, sample_job, ses_provider): assert Notification.query.count() == 0 assert NotificationStatistics.query.count() == 0 assert TemplateStatistics.query.count() == 0 data = _notification_json(sample_email_template, job_id=sample_job.id) notification = Notification(**data) dao_create_notification(notification, sample_email_template.template_type) assert Notification.query.count() == 1 notification_from_db = Notification.query.all()[0] assert notification_from_db.id assert data['to'] == notification_from_db.to assert data['job_id'] == notification_from_db.job_id assert data['service'] == notification_from_db.service assert data['template'] == notification_from_db.template assert data['template_version'] == notification_from_db.template_version assert data['created_at'] == notification_from_db.created_at assert 'created' == notification_from_db.status _assert_job_stats(sample_job.id, sent=1, count=1, delivered=0, failed=0) stats = NotificationStatistics.query.filter( NotificationStatistics.service_id == sample_email_template.service.id).first() assert stats.emails_requested == 1 assert stats.sms_requested == 0 template_stats = TemplateStatistics.query.filter(TemplateStatistics.service_id == sample_email_template.service.id, TemplateStatistics.template_id == sample_email_template.id).first() # noqa assert template_stats.service_id == sample_email_template.service.id assert template_stats.template_id == sample_email_template.id assert template_stats.usage_count == 1 @freeze_time("2016-01-01 00:00:00.000000") def test_save_notification_handles_midnight_properly(sample_template, sample_job, mmg_provider): assert Notification.query.count() == 0 data = _notification_json(sample_template, sample_job.id) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert Notification.query.count() == 1 stats = NotificationStatistics.query.filter( NotificationStatistics.service_id == sample_template.service.id).first() assert stats.day == date(2016, 1, 1) @freeze_time("2016-01-01 23:59:59.999999") def test_save_notification_handles_just_before_midnight_properly(sample_template, sample_job, mmg_provider): assert Notification.query.count() == 0 data = _notification_json(sample_template, job_id=sample_job.id) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert Notification.query.count() == 1 stats = NotificationStatistics.query.filter( NotificationStatistics.service_id == sample_template.service.id).first() assert stats.day == date(2016, 1, 1) def test_save_notification_and_increment_email_stats(sample_email_template, sample_job, ses_provider): assert Notification.query.count() == 0 data = _notification_json(sample_email_template, job_id=sample_job.id) notification_1 = Notification(**data) notification_2 = Notification(**data) dao_create_notification(notification_1, sample_email_template.template_type) assert Notification.query.count() == 1 stats1 = NotificationStatistics.query.filter( NotificationStatistics.service_id == sample_email_template.service.id).first() assert stats1.emails_requested == 1 assert stats1.sms_requested == 0 dao_create_notification(notification_2, sample_email_template.template_type) assert Notification.query.count() == 2 stats2 = NotificationStatistics.query.filter( NotificationStatistics.service_id == sample_email_template.service.id).first() assert stats2.emails_requested == 2 assert stats2.sms_requested == 0 def test_save_notification_and_increment_sms_stats(sample_template, sample_job, mmg_provider): assert Notification.query.count() == 0 data = _notification_json(sample_template, sample_job.id) notification_1 = Notification(**data) notification_2 = Notification(**data) dao_create_notification(notification_1, sample_template.template_type) assert Notification.query.count() == 1 stats1 = NotificationStatistics.query.filter( NotificationStatistics.service_id == sample_template.service.id ).first() assert stats1.emails_requested == 0 assert stats1.sms_requested == 1 dao_create_notification(notification_2, sample_template.template_type) assert Notification.query.count() == 2 stats2 = NotificationStatistics.query.filter( NotificationStatistics.service_id == sample_template.service.id ).first() assert stats2.emails_requested == 0 assert stats2.sms_requested == 2 def test_not_save_notification_and_not_create_stats_on_commit_error(sample_template, sample_job, mmg_provider): random_id = str(uuid.uuid4()) assert Notification.query.count() == 0 data = _notification_json(sample_template, job_id=random_id) notification = Notification(**data) with pytest.raises(SQLAlchemyError): dao_create_notification(notification, sample_template.template_type) assert Notification.query.count() == 0 assert Job.query.get(sample_job.id).notifications_sent == 0 assert NotificationStatistics.query.count() == 0 assert TemplateStatistics.query.count() == 0 def test_save_notification_and_increment_job(sample_template, sample_job, mmg_provider): assert Notification.query.count() == 0 data = _notification_json(sample_template, job_id=sample_job.id) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert Notification.query.count() == 1 notification_from_db = Notification.query.all()[0] assert notification_from_db.id assert data['to'] == notification_from_db.to assert data['job_id'] == notification_from_db.job_id assert data['service'] == notification_from_db.service assert data['template'] == notification_from_db.template assert data['template_version'] == notification_from_db.template_version assert data['created_at'] == notification_from_db.created_at assert 'created' == notification_from_db.status assert Job.query.get(sample_job.id).notifications_sent == 1 notification_2 = Notification(**data) dao_create_notification(notification_2, sample_template.template_type) assert Notification.query.count() == 2 # count is off because the count is defaulted to 1 in the sample_job _assert_job_stats(sample_job.id, sent=2, count=1) def test_should_not_increment_job_if_notification_fails_to_persist(sample_template, sample_job, mmg_provider): random_id = str(uuid.uuid4()) assert Notification.query.count() == 0 data = _notification_json(sample_template, job_id=sample_job.id, id=random_id) notification_1 = Notification(**data) dao_create_notification(notification_1, sample_template.template_type) assert Notification.query.count() == 1 assert Job.query.get(sample_job.id).notifications_sent == 1 job_last_updated_at = Job.query.get(sample_job.id).updated_at notification_2 = Notification(**data) with pytest.raises(SQLAlchemyError): dao_create_notification(notification_2, sample_template.template_type) assert Notification.query.count() == 1 assert Job.query.get(sample_job.id).notifications_sent == 1 assert Job.query.get(sample_job.id).updated_at == job_last_updated_at def test_save_notification_and_increment_correct_job(notify_db, notify_db_session, sample_template, mmg_provider): from tests.app.conftest import sample_job job_1 = sample_job(notify_db, notify_db_session, sample_template.service) job_2 = sample_job(notify_db, notify_db_session, sample_template.service) assert Notification.query.count() == 0 data = _notification_json(sample_template, job_id=job_1.id) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert Notification.query.count() == 1 notification_from_db = Notification.query.all()[0] assert notification_from_db.id assert data['to'] == notification_from_db.to assert data['job_id'] == notification_from_db.job_id assert data['service'] == notification_from_db.service assert data['template'] == notification_from_db.template assert data['template_version'] == notification_from_db.template_version assert data['created_at'] == notification_from_db.created_at assert 'created' == notification_from_db.status assert job_1.id != job_2.id _assert_job_stats(job_id=job_1.id, sent=1, count=1) _assert_job_stats(job_id=job_2.id, sent=0, count=1) def test_save_notification_with_no_job(sample_template, mmg_provider): assert Notification.query.count() == 0 data = _notification_json(sample_template) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert Notification.query.count() == 1 notification_from_db = Notification.query.all()[0] assert notification_from_db.id assert data['to'] == notification_from_db.to assert data['service'] == notification_from_db.service assert data['template'] == notification_from_db.template assert data['template_version'] == notification_from_db.template_version assert data['created_at'] == notification_from_db.created_at assert 'created' == notification_from_db.status def test_get_notification(sample_notification): notifcation_from_db = get_notification( sample_notification.service.id, sample_notification.id) assert sample_notification == notifcation_from_db def test_save_notification_no_job_id(sample_template, mmg_provider): assert Notification.query.count() == 0 data = _notification_json(sample_template) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert Notification.query.count() == 1 notification_from_db = Notification.query.all()[0] assert notification_from_db.id assert data['to'] == notification_from_db.to assert data['service'] == notification_from_db.service assert data['template'] == notification_from_db.template assert data['template_version'] == notification_from_db.template_version assert 'created' == notification_from_db.status assert data.get('job_id') is None def test_get_notification_for_job(sample_notification): notifcation_from_db = get_notification_for_job( sample_notification.service.id, sample_notification.job_id, sample_notification.id) assert sample_notification == notifcation_from_db def test_get_all_notifications_for_job(notify_db, notify_db_session, sample_job): for i in range(0, 5): try: sample_notification(notify_db, notify_db_session, service=sample_job.service, template=sample_job.template, job=sample_job) except IntegrityError: pass notifications_from_db = get_notifications_for_job(sample_job.service.id, sample_job.id).items assert len(notifications_from_db) == 5 _assert_notification_stats(sample_job.service.id, sms_requested=5) def test_get_all_notifications_for_job_by_status(notify_db, notify_db_session, sample_job): notifications = partial(get_notifications_for_job, sample_job.service.id, sample_job.id) for status in NOTIFICATION_STATUS_TYPES: sample_notification( notify_db, notify_db_session, service=sample_job.service, template=sample_job.template, job=sample_job, status=status ) assert len(notifications().items) == len(NOTIFICATION_STATUS_TYPES) for status in NOTIFICATION_STATUS_TYPES: assert len(notifications(filter_dict={'status': status}).items) == 1 assert len(notifications(filter_dict={'status': NOTIFICATION_STATUS_TYPES[:3]}).items) == 3 def test_update_notification(sample_notification, sample_template): assert sample_notification.status == 'sending' sample_notification.status = 'failed' dao_update_notification(sample_notification) notification_from_db = Notification.query.get(sample_notification.id) assert notification_from_db.status == 'failed' @freeze_time("2016-01-10 12:00:00.000000") def test_should_delete_notifications_after_seven_days(notify_db, notify_db_session): assert len(Notification.query.all()) == 0 # create one notification a day between 1st and 9th from 11:00 to 19:00 for i in range(1, 11): past_date = '2016-01-{0:02d} {0:02d}:00:00.000000'.format(i, i) with freeze_time(past_date): sample_notification(notify_db, notify_db_session, created_at=datetime.utcnow(), status="failed") all_notifications = Notification.query.all() assert len(all_notifications) == 10 # Records from before 3rd should be deleted delete_notifications_created_more_than_a_week_ago('failed') remaining_notifications = Notification.query.all() assert len(remaining_notifications) == 8 assert remaining_notifications[0].created_at.date() == date(2016, 1, 3) def test_should_not_delete_failed_notifications_before_seven_days(notify_db, notify_db_session): should_delete = datetime.utcnow() - timedelta(days=8) do_not_delete = datetime.utcnow() - timedelta(days=7) sample_notification(notify_db, notify_db_session, created_at=should_delete, status="failed", to_field="should_delete") sample_notification(notify_db, notify_db_session, created_at=do_not_delete, status="failed", to_field="do_not_delete") assert len(Notification.query.all()) == 2 delete_notifications_created_more_than_a_week_ago('failed') assert len(Notification.query.all()) == 1 assert Notification.query.first().to == 'do_not_delete' @freeze_time("2016-03-30") def test_save_new_notification_creates_template_stats(sample_template, sample_job, mmg_provider): assert Notification.query.count() == 0 assert TemplateStatistics.query.count() == 0 data = _notification_json(sample_template, job_id=sample_job.id) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert TemplateStatistics.query.count() == 1 template_stats = TemplateStatistics.query.filter(TemplateStatistics.service_id == sample_template.service.id, TemplateStatistics.template_id == sample_template.id).first() assert template_stats.service_id == sample_template.service.id assert template_stats.template_id == sample_template.id assert template_stats.usage_count == 1 assert template_stats.day == date(2016, 3, 30) @freeze_time("2016-03-30") def test_save_new_notification_creates_template_stats_per_day(sample_template, sample_job, mmg_provider): assert Notification.query.count() == 0 assert TemplateStatistics.query.count() == 0 data = _notification_json(sample_template, job_id=sample_job.id) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) assert TemplateStatistics.query.count() == 1 template_stats = TemplateStatistics.query.filter(TemplateStatistics.service_id == sample_template.service.id, TemplateStatistics.template_id == sample_template.id).first() assert template_stats.service_id == sample_template.service.id assert template_stats.template_id == sample_template.id assert template_stats.usage_count == 1 assert template_stats.day == date(2016, 3, 30) # move on one day with freeze_time('2016-03-31'): assert TemplateStatistics.query.count() == 1 new_notification = Notification(**data) dao_create_notification(new_notification, sample_template.template_type) assert TemplateStatistics.query.count() == 2 first_stats = TemplateStatistics.query.filter(TemplateStatistics.day == datetime(2016, 3, 30)).first() second_stats = TemplateStatistics.query.filter(TemplateStatistics.day == datetime(2016, 3, 31)).first() assert first_stats.template_id == second_stats.template_id assert first_stats.service_id == second_stats.service_id assert first_stats.day == date(2016, 3, 30) assert first_stats.usage_count == 1 assert second_stats.day == date(2016, 3, 31) assert second_stats.usage_count == 1 def test_save_another_notification_increments_template_stats(sample_template, sample_job, mmg_provider): assert Notification.query.count() == 0 assert TemplateStatistics.query.count() == 0 data = _notification_json(sample_template, job_id=sample_job.id) notification_1 = Notification(**data) notification_2 = Notification(**data) dao_create_notification(notification_1, sample_template.template_type) assert TemplateStatistics.query.count() == 1 template_stats = TemplateStatistics.query.filter(TemplateStatistics.service_id == sample_template.service.id, TemplateStatistics.template_id == sample_template.id).first() assert template_stats.service_id == sample_template.service.id assert template_stats.template_id == sample_template.id assert template_stats.usage_count == 1 dao_create_notification(notification_2, sample_template.template_type) assert TemplateStatistics.query.count() == 1 template_stats = TemplateStatistics.query.filter(TemplateStatistics.service_id == sample_template.service.id, TemplateStatistics.template_id == sample_template.id).first() assert template_stats.usage_count == 2 def test_successful_notification_inserts_followed_by_failure_does_not_increment_template_stats(sample_template, sample_job, mmg_provider): assert Notification.query.count() == 0 assert NotificationStatistics.query.count() == 0 assert TemplateStatistics.query.count() == 0 data = _notification_json(sample_template, job_id=sample_job.id) notification_1 = Notification(**data) notification_2 = Notification(**data) notification_3 = Notification(**data) dao_create_notification(notification_1, sample_template.template_type) dao_create_notification(notification_2, sample_template.template_type) dao_create_notification(notification_3, sample_template.template_type) _assert_notification_stats(sample_template.service.id, sms_requested=3) assert TemplateStatistics.query.count() == 1 template_stats = TemplateStatistics.query.filter(TemplateStatistics.service_id == sample_template.service.id, TemplateStatistics.template_id == sample_template.id).first() assert template_stats.service_id == sample_template.service.id assert template_stats.template_id == sample_template.id assert template_stats.usage_count == 3 failing_notification = Notification(**data) try: # Mess up db in really bad way db.session.execute('DROP TABLE TEMPLATE_STATISTICS') dao_create_notification(failing_notification, sample_template.template_type) except Exception as e: # There should be no additional notification stats or counts _assert_notification_stats(sample_template.service.id, sms_requested=3) @freeze_time("2016-03-30") def test_get_template_stats_for_service_returns_stats_in_reverse_date_order(sample_template, sample_job): template_stats = dao_get_template_statistics_for_service(sample_template.service.id) assert len(template_stats) == 0 data = _notification_json(sample_template, job_id=sample_job.id) notification = Notification(**data) dao_create_notification(notification, sample_template.template_type) # move on one day with freeze_time('2016-03-31'): new_notification = Notification(**data) dao_create_notification(new_notification, sample_template.template_type) # move on one more day with freeze_time('2016-04-01'): new_notification = Notification(**data) dao_create_notification(new_notification, sample_template.template_type) template_stats = dao_get_template_statistics_for_service(sample_template.service_id) assert len(template_stats) == 3 assert template_stats[0].day == date(2016, 4, 1) assert template_stats[1].day == date(2016, 3, 31) assert template_stats[2].day == date(2016, 3, 30) @freeze_time('2016-04-09') def test_get_template_stats_for_service_returns_stats_can_limit_number_of_days_returned(sample_template): template_stats = dao_get_template_statistics_for_service(sample_template.service.id) assert len(template_stats) == 0 # Make 9 stats records from 1st to 9th April for i in range(1, 10): past_date = '2016-04-0{}'.format(i) with freeze_time(past_date): template_stats = TemplateStatistics(template_id=sample_template.id, service_id=sample_template.service_id) db.session.add(template_stats) db.session.commit() # Retrieve last week of stats template_stats = dao_get_template_statistics_for_service(sample_template.service_id, limit_days=7) assert len(template_stats) == 8 assert template_stats[0].day == date(2016, 4, 9) # Final day of stats should be the same as today, eg Monday assert template_stats[0].day.isoweekday() == template_stats[7].day.isoweekday() assert template_stats[7].day == date(2016, 4, 2) @freeze_time('2016-04-09') def test_get_template_stats_for_service_returns_stats_returns_all_stats_if_no_limit(sample_template): template_stats = dao_get_template_statistics_for_service(sample_template.service.id) assert len(template_stats) == 0 # make 9 stats records from 1st to 9th April for i in range(1, 10): past_date = '2016-04-0{}'.format(i) with freeze_time(past_date): template_stats = TemplateStatistics(template_id=sample_template.id, service_id=sample_template.service_id) db.session.add(template_stats) db.session.commit() template_stats = dao_get_template_statistics_for_service(sample_template.service_id) assert len(template_stats) == 9 assert template_stats[0].day == date(2016, 4, 9) assert template_stats[8].day == date(2016, 4, 1) @freeze_time('2016-04-30') def test_get_template_stats_for_service_returns_no_result_if_no_usage_within_limit_days(sample_template): template_stats = dao_get_template_statistics_for_service(sample_template.service.id) assert len(template_stats) == 0 # make 9 stats records from 1st to 9th April - no data after 10th for i in range(1, 10): past_date = '2016-04-0{}'.format(i) with freeze_time(past_date): template_stats = TemplateStatistics(template_id=sample_template.id, service_id=sample_template.service_id) db.session.add(template_stats) db.session.commit() # Retrieve a week of stats - read date is 2016-04-30 template_stats = dao_get_template_statistics_for_service(sample_template.service_id, limit_days=7) assert len(template_stats) == 0 # Retrieve a month of stats - read date is 2016-04-30 template_stats = dao_get_template_statistics_for_service(sample_template.service_id, limit_days=30) assert len(template_stats) == 9 def test_get_template_stats_for_service_with_limit_if_no_records_returns_empty_list(sample_template): template_stats = dao_get_template_statistics_for_service(sample_template.service.id, limit_days=7) assert len(template_stats) == 0 @freeze_time("2016-01-10") def test_should_limit_notifications_return_by_day_limit_plus_one(notify_db, notify_db_session, sample_service): assert len(Notification.query.all()) == 0 # create one notification a day between 1st and 9th for i in range(1, 11): past_date = '2016-01-{0:02d}'.format(i) with freeze_time(past_date): sample_notification(notify_db, notify_db_session, created_at=datetime.utcnow(), status="failed") all_notifications = Notification.query.all() assert len(all_notifications) == 10 all_notifications = get_notifications_for_service(sample_service.id, limit_days=10).items assert len(all_notifications) == 10 all_notifications = get_notifications_for_service(sample_service.id, limit_days=1).items assert len(all_notifications) == 2 @pytest.mark.parametrize( "char_count, expected_sms_fragment_count", [ (159, 1), (160, 1), (161, 2), (306, 2), (307, 3), (459, 3), (460, 4), (461, 4) ]) def test_sms_fragment_count(char_count, expected_sms_fragment_count): assert get_sms_fragment_count(char_count) == expected_sms_fragment_count def _notification_json(sample_template, job_id=None, id=None, status=None): data = { 'to': '+44709123456', 'service': sample_template.service, 'service_id': sample_template.service.id, 'template': sample_template, 'template_id': sample_template.id, 'template_version': sample_template.version, 'created_at': datetime.utcnow(), 'content_char_count': 160, } if job_id: data.update({'job_id': job_id}) if id: data.update({'id': id}) if status: data.update({'status': status}) return data def _assert_notification_stats(service_id, emails_delivered=0, emails_requested=0, emails_failed=0, sms_delivered=0, sms_requested=0, sms_failed=0, notification_created_at=None): stats = NotificationStatistics.query.filter_by(service_id=service_id).all() assert len(stats) == 1 assert stats[0].emails_delivered == emails_delivered assert stats[0].emails_requested == emails_requested assert stats[0].emails_failed == emails_failed assert stats[0].sms_delivered == sms_delivered assert stats[0].sms_requested == sms_requested assert stats[0].sms_failed == sms_failed assert stats[0].day == notification_created_at if notification_created_at else True def _assert_job_stats(job_id, sent=0, count=0, delivered=0, failed=0): job = Job.query.get(job_id) assert job.notifications_sent == sent assert job.notification_count == count assert job.notifications_delivered == delivered assert job.notifications_failed == failed