Normalise whitespace and use client fixtures

Using the client fixture means that fewer nested indentation is needed.
Which, along with consistent indenting, makes the code easier to read.
This commit is contained in:
Chris Hill-Scott
2018-04-30 11:46:58 +01:00
parent 53c2bb9855
commit 79c6671500

View File

@@ -1,6 +1,7 @@
import json import json
import uuid import uuid
from datetime import datetime, timedelta from datetime import datetime, timedelta
from functools import partial
from freezegun import freeze_time from freezegun import freeze_time
import pytest import pytest
@@ -17,143 +18,131 @@ from app.dao.templates_dao import dao_update_template
from app.models import NOTIFICATION_STATUS_TYPES, JOB_STATUS_TYPES, JOB_STATUS_PENDING from app.models import NOTIFICATION_STATUS_TYPES, JOB_STATUS_TYPES, JOB_STATUS_PENDING
def test_get_job_with_invalid_service_id_returns404(notify_api, sample_service): def test_get_job_with_invalid_service_id_returns404(client, sample_service):
with notify_api.test_request_context(): path = '/service/{}/job'.format(sample_service.id)
with notify_api.test_client() as client: auth_header = create_authorization_header()
path = '/service/{}/job'.format(sample_service.id) response = client.get(path, headers=[auth_header])
auth_header = create_authorization_header() assert response.status_code == 200
response = client.get(path, headers=[auth_header]) resp_json = json.loads(response.get_data(as_text=True))
assert response.status_code == 200 assert len(resp_json['data']) == 0
resp_json = json.loads(response.get_data(as_text=True))
assert len(resp_json['data']) == 0
def test_get_job_with_invalid_job_id_returns404(notify_api, sample_template): def test_get_job_with_invalid_job_id_returns404(client, sample_template):
service_id = sample_template.service.id service_id = sample_template.service.id
with notify_api.test_request_context(): path = '/service/{}/job/{}'.format(service_id, "bad-id")
with notify_api.test_client() as client: auth_header = create_authorization_header()
path = '/service/{}/job/{}'.format(service_id, "bad-id") response = client.get(path, headers=[auth_header])
auth_header = create_authorization_header() assert response.status_code == 404
response = client.get(path, headers=[auth_header]) resp_json = json.loads(response.get_data(as_text=True))
assert response.status_code == 404 assert resp_json['result'] == 'error'
resp_json = json.loads(response.get_data(as_text=True)) assert resp_json['message'] == 'No result found'
assert resp_json['result'] == 'error'
assert resp_json['message'] == 'No result found'
def test_get_job_with_unknown_id_returns404(notify_api, sample_template, fake_uuid): def test_get_job_with_unknown_id_returns404(client, sample_template, fake_uuid):
service_id = sample_template.service.id service_id = sample_template.service.id
with notify_api.test_request_context(): path = '/service/{}/job/{}'.format(service_id, fake_uuid)
with notify_api.test_client() as client: auth_header = create_authorization_header()
path = '/service/{}/job/{}'.format(service_id, fake_uuid) response = client.get(path, headers=[auth_header])
auth_header = create_authorization_header() assert response.status_code == 404
response = client.get(path, headers=[auth_header]) resp_json = json.loads(response.get_data(as_text=True))
assert response.status_code == 404 assert resp_json == {
resp_json = json.loads(response.get_data(as_text=True)) 'message': 'No result found',
assert resp_json == { 'result': 'error'
'message': 'No result found', }
'result': 'error'
}
def test_cancel_job(notify_api, sample_scheduled_job): def test_cancel_job(client, sample_scheduled_job):
job_id = str(sample_scheduled_job.id) job_id = str(sample_scheduled_job.id)
service_id = sample_scheduled_job.service.id service_id = sample_scheduled_job.service.id
with notify_api.test_request_context(), notify_api.test_client() as client: path = '/service/{}/job/{}/cancel'.format(service_id, job_id)
path = '/service/{}/job/{}/cancel'.format(service_id, job_id) auth_header = create_authorization_header()
auth_header = create_authorization_header() response = client.post(path, headers=[auth_header])
response = client.post(path, headers=[auth_header]) assert response.status_code == 200
assert response.status_code == 200 resp_json = json.loads(response.get_data(as_text=True))
resp_json = json.loads(response.get_data(as_text=True)) assert resp_json['data']['id'] == job_id
assert resp_json['data']['id'] == job_id assert resp_json['data']['job_status'] == 'cancelled'
assert resp_json['data']['job_status'] == 'cancelled'
def test_cant_cancel_normal_job(notify_api, sample_job, mocker): def test_cant_cancel_normal_job(client, sample_job, mocker):
job_id = str(sample_job.id) job_id = str(sample_job.id)
service_id = sample_job.service.id service_id = sample_job.service.id
with notify_api.test_request_context(), notify_api.test_client() as client: mock_update = mocker.patch('app.dao.jobs_dao.dao_update_job')
mock_update = mocker.patch('app.dao.jobs_dao.dao_update_job') path = '/service/{}/job/{}/cancel'.format(service_id, job_id)
path = '/service/{}/job/{}/cancel'.format(service_id, job_id) auth_header = create_authorization_header()
auth_header = create_authorization_header() response = client.post(path, headers=[auth_header])
response = client.post(path, headers=[auth_header]) assert response.status_code == 404
assert response.status_code == 404 assert mock_update.call_count == 0
assert mock_update.call_count == 0
def test_create_unscheduled_job(notify_api, sample_template, mocker, fake_uuid): def test_create_unscheduled_job(client, sample_template, mocker, fake_uuid):
with notify_api.test_request_context(): mocker.patch('app.celery.tasks.process_job.apply_async')
with notify_api.test_client() as client: data = {
mocker.patch('app.celery.tasks.process_job.apply_async') 'id': fake_uuid,
data = { 'service': str(sample_template.service.id),
'id': fake_uuid, 'template': str(sample_template.id),
'service': str(sample_template.service.id), 'original_file_name': 'thisisatest.csv',
'template': str(sample_template.id), 'notification_count': 1,
'original_file_name': 'thisisatest.csv', 'created_by': str(sample_template.created_by.id)
'notification_count': 1, }
'created_by': str(sample_template.created_by.id) path = '/service/{}/job'.format(sample_template.service.id)
} auth_header = create_authorization_header()
path = '/service/{}/job'.format(sample_template.service.id) headers = [('Content-Type', 'application/json'), auth_header]
auth_header = create_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
response = client.post( response = client.post(
path, path,
data=json.dumps(data), data=json.dumps(data),
headers=headers) headers=headers)
assert response.status_code == 201 assert response.status_code == 201
app.celery.tasks.process_job.apply_async.assert_called_once_with( app.celery.tasks.process_job.apply_async.assert_called_once_with(
([str(fake_uuid)]), ([str(fake_uuid)]),
queue="job-tasks" queue="job-tasks"
) )
resp_json = json.loads(response.get_data(as_text=True)) resp_json = json.loads(response.get_data(as_text=True))
assert resp_json['data']['id'] == fake_uuid assert resp_json['data']['id'] == fake_uuid
assert resp_json['data']['statistics'] == [] assert resp_json['data']['statistics'] == []
assert resp_json['data']['job_status'] == 'pending' assert resp_json['data']['job_status'] == 'pending'
assert not resp_json['data']['scheduled_for'] assert not resp_json['data']['scheduled_for']
assert resp_json['data']['job_status'] == 'pending' assert resp_json['data']['job_status'] == 'pending'
assert resp_json['data']['template'] == str(sample_template.id) assert resp_json['data']['template'] == str(sample_template.id)
assert resp_json['data']['original_file_name'] == 'thisisatest.csv' assert resp_json['data']['original_file_name'] == 'thisisatest.csv'
def test_create_scheduled_job(notify_api, sample_template, mocker, fake_uuid): @freeze_time("2016-01-01 12:00:00.000000")
with notify_api.test_request_context(): def test_create_scheduled_job(client, sample_template, mocker, fake_uuid):
with notify_api.test_client() as client: scheduled_date = (datetime.utcnow() + timedelta(hours=95, minutes=59)).isoformat()
with freeze_time("2016-01-01 12:00:00.000000"): mocker.patch('app.celery.tasks.process_job.apply_async')
scheduled_date = (datetime.utcnow() + timedelta(hours=95, minutes=59)).isoformat() data = {
mocker.patch('app.celery.tasks.process_job.apply_async') 'id': fake_uuid,
data = { 'service': str(sample_template.service.id),
'id': fake_uuid, 'template': str(sample_template.id),
'service': str(sample_template.service.id), 'original_file_name': 'thisisatest.csv',
'template': str(sample_template.id), 'notification_count': 1,
'original_file_name': 'thisisatest.csv', 'created_by': str(sample_template.created_by.id),
'notification_count': 1, 'scheduled_for': scheduled_date
'created_by': str(sample_template.created_by.id), }
'scheduled_for': scheduled_date path = '/service/{}/job'.format(sample_template.service.id)
} auth_header = create_authorization_header()
path = '/service/{}/job'.format(sample_template.service.id) headers = [('Content-Type', 'application/json'), auth_header]
auth_header = create_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
response = client.post( response = client.post(
path, path,
data=json.dumps(data), data=json.dumps(data),
headers=headers) headers=headers)
assert response.status_code == 201 assert response.status_code == 201
app.celery.tasks.process_job.apply_async.assert_not_called() app.celery.tasks.process_job.apply_async.assert_not_called()
resp_json = json.loads(response.get_data(as_text=True)) resp_json = json.loads(response.get_data(as_text=True))
assert resp_json['data']['id'] == fake_uuid assert resp_json['data']['id'] == fake_uuid
assert resp_json['data']['scheduled_for'] == datetime(2016, 1, 5, 11, 59, 0, assert resp_json['data']['scheduled_for'] == datetime(2016, 1, 5, 11, 59, 0,
tzinfo=pytz.UTC).isoformat() tzinfo=pytz.UTC).isoformat()
assert resp_json['data']['job_status'] == 'scheduled' assert resp_json['data']['job_status'] == 'scheduled'
assert resp_json['data']['template'] == str(sample_template.id) assert resp_json['data']['template'] == str(sample_template.id)
assert resp_json['data']['original_file_name'] == 'thisisatest.csv' assert resp_json['data']['original_file_name'] == 'thisisatest.csv'
def test_create_job_returns_403_if_service_is_not_active(client, fake_uuid, sample_service, mocker): def test_create_job_returns_403_if_service_is_not_active(client, fake_uuid, sample_service, mocker):
@@ -172,7 +161,8 @@ def test_create_job_returns_403_if_service_is_not_active(client, fake_uuid, samp
def test_create_job_returns_403_if_letter_template_type_and_service_in_trial( def test_create_job_returns_403_if_letter_template_type_and_service_in_trial(
client, fake_uuid, sample_trial_letter_template, mocker): client, fake_uuid, sample_trial_letter_template, mocker
):
data = { data = {
'id': fake_uuid, 'id': fake_uuid,
'service': str(sample_trial_letter_template.service.id), 'service': str(sample_trial_letter_template.service.id),
@@ -194,166 +184,152 @@ def test_create_job_returns_403_if_letter_template_type_and_service_in_trial(
mock_job_dao.assert_not_called() mock_job_dao.assert_not_called()
def test_should_not_create_scheduled_job_more_then_24_hours_hence(notify_api, sample_template, mocker, fake_uuid): @freeze_time("2016-01-01 11:09:00.061258")
with notify_api.test_request_context(): def test_should_not_create_scheduled_job_more_then_24_hours_hence(client, sample_template, mocker, fake_uuid):
with notify_api.test_client() as client: scheduled_date = (datetime.utcnow() + timedelta(hours=96, minutes=1)).isoformat()
with freeze_time("2016-01-01 11:09:00.061258"): mocker.patch('app.celery.tasks.process_job.apply_async')
scheduled_date = (datetime.utcnow() + timedelta(hours=96, minutes=1)).isoformat() data = {
'id': fake_uuid,
'service': str(sample_template.service.id),
'template': str(sample_template.id),
'original_file_name': 'thisisatest.csv',
'notification_count': 1,
'created_by': str(sample_template.created_by.id),
'scheduled_for': scheduled_date
}
path = '/service/{}/job'.format(sample_template.service.id)
auth_header = create_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
mocker.patch('app.celery.tasks.process_job.apply_async') response = client.post(
data = { path,
'id': fake_uuid, data=json.dumps(data),
'service': str(sample_template.service.id), headers=headers)
'template': str(sample_template.id), assert response.status_code == 400
'original_file_name': 'thisisatest.csv',
'notification_count': 1,
'created_by': str(sample_template.created_by.id),
'scheduled_for': scheduled_date
}
path = '/service/{}/job'.format(sample_template.service.id)
auth_header = create_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
response = client.post( app.celery.tasks.process_job.apply_async.assert_not_called()
path,
data=json.dumps(data),
headers=headers)
assert response.status_code == 400
app.celery.tasks.process_job.apply_async.assert_not_called() resp_json = json.loads(response.get_data(as_text=True))
assert resp_json['result'] == 'error'
resp_json = json.loads(response.get_data(as_text=True)) assert 'scheduled_for' in resp_json['message']
assert resp_json['result'] == 'error' assert resp_json['message']['scheduled_for'] == ['Date cannot be more than 96hrs in the future']
assert 'scheduled_for' in resp_json['message']
assert resp_json['message']['scheduled_for'] == ['Date cannot be more than 96hrs in the future']
def test_should_not_create_scheduled_job_in_the_past(notify_api, sample_template, mocker, fake_uuid): @freeze_time("2016-01-01 11:09:00.061258")
with notify_api.test_request_context(): def test_should_not_create_scheduled_job_in_the_past(client, sample_template, mocker, fake_uuid):
with notify_api.test_client() as client: scheduled_date = (datetime.utcnow() - timedelta(minutes=1)).isoformat()
with freeze_time("2016-01-01 11:09:00.061258"): mocker.patch('app.celery.tasks.process_job.apply_async')
scheduled_date = (datetime.utcnow() - timedelta(minutes=1)).isoformat() data = {
'id': fake_uuid,
'service': str(sample_template.service.id),
'template': str(sample_template.id),
'original_file_name': 'thisisatest.csv',
'notification_count': 1,
'created_by': str(sample_template.created_by.id),
'scheduled_for': scheduled_date
}
path = '/service/{}/job'.format(sample_template.service.id)
auth_header = create_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
mocker.patch('app.celery.tasks.process_job.apply_async') response = client.post(
data = { path,
'id': fake_uuid, data=json.dumps(data),
'service': str(sample_template.service.id), headers=headers)
'template': str(sample_template.id), assert response.status_code == 400
'original_file_name': 'thisisatest.csv',
'notification_count': 1,
'created_by': str(sample_template.created_by.id),
'scheduled_for': scheduled_date
}
path = '/service/{}/job'.format(sample_template.service.id)
auth_header = create_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
response = client.post( app.celery.tasks.process_job.apply_async.assert_not_called()
path,
data=json.dumps(data),
headers=headers)
assert response.status_code == 400
app.celery.tasks.process_job.apply_async.assert_not_called() resp_json = json.loads(response.get_data(as_text=True))
assert resp_json['result'] == 'error'
resp_json = json.loads(response.get_data(as_text=True)) assert 'scheduled_for' in resp_json['message']
assert resp_json['result'] == 'error' assert resp_json['message']['scheduled_for'] == ['Date cannot be in the past']
assert 'scheduled_for' in resp_json['message']
assert resp_json['message']['scheduled_for'] == ['Date cannot be in the past']
def test_create_job_returns_400_if_missing_data(notify_api, sample_template, mocker): def test_create_job_returns_400_if_missing_data(client, sample_template, mocker):
with notify_api.test_request_context(): mocker.patch('app.celery.tasks.process_job.apply_async')
with notify_api.test_client() as client: data = {
mocker.patch('app.celery.tasks.process_job.apply_async') 'template': str(sample_template.id)
data = { }
'template': str(sample_template.id) path = '/service/{}/job'.format(sample_template.service.id)
} auth_header = create_authorization_header()
path = '/service/{}/job'.format(sample_template.service.id) headers = [('Content-Type', 'application/json'), auth_header]
auth_header = create_authorization_header() response = client.post(
headers = [('Content-Type', 'application/json'), auth_header] path,
response = client.post( data=json.dumps(data),
path, headers=headers)
data=json.dumps(data),
headers=headers)
resp_json = json.loads(response.get_data(as_text=True)) resp_json = json.loads(response.get_data(as_text=True))
assert response.status_code == 400 assert response.status_code == 400
app.celery.tasks.process_job.apply_async.assert_not_called() app.celery.tasks.process_job.apply_async.assert_not_called()
assert resp_json['result'] == 'error' assert resp_json['result'] == 'error'
assert 'Missing data for required field.' in resp_json['message']['original_file_name'] assert 'Missing data for required field.' in resp_json['message']['original_file_name']
assert 'Missing data for required field.' in resp_json['message']['notification_count'] assert 'Missing data for required field.' in resp_json['message']['notification_count']
def test_create_job_returns_404_if_template_does_not_exist(notify_api, sample_service, mocker): def test_create_job_returns_404_if_template_does_not_exist(client, sample_service, mocker):
with notify_api.test_request_context(): mocker.patch('app.celery.tasks.process_job.apply_async')
with notify_api.test_client() as client: data = {
mocker.patch('app.celery.tasks.process_job.apply_async') 'template': str(sample_service.id)
data = { }
'template': str(sample_service.id) path = '/service/{}/job'.format(sample_service.id)
} auth_header = create_authorization_header()
path = '/service/{}/job'.format(sample_service.id) headers = [('Content-Type', 'application/json'), auth_header]
auth_header = create_authorization_header() response = client.post(
headers = [('Content-Type', 'application/json'), auth_header] path,
response = client.post( data=json.dumps(data),
path, headers=headers)
data=json.dumps(data),
headers=headers)
resp_json = json.loads(response.get_data(as_text=True)) resp_json = json.loads(response.get_data(as_text=True))
assert response.status_code == 404 assert response.status_code == 404
app.celery.tasks.process_job.apply_async.assert_not_called() app.celery.tasks.process_job.apply_async.assert_not_called()
assert resp_json['result'] == 'error' assert resp_json['result'] == 'error'
assert resp_json['message'] == 'No result found' assert resp_json['message'] == 'No result found'
def test_create_job_returns_404_if_missing_service(notify_api, sample_template, mocker): def test_create_job_returns_404_if_missing_service(client, sample_template, mocker):
with notify_api.test_request_context(): mocker.patch('app.celery.tasks.process_job.apply_async')
with notify_api.test_client() as client: random_id = str(uuid.uuid4())
mocker.patch('app.celery.tasks.process_job.apply_async') data = {'template': str(sample_template.id)}
random_id = str(uuid.uuid4()) path = '/service/{}/job'.format(random_id)
data = {'template': str(sample_template.id)} auth_header = create_authorization_header()
path = '/service/{}/job'.format(random_id) headers = [('Content-Type', 'application/json'), auth_header]
auth_header = create_authorization_header() response = client.post(
headers = [('Content-Type', 'application/json'), auth_header] path,
response = client.post( data=json.dumps(data),
path, headers=headers)
data=json.dumps(data),
headers=headers)
resp_json = json.loads(response.get_data(as_text=True)) resp_json = json.loads(response.get_data(as_text=True))
assert response.status_code == 404 assert response.status_code == 404
app.celery.tasks.process_job.apply_async.assert_not_called() app.celery.tasks.process_job.apply_async.assert_not_called()
assert resp_json['result'] == 'error' assert resp_json['result'] == 'error'
assert resp_json['message'] == 'No result found' assert resp_json['message'] == 'No result found'
def test_create_job_returns_400_if_archived_template(notify_api, sample_template, mocker): def test_create_job_returns_400_if_archived_template(client, sample_template, mocker):
with notify_api.test_request_context(): mocker.patch('app.celery.tasks.process_job.apply_async')
with notify_api.test_client() as client: sample_template.archived = True
mocker.patch('app.celery.tasks.process_job.apply_async') dao_update_template(sample_template)
sample_template.archived = True data = {
dao_update_template(sample_template) 'template': str(sample_template.id)
data = { }
'template': str(sample_template.id) path = '/service/{}/job'.format(sample_template.service.id)
} auth_header = create_authorization_header()
path = '/service/{}/job'.format(sample_template.service.id) headers = [('Content-Type', 'application/json'), auth_header]
auth_header = create_authorization_header() response = client.post(
headers = [('Content-Type', 'application/json'), auth_header] path,
response = client.post( data=json.dumps(data),
path, headers=headers)
data=json.dumps(data),
headers=headers)
resp_json = json.loads(response.get_data(as_text=True)) resp_json = json.loads(response.get_data(as_text=True))
assert response.status_code == 400 assert response.status_code == 400
app.celery.tasks.process_job.apply_async.assert_not_called() app.celery.tasks.process_job.apply_async.assert_not_called()
assert resp_json['result'] == 'error' assert resp_json['result'] == 'error'
assert 'Template has been deleted' in resp_json['message']['template'] assert 'Template has been deleted' in resp_json['message']['template']
def _setup_jobs(notify_db, notify_db_session, template, number_of_jobs=5): def _setup_jobs(notify_db, notify_db_session, template, number_of_jobs=5):
@@ -365,55 +341,53 @@ def _setup_jobs(notify_db, notify_db_session, template, number_of_jobs=5):
template=template) template=template)
def test_get_all_notifications_for_job_in_order_of_job_number(notify_api, def test_get_all_notifications_for_job_in_order_of_job_number(
notify_db, client, notify_db, notify_db_session, sample_service
notify_db_session, ):
sample_service): main_job = create_job(notify_db, notify_db_session, service=sample_service)
with notify_api.test_request_context(), notify_api.test_client() as client: another_job = create_job(notify_db, notify_db_session, service=sample_service)
main_job = create_job(notify_db, notify_db_session, service=sample_service)
another_job = create_job(notify_db, notify_db_session, service=sample_service)
notification_1 = create_notification( notification_1 = create_notification(
notify_db, notify_db,
notify_db_session, notify_db_session,
job=main_job, job=main_job,
to_field="1", to_field="1",
created_at=datetime.utcnow(), created_at=datetime.utcnow(),
job_row_number=1 job_row_number=1
) )
notification_2 = create_notification( notification_2 = create_notification(
notify_db, notify_db,
notify_db_session, notify_db_session,
job=main_job, job=main_job,
to_field="2", to_field="2",
created_at=datetime.utcnow(), created_at=datetime.utcnow(),
job_row_number=2 job_row_number=2
) )
notification_3 = create_notification( notification_3 = create_notification(
notify_db, notify_db,
notify_db_session, notify_db_session,
job=main_job, job=main_job,
to_field="3", to_field="3",
created_at=datetime.utcnow(), created_at=datetime.utcnow(),
job_row_number=3 job_row_number=3
) )
create_notification(notify_db, notify_db_session, job=another_job) create_notification(notify_db, notify_db_session, job=another_job)
auth_header = create_authorization_header() auth_header = create_authorization_header()
response = client.get( response = client.get(
path='/service/{}/job/{}/notifications'.format(sample_service.id, main_job.id), path='/service/{}/job/{}/notifications'.format(sample_service.id, main_job.id),
headers=[auth_header]) headers=[auth_header])
resp = json.loads(response.get_data(as_text=True)) resp = json.loads(response.get_data(as_text=True))
assert len(resp['notifications']) == 3 assert len(resp['notifications']) == 3
assert resp['notifications'][0]['to'] == notification_1.to assert resp['notifications'][0]['to'] == notification_1.to
assert resp['notifications'][0]['job_row_number'] == notification_1.job_row_number assert resp['notifications'][0]['job_row_number'] == notification_1.job_row_number
assert resp['notifications'][1]['to'] == notification_2.to assert resp['notifications'][1]['to'] == notification_2.to
assert resp['notifications'][1]['job_row_number'] == notification_2.job_row_number assert resp['notifications'][1]['job_row_number'] == notification_2.job_row_number
assert resp['notifications'][2]['to'] == notification_3.to assert resp['notifications'][2]['to'] == notification_3.to
assert resp['notifications'][2]['job_row_number'] == notification_3.job_row_number assert resp['notifications'][2]['job_row_number'] == notification_3.job_row_number
assert response.status_code == 200 assert response.status_code == 200
@pytest.mark.parametrize( @pytest.mark.parametrize(
@@ -426,33 +400,32 @@ def test_get_all_notifications_for_job_in_order_of_job_number(notify_api,
] ]
) )
def test_get_all_notifications_for_job_filtered_by_status( def test_get_all_notifications_for_job_filtered_by_status(
notify_api, client,
notify_db, notify_db,
notify_db_session, notify_db_session,
sample_service, sample_service,
expected_notification_count, expected_notification_count,
status_args status_args
): ):
with notify_api.test_request_context(), notify_api.test_client() as client: job = create_job(notify_db, notify_db_session, service=sample_service)
job = create_job(notify_db, notify_db_session, service=sample_service)
create_notification( create_notification(
notify_db, notify_db,
notify_db_session, notify_db_session,
job=job, job=job,
to_field="1", to_field="1",
created_at=datetime.utcnow(), created_at=datetime.utcnow(),
status=NOTIFICATION_STATUS_TYPES[0], status=NOTIFICATION_STATUS_TYPES[0],
job_row_number=1 job_row_number=1
) )
response = client.get( response = client.get(
path='/service/{}/job/{}/notifications{}'.format(sample_service.id, job.id, status_args), path='/service/{}/job/{}/notifications{}'.format(sample_service.id, job.id, status_args),
headers=[create_authorization_header()] headers=[create_authorization_header()]
) )
resp = json.loads(response.get_data(as_text=True)) resp = json.loads(response.get_data(as_text=True))
assert len(resp['notifications']) == expected_notification_count assert len(resp['notifications']) == expected_notification_count
assert response.status_code == 200 assert response.status_code == 200
def test_get_all_notifications_for_job_returns_correct_format( def test_get_all_notifications_for_job_returns_correct_format(
@@ -487,85 +460,83 @@ def test_get_job_by_id(notify_api, sample_job):
assert resp_json['data']['created_by']['name'] == 'Test User' assert resp_json['data']['created_by']['name'] == 'Test User'
def test_get_job_by_id_should_return_statistics(notify_db, notify_db_session, notify_api, sample_job): def test_get_job_by_id_should_return_statistics(client, notify_db, notify_db_session, notify_api, sample_job):
job_id = str(sample_job.id) job_id = str(sample_job.id)
service_id = sample_job.service.id service_id = sample_job.service.id
partial_notification = partial(
create_notification, notify_db, notify_db_session, service=sample_job.service, job=sample_job
)
partial_notification(status='created')
partial_notification(status='sending')
partial_notification(status='delivered')
partial_notification(status='pending')
partial_notification(status='failed')
partial_notification(status='technical-failure') # noqa
partial_notification(status='temporary-failure') # noqa
partial_notification(status='permanent-failure') # noqa
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='created') path = '/service/{}/job/{}'.format(service_id, job_id)
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='sending') auth_header = create_authorization_header()
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='delivered') response = client.get(path, headers=[auth_header])
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='pending') assert response.status_code == 200
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='failed') resp_json = json.loads(response.get_data(as_text=True))
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='technical-failure') # noqa assert resp_json['data']['id'] == job_id
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='temporary-failure') # noqa assert {'status': 'created', 'count': 1} in resp_json['data']['statistics']
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='permanent-failure') # noqa assert {'status': 'sending', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'delivered', 'count': 1} in resp_json['data']['statistics']
with notify_api.test_request_context(): assert {'status': 'pending', 'count': 1} in resp_json['data']['statistics']
with notify_api.test_client() as client: assert {'status': 'failed', 'count': 1} in resp_json['data']['statistics']
path = '/service/{}/job/{}'.format(service_id, job_id) assert {'status': 'technical-failure', 'count': 1} in resp_json['data']['statistics']
auth_header = create_authorization_header() assert {'status': 'temporary-failure', 'count': 1} in resp_json['data']['statistics']
response = client.get(path, headers=[auth_header]) assert {'status': 'permanent-failure', 'count': 1} in resp_json['data']['statistics']
assert response.status_code == 200 assert resp_json['data']['created_by']['name'] == 'Test User'
resp_json = json.loads(response.get_data(as_text=True))
assert resp_json['data']['id'] == job_id
assert {'status': 'created', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'sending', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'delivered', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'pending', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'failed', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'technical-failure', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'temporary-failure', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'permanent-failure', 'count': 1} in resp_json['data']['statistics']
assert resp_json['data']['created_by']['name'] == 'Test User'
def test_get_job_by_id_should_return_summed_statistics(notify_db, notify_db_session, notify_api, sample_job): def test_get_job_by_id_should_return_summed_statistics(client, notify_db, notify_db_session, notify_api, sample_job):
job_id = str(sample_job.id) job_id = str(sample_job.id)
service_id = sample_job.service.id service_id = sample_job.service.id
partial_notification = partial(
create_notification, notify_db, notify_db_session, service=sample_job.service, job=sample_job
)
partial_notification(status='created')
partial_notification(status='created')
partial_notification(status='created')
partial_notification(status='sending')
partial_notification(status='failed')
partial_notification(status='failed')
partial_notification(status='failed')
partial_notification(status='technical-failure')
partial_notification(status='temporary-failure')
partial_notification(status='temporary-failure')
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='created') path = '/service/{}/job/{}'.format(service_id, job_id)
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='created') auth_header = create_authorization_header()
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='created') response = client.get(path, headers=[auth_header])
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='sending') assert response.status_code == 200
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='failed') resp_json = json.loads(response.get_data(as_text=True))
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='failed') assert resp_json['data']['id'] == job_id
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='failed') assert {'status': 'created', 'count': 3} in resp_json['data']['statistics']
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='technical-failure') # noqa assert {'status': 'sending', 'count': 1} in resp_json['data']['statistics']
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='temporary-failure') # noqa assert {'status': 'failed', 'count': 3} in resp_json['data']['statistics']
create_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='temporary-failure') # noqa assert {'status': 'technical-failure', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'temporary-failure', 'count': 2} in resp_json['data']['statistics']
with notify_api.test_request_context(): assert resp_json['data']['created_by']['name'] == 'Test User'
with notify_api.test_client() as client:
path = '/service/{}/job/{}'.format(service_id, job_id)
auth_header = create_authorization_header()
response = client.get(path, headers=[auth_header])
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
assert resp_json['data']['id'] == job_id
assert {'status': 'created', 'count': 3} in resp_json['data']['statistics']
assert {'status': 'sending', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'failed', 'count': 3} in resp_json['data']['statistics']
assert {'status': 'technical-failure', 'count': 1} in resp_json['data']['statistics']
assert {'status': 'temporary-failure', 'count': 2} in resp_json['data']['statistics']
assert resp_json['data']['created_by']['name'] == 'Test User'
def test_get_jobs(notify_api, notify_db, notify_db_session, sample_template): def test_get_jobs(client, notify_db, notify_db_session, sample_template):
_setup_jobs(notify_db, notify_db_session, sample_template) _setup_jobs(notify_db, notify_db_session, sample_template)
service_id = sample_template.service.id service_id = sample_template.service.id
with notify_api.test_request_context(): path = '/service/{}/job'.format(service_id)
with notify_api.test_client() as client: auth_header = create_authorization_header()
path = '/service/{}/job'.format(service_id) response = client.get(path, headers=[auth_header])
auth_header = create_authorization_header() assert response.status_code == 200
response = client.get(path, headers=[auth_header]) resp_json = json.loads(response.get_data(as_text=True))
assert response.status_code == 200 assert len(resp_json['data']) == 5
resp_json = json.loads(response.get_data(as_text=True))
assert len(resp_json['data']) == 5
def test_get_jobs_with_limit_days(notify_api, notify_db, notify_db_session, sample_template): def test_get_jobs_with_limit_days(client, notify_db, notify_db_session, sample_template):
create_job( create_job(
notify_db, notify_db,
notify_db_session, notify_db_session,
@@ -581,28 +552,26 @@ def test_get_jobs_with_limit_days(notify_api, notify_db, notify_db_session, samp
service_id = sample_template.service.id service_id = sample_template.service.id
with notify_api.test_request_context(): path = '/service/{}/job'.format(service_id)
with notify_api.test_client() as client: auth_header = create_authorization_header()
path = '/service/{}/job'.format(service_id) response = client.get(path, headers=[auth_header], query_string={'limit_days': 5})
auth_header = create_authorization_header() assert response.status_code == 200
response = client.get(path, headers=[auth_header], query_string={'limit_days': 5}) resp_json = json.loads(response.get_data(as_text=True))
assert response.status_code == 200 assert len(resp_json['data']) == 1
resp_json = json.loads(response.get_data(as_text=True))
assert len(resp_json['data']) == 1
def test_get_jobs_should_return_statistics(notify_db, notify_db_session, notify_api, sample_service): def test_get_jobs_should_return_statistics(client, notify_db, notify_db_session, notify_api, sample_service):
now = datetime.utcnow() now = datetime.utcnow()
earlier = datetime.utcnow() - timedelta(days=1) earlier = datetime.utcnow() - timedelta(days=1)
job_1 = create_job(notify_db, notify_db_session, service=sample_service, created_at=earlier) job_1 = create_job(notify_db, notify_db_session, service=sample_service, created_at=earlier)
job_2 = create_job(notify_db, notify_db_session, service=sample_service, created_at=now) job_2 = create_job(notify_db, notify_db_session, service=sample_service, created_at=now)
partial_notification = partial(create_notification, notify_db, notify_db_session, service=sample_service)
create_notification(notify_db, notify_db_session, service=sample_service, job=job_1, status='created') partial_notification(job=job_1, status='created')
create_notification(notify_db, notify_db_session, service=sample_service, job=job_1, status='created') partial_notification(job=job_1, status='created')
create_notification(notify_db, notify_db_session, service=sample_service, job=job_1, status='created') partial_notification(job=job_1, status='created')
create_notification(notify_db, notify_db_session, service=sample_service, job=job_2, status='sending') partial_notification(job=job_2, status='sending')
create_notification(notify_db, notify_db_session, service=sample_service, job=job_2, status='sending') partial_notification(job=job_2, status='sending')
create_notification(notify_db, notify_db_session, service=sample_service, job=job_2, status='sending') partial_notification(job=job_2, status='sending')
with notify_api.test_request_context(): with notify_api.test_request_context():
with notify_api.test_client() as client: with notify_api.test_client() as client:
@@ -619,35 +588,35 @@ def test_get_jobs_should_return_statistics(notify_db, notify_db_session, notify_
def test_get_jobs_should_return_no_stats_if_no_rows_in_notifications( def test_get_jobs_should_return_no_stats_if_no_rows_in_notifications(
notify_db, client,
notify_db_session, notify_db,
notify_api, notify_db_session,
sample_service): notify_api,
sample_service,
):
now = datetime.utcnow() now = datetime.utcnow()
earlier = datetime.utcnow() - timedelta(days=1) earlier = datetime.utcnow() - timedelta(days=1)
job_1 = create_job(notify_db, notify_db_session, service=sample_service, created_at=earlier) job_1 = create_job(notify_db, notify_db_session, service=sample_service, created_at=earlier)
job_2 = create_job(notify_db, notify_db_session, service=sample_service, created_at=now) job_2 = create_job(notify_db, notify_db_session, service=sample_service, created_at=now)
with notify_api.test_request_context(): path = '/service/{}/job'.format(sample_service.id)
with notify_api.test_client() as client: auth_header = create_authorization_header()
path = '/service/{}/job'.format(sample_service.id) response = client.get(path, headers=[auth_header])
auth_header = create_authorization_header() assert response.status_code == 200
response = client.get(path, headers=[auth_header]) resp_json = json.loads(response.get_data(as_text=True))
assert response.status_code == 200 assert len(resp_json['data']) == 2
resp_json = json.loads(response.get_data(as_text=True)) assert resp_json['data'][0]['id'] == str(job_2.id)
assert len(resp_json['data']) == 2 assert resp_json['data'][0]['statistics'] == []
assert resp_json['data'][0]['id'] == str(job_2.id) assert resp_json['data'][1]['id'] == str(job_1.id)
assert resp_json['data'][0]['statistics'] == [] assert resp_json['data'][1]['statistics'] == []
assert resp_json['data'][1]['id'] == str(job_1.id)
assert resp_json['data'][1]['statistics'] == []
def test_get_jobs_should_paginate( def test_get_jobs_should_paginate(
notify_db, notify_db,
notify_db_session, notify_db_session,
client, client,
sample_template sample_template
): ):
create_10_jobs(notify_db, notify_db_session, sample_template.service, sample_template) create_10_jobs(notify_db, notify_db_session, sample_template.service, sample_template)
@@ -669,10 +638,10 @@ def test_get_jobs_should_paginate(
def test_get_jobs_accepts_page_parameter( def test_get_jobs_accepts_page_parameter(
notify_db, notify_db,
notify_db_session, notify_db_session,
client, client,
sample_template sample_template
): ):
create_10_jobs(notify_db, notify_db_session, sample_template.service, sample_template) create_10_jobs(notify_db, notify_db_session, sample_template.service, sample_template)
@@ -702,12 +671,12 @@ def test_get_jobs_accepts_page_parameter(
('foo', []) ('foo', [])
]) ])
def test_get_jobs_can_filter_on_statuses( def test_get_jobs_can_filter_on_statuses(
notify_db, notify_db,
notify_db_session, notify_db_session,
client, client,
sample_service, sample_service,
statuses_filter, statuses_filter,
expected_statuses expected_statuses
): ):
create_job(notify_db, notify_db_session, job_status='pending') create_job(notify_db, notify_db_session, job_status='pending')
create_job(notify_db, notify_db_session, job_status='in progress') create_job(notify_db, notify_db_session, job_status='in progress')