move job dao tests to use db.py instead of conftest directly

This commit is contained in:
Leo Hemsted
2018-12-11 17:40:11 +00:00
parent bf62d3ad5f
commit 63b3a3849f

View File

@@ -19,106 +19,14 @@ from app.models import (
Job,
EMAIL_TYPE, SMS_TYPE, LETTER_TYPE
)
from tests.app.conftest import sample_job as create_job
from tests.app.conftest import sample_notification as create_notification
from tests.app.conftest import sample_service as create_service
from tests.app.conftest import sample_template as create_template
from tests.app.db import (
create_user
)
from tests.app.db import create_job, create_service, create_template
def test_should_have_decorated_notifications_dao_functions():
assert dao_get_notification_outcomes_for_job.__wrapped__.__name__ == 'dao_get_notification_outcomes_for_job' # noqa
def test_should_get_all_statuses_for_notifications_associated_with_job(
notify_db,
notify_db_session,
sample_service,
sample_job
):
notification = partial(create_notification, notify_db, notify_db_session, service=sample_service, job=sample_job)
notification(status='created')
notification(status='sending')
notification(status='delivered')
notification(status='pending')
notification(status='failed')
notification(status='technical-failure')
notification(status='temporary-failure')
notification(status='permanent-failure')
notification(status='sent')
results = dao_get_notification_outcomes_for_job(sample_service.id, sample_job.id)
assert set([(row.count, row.status) for row in results]) == set([
(1, 'created'),
(1, 'sending'),
(1, 'delivered'),
(1, 'pending'),
(1, 'failed'),
(1, 'technical-failure'),
(1, 'temporary-failure'),
(1, 'permanent-failure'),
(1, 'sent')
])
def test_should_count_of_statuses_for_notifications_associated_with_job(
notify_db,
notify_db_session,
sample_service,
sample_job
):
notification = partial(create_notification, notify_db, notify_db_session, service=sample_service, job=sample_job)
notification(status='created')
notification(status='created')
notification(status='sending')
notification(status='sending')
notification(status='sending')
notification(status='sending')
notification(status='delivered')
notification(status='delivered')
results = dao_get_notification_outcomes_for_job(sample_service.id, sample_job.id)
assert set([(row.count, row.status) for row in results]) == set([
(2, 'created'),
(4, 'sending'),
(2, 'delivered')
])
def test_should_return_zero_length_array_if_no_notifications_for_job(sample_service, sample_job):
assert len(dao_get_notification_outcomes_for_job(sample_job.id, sample_service.id)) == 0
def test_should_return_notifications_only_for_this_job(notify_db, notify_db_session, sample_service):
job_1 = create_job(notify_db, notify_db_session, service=sample_service)
job_2 = create_job(notify_db, notify_db_session, service=sample_service)
create_notification(notify_db, notify_db_session, service=sample_service, job=job_1, status='created')
create_notification(notify_db, notify_db_session, service=sample_service, job=job_2, status='created')
results = dao_get_notification_outcomes_for_job(sample_service.id, job_1.id)
assert [(row.count, row.status) for row in results] == [
(1, 'created')
]
def test_should_return_notifications_only_for_this_service(notify_db, notify_db_session):
service_1 = create_service(notify_db, notify_db_session, service_name="one", email_from="one")
service_2 = create_service(notify_db, notify_db_session, service_name="two", email_from="two")
job_1 = create_job(notify_db, notify_db_session, service=service_1)
job_2 = create_job(notify_db, notify_db_session, service=service_2)
create_notification(notify_db, notify_db_session, service=service_1, job=job_1, status='created')
create_notification(notify_db, notify_db_session, service=service_2, job=job_2, status='created')
assert len(dao_get_notification_outcomes_for_job(service_1.id, job_2.id)) == 0
def test_create_job(sample_template):
def test_create_sample_job(sample_template):
assert Job.query.count() == 0
job_id = uuid.uuid4()
@@ -147,14 +55,12 @@ def test_get_job_by_id(sample_job):
assert sample_job == job_from_db
def test_get_jobs_for_service(notify_db, notify_db_session, sample_template):
one_job = create_job(notify_db, notify_db_session, sample_template.service, sample_template)
def test_get_jobs_for_service(sample_template):
one_job = create_job(sample_template)
other_user = create_user(email="test@digital.cabinet-office.gov.uk")
other_service = create_service(notify_db, notify_db_session, user=other_user, service_name="other service",
email_from='other.service')
other_template = create_template(notify_db, notify_db_session, service=other_service)
other_job = create_job(notify_db, notify_db_session, service=other_service, template=other_template)
other_service = create_service(service_name="other service")
other_template = create_template(service=other_service)
other_job = create_job(other_template)
one_job_from_db = dao_get_jobs_by_service_id(one_job.service_id).items
other_job_from_db = dao_get_jobs_by_service_id(other_job.service_id).items
@@ -168,10 +74,9 @@ def test_get_jobs_for_service(notify_db, notify_db_session, sample_template):
assert one_job_from_db != other_job_from_db
def test_get_jobs_for_service_with_limit_days_param(notify_db, notify_db_session, sample_template):
one_job = create_job(notify_db, notify_db_session, sample_template.service, sample_template)
old_job = create_job(notify_db, notify_db_session, sample_template.service, sample_template,
created_at=datetime.now() - timedelta(days=8))
def test_get_jobs_for_service_with_limit_days_param(sample_template):
one_job = create_job(sample_template)
old_job = create_job(sample_template, created_at=datetime.now() - timedelta(days=8))
jobs = dao_get_jobs_by_service_id(one_job.service_id).items
@@ -185,34 +90,27 @@ def test_get_jobs_for_service_with_limit_days_param(notify_db, notify_db_session
assert old_job not in jobs_limit_days
def test_get_jobs_for_service_with_limit_days_edge_case(notify_db, notify_db_session, sample_template):
one_job = create_job(notify_db, notify_db_session, sample_template.service, sample_template)
job_two = create_job(notify_db, notify_db_session, sample_template.service, sample_template,
created_at=(datetime.now() - timedelta(days=7)).date())
one_second_after_midnight = datetime.combine((datetime.now() - timedelta(days=7)).date(),
datetime.strptime("000001", "%H%M%S").time())
just_after_midnight_job = create_job(notify_db, notify_db_session, sample_template.service, sample_template,
created_at=one_second_after_midnight)
job_eight_days_old = create_job(notify_db, notify_db_session, sample_template.service, sample_template,
created_at=datetime.now() - timedelta(days=8))
@freeze_time('2017-06-10')
def test_get_jobs_for_service_with_limit_days_edge_case(sample_template):
one_job = create_job(sample_template)
just_after_midnight_job = create_job(sample_template, created_at=datetime(2017, 6, 2, 23, 0, 1))
just_before_midnight_job = create_job(sample_template, created_at=datetime(2017, 6, 2, 22, 59, 0))
jobs_limit_days = dao_get_jobs_by_service_id(one_job.service_id, limit_days=7).items
assert len(jobs_limit_days) == 3
assert len(jobs_limit_days) == 2
assert one_job in jobs_limit_days
assert job_two in jobs_limit_days
assert just_after_midnight_job in jobs_limit_days
assert job_eight_days_old not in jobs_limit_days
assert just_before_midnight_job not in jobs_limit_days
def test_get_jobs_for_service_in_processed_at_then_created_at_order(notify_db, notify_db_session, sample_template):
_create_job = partial(create_job, notify_db, notify_db_session, sample_template.service, sample_template)
from_hour = partial(datetime, 2001, 1, 1)
created_jobs = [
_create_job(created_at=from_hour(2), processing_started=None),
_create_job(created_at=from_hour(1), processing_started=None),
_create_job(created_at=from_hour(1), processing_started=from_hour(4)),
_create_job(created_at=from_hour(2), processing_started=from_hour(3)),
create_job(sample_template, created_at=from_hour(2), processing_started=None),
create_job(sample_template, created_at=from_hour(1), processing_started=None),
create_job(sample_template, created_at=from_hour(1), processing_started=from_hour(4)),
create_job(sample_template, created_at=from_hour(2), processing_started=from_hour(3)),
]
jobs = dao_get_jobs_by_service_id(sample_template.service.id).items
@@ -235,21 +133,20 @@ def test_update_job(sample_job):
assert job_from_db.job_status == 'in progress'
def test_set_scheduled_jobs_to_pending_gets_all_jobs_in_scheduled_state_before_now(notify_db, notify_db_session):
def test_set_scheduled_jobs_to_pending_gets_all_jobs_in_scheduled_state_before_now(sample_template):
one_minute_ago = datetime.utcnow() - timedelta(minutes=1)
one_hour_ago = datetime.utcnow() - timedelta(minutes=60)
job_new = create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled')
job_old = create_job(notify_db, notify_db_session, scheduled_for=one_hour_ago, job_status='scheduled')
job_new = create_job(sample_template, scheduled_for=one_minute_ago, job_status='scheduled')
job_old = create_job(sample_template, scheduled_for=one_hour_ago, job_status='scheduled')
jobs = dao_set_scheduled_jobs_to_pending()
assert len(jobs) == 2
assert jobs[0].id == job_old.id
assert jobs[1].id == job_new.id
def test_set_scheduled_jobs_to_pending_gets_ignores_jobs_not_scheduled(notify_db, notify_db_session):
def test_set_scheduled_jobs_to_pending_gets_ignores_jobs_not_scheduled(sample_template, sample_job):
one_minute_ago = datetime.utcnow() - timedelta(minutes=1)
create_job(notify_db, notify_db_session)
job_scheduled = create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled')
job_scheduled = create_job(sample_template, scheduled_for=one_minute_ago, job_status='scheduled')
jobs = dao_set_scheduled_jobs_to_pending()
assert len(jobs) == 1
assert jobs[0].id == job_scheduled.id
@@ -260,11 +157,11 @@ def test_set_scheduled_jobs_to_pending_gets_ignores_jobs_scheduled_in_the_future
assert len(jobs) == 0
def test_set_scheduled_jobs_to_pending_updates_rows(notify_db, notify_db_session):
def test_set_scheduled_jobs_to_pending_updates_rows(sample_template):
one_minute_ago = datetime.utcnow() - timedelta(minutes=1)
one_hour_ago = datetime.utcnow() - timedelta(minutes=60)
create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled')
create_job(notify_db, notify_db_session, scheduled_for=one_hour_ago, job_status='scheduled')
create_job(sample_template, scheduled_for=one_minute_ago, job_status='scheduled')
create_job(sample_template, scheduled_for=one_hour_ago, job_status='scheduled')
jobs = dao_set_scheduled_jobs_to_pending()
assert len(jobs) == 2
assert jobs[0].job_status == 'pending'
@@ -277,7 +174,7 @@ def test_get_future_scheduled_job_gets_a_job_yet_to_send(sample_scheduled_job):
@freeze_time('2016-10-31 10:00:00')
def test_should_get_jobs_seven_days_old(notify_db, notify_db_session, sample_template):
def test_should_get_jobs_seven_days_old(sample_template):
"""
Jobs older than seven days are deleted, but only two day's worth (two-day window)
"""
@@ -289,12 +186,11 @@ def test_should_get_jobs_seven_days_old(notify_db, notify_db_session, sample_tem
nine_days_ago = eight_days_ago - timedelta(days=2)
nine_days_one_second_ago = nine_days_ago - timedelta(seconds=1)
job = partial(create_job, notify_db, notify_db_session)
job(created_at=seven_days_ago)
job(created_at=within_seven_days)
job_to_delete = job(created_at=eight_days_ago)
job(created_at=nine_days_ago, archived=True)
job(created_at=nine_days_one_second_ago, archived=True)
create_job(sample_template, created_at=seven_days_ago)
create_job(sample_template, created_at=within_seven_days)
job_to_delete = create_job(sample_template, created_at=eight_days_ago)
create_job(sample_template, created_at=nine_days_ago, archived=True)
create_job(sample_template, created_at=nine_days_one_second_ago, archived=True)
jobs = dao_get_jobs_older_than_data_retention(notification_types=[sample_template.template_type])
@@ -306,7 +202,7 @@ def test_get_jobs_for_service_is_paginated(notify_db, notify_db_session, sample_
with freeze_time('2015-01-01T00:00:00') as the_time:
for _ in range(10):
the_time.tick(timedelta(hours=1))
create_job(notify_db, notify_db_session, sample_service, sample_template)
create_job(sample_template)
res = dao_get_jobs_by_service_id(sample_service.id, page=1, page_size=2)
@@ -328,19 +224,11 @@ def test_get_jobs_for_service_is_paginated(notify_db, notify_db_session, sample_
'Report',
])
def test_get_jobs_for_service_doesnt_return_test_messages(
notify_db,
notify_db_session,
sample_template,
sample_job,
file_name,
):
create_job(
notify_db,
notify_db_session,
sample_template.service,
sample_template,
original_file_name=file_name,
)
create_job(sample_template, original_file_name=file_name,)
jobs = dao_get_jobs_by_service_id(sample_job.service_id).items
@@ -348,16 +236,15 @@ def test_get_jobs_for_service_doesnt_return_test_messages(
@freeze_time('2016-10-31 10:00:00')
def test_should_get_jobs_seven_days_old_filters_type(notify_db, notify_db_session):
def test_should_get_jobs_seven_days_old_filters_type(sample_service):
eight_days_ago = datetime.utcnow() - timedelta(days=8)
letter_template = create_template(notify_db, notify_db_session, template_type=LETTER_TYPE)
sms_template = create_template(notify_db, notify_db_session, template_type=SMS_TYPE)
email_template = create_template(notify_db, notify_db_session, template_type=EMAIL_TYPE)
letter_template = create_template(sample_service, template_type=LETTER_TYPE)
sms_template = create_template(sample_service, template_type=SMS_TYPE)
email_template = create_template(sample_service, template_type=EMAIL_TYPE)
job = partial(create_job, notify_db, notify_db_session, created_at=eight_days_ago)
job_to_remain = job(template=letter_template)
job(template=sms_template)
job(template=email_template)
job_to_remain = create_job(letter_template, created_at=eight_days_ago)
create_job(sms_template, created_at=eight_days_ago)
create_job(email_template, created_at=eight_days_ago)
jobs = dao_get_jobs_older_than_data_retention(
notification_types=[EMAIL_TYPE, SMS_TYPE]