move job rest tests to use db.py instead of conftest directly

This commit is contained in:
Leo Hemsted
2018-12-12 12:49:52 +00:00
parent 63b3a3849f
commit e555a7595b

View File

@@ -1,21 +1,18 @@
import json import json
import uuid import uuid
from datetime import datetime, timedelta from datetime import datetime, timedelta
from functools import partial
from freezegun import freeze_time from freezegun import freeze_time
import pytest import pytest
import pytz import pytz
import app.celery.tasks import app.celery.tasks
from app.dao.templates_dao import dao_update_template
from app.models import JOB_STATUS_TYPES, JOB_STATUS_PENDING
from tests import create_authorization_header from tests import create_authorization_header
from tests.conftest import set_config from tests.conftest import set_config
from tests.app.conftest import ( from tests.app.db import create_job, create_notification
sample_job as create_job,
sample_notification as create_notification
)
from app.dao.templates_dao import dao_update_template
from app.models import NOTIFICATION_STATUS_TYPES, JOB_STATUS_TYPES, JOB_STATUS_PENDING
def test_get_job_with_invalid_service_id_returns404(client, sample_service): def test_get_job_with_invalid_service_id_returns404(client, sample_service):
@@ -444,54 +441,26 @@ def test_create_job_returns_400_if_archived_template(client, sample_template, mo
assert 'Template has been deleted' in resp_json['message']['template'] assert 'Template has been deleted' in resp_json['message']['template']
def _setup_jobs(notify_db, notify_db_session, template, number_of_jobs=5): def _setup_jobs(template, number_of_jobs=5):
for i in range(number_of_jobs): for i in range(number_of_jobs):
create_job( create_job(template=template)
notify_db,
notify_db_session,
service=template.service,
template=template)
def test_get_all_notifications_for_job_in_order_of_job_number( def test_get_all_notifications_for_job_in_order_of_job_number(admin_request, sample_template):
client, notify_db, notify_db_session, sample_service main_job = create_job(sample_template)
): another_job = create_job(sample_template)
main_job = create_job(notify_db, notify_db_session, service=sample_service)
another_job = create_job(notify_db, notify_db_session, service=sample_service)
notification_1 = create_notification( notification_1 = create_notification(job=main_job, to_field="1", job_row_number=1)
notify_db, notification_2 = create_notification(job=main_job, to_field="2", job_row_number=2)
notify_db_session, notification_3 = create_notification(job=main_job, to_field="3", job_row_number=3)
job=main_job, create_notification(job=another_job)
to_field="1",
created_at=datetime.utcnow(), resp = admin_request.get(
job_row_number=1 'job.get_all_notifications_for_service_job',
service_id=main_job.service_id,
job_id=main_job.id
) )
notification_2 = create_notification(
notify_db,
notify_db_session,
job=main_job,
to_field="2",
created_at=datetime.utcnow(),
job_row_number=2
)
notification_3 = create_notification(
notify_db,
notify_db_session,
job=main_job,
to_field="3",
created_at=datetime.utcnow(),
job_row_number=3
)
create_notification(notify_db, notify_db_session, job=another_job)
auth_header = create_authorization_header()
response = client.get(
path='/service/{}/job/{}/notifications'.format(sample_service.id, main_job.id),
headers=[auth_header])
resp = json.loads(response.get_data(as_text=True))
assert len(resp['notifications']) == 3 assert len(resp['notifications']) == 3
assert resp['notifications'][0]['to'] == notification_1.to assert resp['notifications'][0]['to'] == notification_1.to
assert resp['notifications'][0]['job_row_number'] == notification_1.job_row_number assert resp['notifications'][0]['job_row_number'] == notification_1.job_row_number
@@ -499,133 +468,76 @@ def test_get_all_notifications_for_job_in_order_of_job_number(
assert resp['notifications'][1]['job_row_number'] == notification_2.job_row_number assert resp['notifications'][1]['job_row_number'] == notification_2.job_row_number
assert resp['notifications'][2]['to'] == notification_3.to assert resp['notifications'][2]['to'] == notification_3.to
assert resp['notifications'][2]['job_row_number'] == notification_3.job_row_number assert resp['notifications'][2]['job_row_number'] == notification_3.job_row_number
assert response.status_code == 200
@pytest.mark.parametrize( @pytest.mark.parametrize(
"expected_notification_count, status_args", "expected_notification_count, status_args",
[ [
(1, '?status={}'.format(NOTIFICATION_STATUS_TYPES[0])), (1, ['created']),
(0, '?status={}'.format(NOTIFICATION_STATUS_TYPES[1])), (0, ['sending']),
(1, '?status={}&status={}&status={}'.format(*NOTIFICATION_STATUS_TYPES[0:3])), (1, ['created', 'sending']),
(0, '?status={}&status={}&status={}'.format(*NOTIFICATION_STATUS_TYPES[3:6])), (0, ['sending', 'delivered']),
] ]
) )
def test_get_all_notifications_for_job_filtered_by_status( def test_get_all_notifications_for_job_filtered_by_status(
client, admin_request,
notify_db, sample_job,
notify_db_session,
sample_service,
expected_notification_count, expected_notification_count,
status_args status_args
): ):
job = create_job(notify_db, notify_db_session, service=sample_service) create_notification(job=sample_job, to_field="1", status='created')
create_notification( resp = admin_request.get(
notify_db, 'job.get_all_notifications_for_service_job',
notify_db_session, service_id=sample_job.service_id,
job=job, job_id=sample_job.id,
to_field="1", status=status_args
created_at=datetime.utcnow(),
status=NOTIFICATION_STATUS_TYPES[0],
job_row_number=1
) )
response = client.get(
path='/service/{}/job/{}/notifications{}'.format(sample_service.id, job.id, status_args),
headers=[create_authorization_header()]
)
resp = json.loads(response.get_data(as_text=True))
assert len(resp['notifications']) == expected_notification_count assert len(resp['notifications']) == expected_notification_count
assert response.status_code == 200
def test_get_all_notifications_for_job_returns_correct_format( def test_get_all_notifications_for_job_returns_correct_format(
client, admin_request,
sample_notification_with_job sample_notification_with_job
): ):
service_id = sample_notification_with_job.service_id service_id = sample_notification_with_job.service_id
job_id = sample_notification_with_job.job_id job_id = sample_notification_with_job.job_id
response = client.get(
path='/service/{}/job/{}/notifications'.format(service_id, job_id), resp = admin_request.get('job.get_all_notifications_for_service_job', service_id=service_id, job_id=job_id)
headers=[create_authorization_header()]
)
assert response.status_code == 200
resp = json.loads(response.get_data(as_text=True))
assert len(resp['notifications']) == 1 assert len(resp['notifications']) == 1
assert resp['notifications'][0]['id'] == str(sample_notification_with_job.id) assert resp['notifications'][0]['id'] == str(sample_notification_with_job.id)
assert resp['notifications'][0]['status'] == sample_notification_with_job.status assert resp['notifications'][0]['status'] == sample_notification_with_job.status
def test_get_job_by_id(notify_api, sample_job): def test_get_job_by_id(admin_request, sample_job):
job_id = str(sample_job.id) job_id = str(sample_job.id)
service_id = sample_job.service.id service_id = sample_job.service.id
with notify_api.test_request_context():
with notify_api.test_client() as client: resp_json = admin_request.get('job.get_job_by_service_and_job_id', service_id=service_id, job_id=job_id)
path = '/service/{}/job/{}'.format(service_id, job_id)
auth_header = create_authorization_header()
response = client.get(path, headers=[auth_header])
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
assert resp_json['data']['id'] == job_id assert resp_json['data']['id'] == job_id
assert resp_json['data']['statistics'] == [] assert resp_json['data']['statistics'] == []
assert resp_json['data']['created_by']['name'] == 'Test User' assert resp_json['data']['created_by']['name'] == 'Test User'
def test_get_job_by_id_should_return_statistics(client, notify_db, notify_db_session, notify_api, sample_job): def test_get_job_by_id_should_return_summed_statistics(admin_request, sample_job):
job_id = str(sample_job.id) job_id = str(sample_job.id)
service_id = sample_job.service.id service_id = sample_job.service.id
partial_notification = partial(
create_notification, notify_db, notify_db_session, service=sample_job.service, job=sample_job
)
partial_notification(status='created')
partial_notification(status='sending')
partial_notification(status='delivered')
partial_notification(status='pending')
partial_notification(status='failed')
partial_notification(status='technical-failure') # noqa
partial_notification(status='temporary-failure') # noqa
partial_notification(status='permanent-failure') # noqa
path = '/service/{}/job/{}'.format(service_id, job_id) create_notification(job=sample_job, status='created')
auth_header = create_authorization_header() create_notification(job=sample_job, status='created')
response = client.get(path, headers=[auth_header]) create_notification(job=sample_job, status='created')
assert response.status_code == 200 create_notification(job=sample_job, status='sending')
resp_json = json.loads(response.get_data(as_text=True)) create_notification(job=sample_job, status='failed')
assert resp_json['data']['id'] == job_id create_notification(job=sample_job, status='failed')
assert {'status': 'created', 'count': 1} in resp_json['data']['statistics'] create_notification(job=sample_job, status='failed')
assert {'status': 'sending', 'count': 1} in resp_json['data']['statistics'] create_notification(job=sample_job, status='technical-failure')
assert {'status': 'delivered', 'count': 1} in resp_json['data']['statistics'] create_notification(job=sample_job, status='temporary-failure')
assert {'status': 'pending', 'count': 1} in resp_json['data']['statistics'] create_notification(job=sample_job, status='temporary-failure')
assert {'status': 'failed', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'technical-failure', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'temporary-failure', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'permanent-failure', 'count': 1} in resp_json['data']['statistics']
assert resp_json['data']['created_by']['name'] == 'Test User'
resp_json = admin_request.get('job.get_job_by_service_and_job_id', service_id=service_id, job_id=job_id)
def test_get_job_by_id_should_return_summed_statistics(client, notify_db, notify_db_session, notify_api, sample_job):
job_id = str(sample_job.id)
service_id = sample_job.service.id
partial_notification = partial(
create_notification, notify_db, notify_db_session, service=sample_job.service, job=sample_job
)
partial_notification(status='created')
partial_notification(status='created')
partial_notification(status='created')
partial_notification(status='sending')
partial_notification(status='failed')
partial_notification(status='failed')
partial_notification(status='failed')
partial_notification(status='technical-failure')
partial_notification(status='temporary-failure')
partial_notification(status='temporary-failure')
path = '/service/{}/job/{}'.format(service_id, job_id)
auth_header = create_authorization_header()
response = client.get(path, headers=[auth_header])
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
assert resp_json['data']['id'] == job_id assert resp_json['data']['id'] == job_id
assert {'status': 'created', 'count': 3} in resp_json['data']['statistics'] assert {'status': 'created', 'count': 3} in resp_json['data']['statistics']
assert {'status': 'sending', 'count': 1} in resp_json['data']['statistics'] assert {'status': 'sending', 'count': 1} in resp_json['data']['statistics']
@@ -635,32 +547,23 @@ def test_get_job_by_id_should_return_summed_statistics(client, notify_db, notify
assert resp_json['data']['created_by']['name'] == 'Test User' assert resp_json['data']['created_by']['name'] == 'Test User'
def test_get_jobs(client, notify_db, notify_db_session, sample_template): def test_get_jobs(admin_request, sample_template):
_setup_jobs(notify_db, notify_db_session, sample_template) _setup_jobs(sample_template)
service_id = sample_template.service.id service_id = sample_template.service.id
path = '/service/{}/job'.format(service_id) resp_json = admin_request.get('job.get_jobs_by_service', service_id=service_id)
auth_header = create_authorization_header()
response = client.get(path, headers=[auth_header])
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
assert len(resp_json['data']) == 5 assert len(resp_json['data']) == 5
def test_get_jobs_with_limit_days(admin_request, notify_db, notify_db_session, sample_template): def test_get_jobs_with_limit_days(admin_request, sample_template):
for time in [ for time in [
'Sunday 1st July 2018 22:59', 'Sunday 1st July 2018 22:59',
'Sunday 2nd July 2018 23:00', # beginning of monday morning 'Sunday 2nd July 2018 23:00', # beginning of monday morning
'Monday 3rd July 2018 12:00' 'Monday 3rd July 2018 12:00'
]: ]:
with freeze_time(time): with freeze_time(time):
create_job( create_job(template=sample_template)
notify_db,
notify_db_session,
service=sample_template.service,
template=sample_template,
)
with freeze_time('Monday 9th July 2018 12:00'): with freeze_time('Monday 9th July 2018 12:00'):
resp_json = admin_request.get('job.get_jobs_by_service', service_id=sample_template.service_id, limit_days=7) resp_json = admin_request.get('job.get_jobs_by_service', service_id=sample_template.service_id, limit_days=7)
@@ -668,26 +571,20 @@ def test_get_jobs_with_limit_days(admin_request, notify_db, notify_db_session, s
assert len(resp_json['data']) == 2 assert len(resp_json['data']) == 2
def test_get_jobs_should_return_statistics(client, notify_db, notify_db_session, notify_api, sample_service): def test_get_jobs_should_return_statistics(admin_request, sample_template):
now = datetime.utcnow() now = datetime.utcnow()
earlier = datetime.utcnow() - timedelta(days=1) earlier = datetime.utcnow() - timedelta(days=1)
job_1 = create_job(notify_db, notify_db_session, service=sample_service, created_at=earlier) job_1 = create_job(sample_template, created_at=earlier)
job_2 = create_job(notify_db, notify_db_session, service=sample_service, created_at=now) job_2 = create_job(sample_template, created_at=now)
partial_notification = partial(create_notification, notify_db, notify_db_session, service=sample_service) create_notification(job=job_1, status='created')
partial_notification(job=job_1, status='created') create_notification(job=job_1, status='created')
partial_notification(job=job_1, status='created') create_notification(job=job_1, status='created')
partial_notification(job=job_1, status='created') create_notification(job=job_2, status='sending')
partial_notification(job=job_2, status='sending') create_notification(job=job_2, status='sending')
partial_notification(job=job_2, status='sending') create_notification(job=job_2, status='sending')
partial_notification(job=job_2, status='sending')
resp_json = admin_request.get('job.get_jobs_by_service', service_id=sample_template.service_id)
with notify_api.test_request_context():
with notify_api.test_client() as client:
path = '/service/{}/job'.format(sample_service.id)
auth_header = create_authorization_header()
response = client.get(path, headers=[auth_header])
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
assert len(resp_json['data']) == 2 assert len(resp_json['data']) == 2
assert resp_json['data'][0]['id'] == str(job_2.id) assert resp_json['data'][0]['id'] == str(job_2.id)
assert {'status': 'sending', 'count': 3} in resp_json['data'][0]['statistics'] assert {'status': 'sending', 'count': 3} in resp_json['data'][0]['statistics']
@@ -695,24 +592,14 @@ def test_get_jobs_should_return_statistics(client, notify_db, notify_db_session,
assert {'status': 'created', 'count': 3} in resp_json['data'][1]['statistics'] assert {'status': 'created', 'count': 3} in resp_json['data'][1]['statistics']
def test_get_jobs_should_return_no_stats_if_no_rows_in_notifications( def test_get_jobs_should_return_no_stats_if_no_rows_in_notifications(admin_request, sample_template):
client,
notify_db,
notify_db_session,
notify_api,
sample_service,
):
now = datetime.utcnow() now = datetime.utcnow()
earlier = datetime.utcnow() - timedelta(days=1) earlier = datetime.utcnow() - timedelta(days=1)
job_1 = create_job(notify_db, notify_db_session, service=sample_service, created_at=earlier) job_1 = create_job(sample_template, created_at=earlier)
job_2 = create_job(notify_db, notify_db_session, service=sample_service, created_at=now) job_2 = create_job(sample_template, created_at=now)
resp_json = admin_request.get('job.get_jobs_by_service', service_id=sample_template.service_id)
path = '/service/{}/job'.format(sample_service.id)
auth_header = create_authorization_header()
response = client.get(path, headers=[auth_header])
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
assert len(resp_json['data']) == 2 assert len(resp_json['data']) == 2
assert resp_json['data'][0]['id'] == str(job_2.id) assert resp_json['data'][0]['id'] == str(job_2.id)
assert resp_json['data'][0]['statistics'] == [] assert resp_json['data'][0]['statistics'] == []
@@ -720,23 +607,12 @@ def test_get_jobs_should_return_no_stats_if_no_rows_in_notifications(
assert resp_json['data'][1]['statistics'] == [] assert resp_json['data'][1]['statistics'] == []
def test_get_jobs_should_paginate( def test_get_jobs_should_paginate(admin_request, sample_template):
notify_db, create_10_jobs(sample_template)
notify_db_session,
client,
sample_template
):
create_10_jobs(notify_db, notify_db_session, sample_template.service, sample_template)
path = '/service/{}/job'.format(sample_template.service_id) with set_config(admin_request.app, 'PAGE_SIZE', 2):
auth_header = create_authorization_header() resp_json = admin_request.get('job.get_jobs_by_service', service_id=sample_template.service_id)
with set_config(client.application, 'PAGE_SIZE', 2):
response = client.get(path, headers=[auth_header])
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
assert len(resp_json['data']) == 2
assert resp_json['data'][0]['created_at'] == '2015-01-01T10:00:00+00:00' assert resp_json['data'][0]['created_at'] == '2015-01-01T10:00:00+00:00'
assert resp_json['data'][1]['created_at'] == '2015-01-01T09:00:00+00:00' assert resp_json['data'][1]['created_at'] == '2015-01-01T09:00:00+00:00'
assert resp_json['page_size'] == 2 assert resp_json['page_size'] == 2
@@ -745,23 +621,12 @@ def test_get_jobs_should_paginate(
assert set(resp_json['links'].keys()) == {'next', 'last'} assert set(resp_json['links'].keys()) == {'next', 'last'}
def test_get_jobs_accepts_page_parameter( def test_get_jobs_accepts_page_parameter(admin_request, sample_template):
notify_db, create_10_jobs(sample_template)
notify_db_session,
client,
sample_template
):
create_10_jobs(notify_db, notify_db_session, sample_template.service, sample_template)
path = '/service/{}/job'.format(sample_template.service_id) with set_config(admin_request.app, 'PAGE_SIZE', 2):
auth_header = create_authorization_header() resp_json = admin_request.get('job.get_jobs_by_service', service_id=sample_template.service_id, page=2)
with set_config(client.application, 'PAGE_SIZE', 2):
response = client.get(path, headers=[auth_header], query_string={'page': 2})
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
assert len(resp_json['data']) == 2
assert resp_json['data'][0]['created_at'] == '2015-01-01T08:00:00+00:00' assert resp_json['data'][0]['created_at'] == '2015-01-01T08:00:00+00:00'
assert resp_json['data'][1]['created_at'] == '2015-01-01T07:00:00+00:00' assert resp_json['data'][1]['created_at'] == '2015-01-01T07:00:00+00:00'
assert resp_json['page_size'] == 2 assert resp_json['page_size'] == 2
@@ -778,71 +643,50 @@ def test_get_jobs_accepts_page_parameter(
# bad statuses are accepted, just return no data # bad statuses are accepted, just return no data
('foo', []) ('foo', [])
]) ])
def test_get_jobs_can_filter_on_statuses( def test_get_jobs_can_filter_on_statuses(admin_request, sample_template, statuses_filter, expected_statuses):
notify_db, create_job(sample_template, job_status='pending')
notify_db_session, create_job(sample_template, job_status='in progress')
client, create_job(sample_template, job_status='finished')
sample_service, create_job(sample_template, job_status='sending limits exceeded')
statuses_filter, create_job(sample_template, job_status='scheduled')
expected_statuses create_job(sample_template, job_status='cancelled')
): create_job(sample_template, job_status='ready to send')
create_job(notify_db, notify_db_session, job_status='pending') create_job(sample_template, job_status='sent to dvla')
create_job(notify_db, notify_db_session, job_status='in progress') create_job(sample_template, job_status='error')
create_job(notify_db, notify_db_session, job_status='finished')
create_job(notify_db, notify_db_session, job_status='sending limits exceeded')
create_job(notify_db, notify_db_session, job_status='scheduled')
create_job(notify_db, notify_db_session, job_status='cancelled')
create_job(notify_db, notify_db_session, job_status='ready to send')
create_job(notify_db, notify_db_session, job_status='sent to dvla')
create_job(notify_db, notify_db_session, job_status='error')
path = '/service/{}/job'.format(sample_service.id) resp_json = admin_request.get(
response = client.get( 'job.get_jobs_by_service',
path, service_id=sample_template.service_id,
headers=[create_authorization_header()], statuses=statuses_filter
query_string={'statuses': statuses_filter}
) )
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
from pprint import pprint
pprint(resp_json)
assert {x['job_status'] for x in resp_json['data']} == set(expected_statuses) assert {x['job_status'] for x in resp_json['data']} == set(expected_statuses)
def create_10_jobs(db, session, service, template): def create_10_jobs(template):
with freeze_time('2015-01-01T00:00:00') as the_time: with freeze_time('2015-01-01T00:00:00') as the_time:
for _ in range(10): for _ in range(10):
the_time.tick(timedelta(hours=1)) the_time.tick(timedelta(hours=1))
create_job(db, session, service, template) create_job(template)
def test_get_all_notifications_for_job_returns_csv_format( def test_get_all_notifications_for_job_returns_csv_format(admin_request, sample_notification_with_job):
client, resp = admin_request.get(
notify_db, 'job.get_all_notifications_for_service_job',
notify_db_session, service_id=sample_notification_with_job.service_id,
): job_id=sample_notification_with_job.job_id,
job = create_job(notify_db, notify_db_session) format_for_csv=True
notification = create_notification(
notify_db,
notify_db_session,
job=job,
job_row_number=1,
created_at=datetime.utcnow(),
) )
path = '/service/{}/job/{}/notifications'.format(notification.service.id, job.id)
response = client.get(
path=path,
headers=[create_authorization_header()],
query_string={'format_for_csv': True}
)
assert response.status_code == 200
resp = json.loads(response.get_data(as_text=True))
assert len(resp['notifications']) == 1 assert len(resp['notifications']) == 1
notification = resp['notifications'][0] assert set(resp['notifications'][0].keys()) == {
assert set(notification.keys()) == \ 'created_at',
set(['created_at', 'created_by_name', 'created_by_email_address', 'template_type', 'created_by_name',
'template_name', 'job_name', 'status', 'row_number', 'recipient']) 'created_by_email_address',
'template_type',
'template_name',
'job_name',
'status',
'row_number',
'recipient'
}