add statuses filter to GET /service/{}/job

can now pass in a query string `?statuses=x,y,z` to filter jobs based on
`Job.job_status`. Not passing in a status or passing in an empty string is
equivalent to selecting every filter type at once.
This commit is contained in:
Leo Hemsted
2016-09-23 16:34:13 +01:00
parent 281323b435
commit fb6cb5f236
4 changed files with 56 additions and 7 deletions

View File

@@ -14,7 +14,7 @@ from tests.app.conftest import (
sample_notification as create_notification
)
from app.dao.templates_dao import dao_update_template
from app.models import NOTIFICATION_STATUS_TYPES
from app.models import NOTIFICATION_STATUS_TYPES, JOB_STATUS_TYPES, JOB_STATUS_PENDING
def test_get_job_with_invalid_service_id_returns404(notify_api, sample_api_key, sample_service):
@@ -655,6 +655,42 @@ def test_get_jobs_accepts_page_parameter(
assert set(resp_json['links'].keys()) == {'prev', 'next', 'last'}
@pytest.mark.parametrize('statuses_filter, expected_statuses', [
('', JOB_STATUS_TYPES),
('pending', [JOB_STATUS_PENDING]),
('pending, in progress, finished, sending limits exceeded, scheduled, cancelled', JOB_STATUS_TYPES),
# bad statuses are accepted, just return no data
('foo', [])
])
def test_get_jobs_can_filter_on_statuses(
notify_db,
notify_db_session,
client,
sample_service,
statuses_filter,
expected_statuses
):
create_job(notify_db, notify_db_session, job_status='pending')
create_job(notify_db, notify_db_session, job_status='in progress')
create_job(notify_db, notify_db_session, job_status='finished')
create_job(notify_db, notify_db_session, job_status='sending limits exceeded')
create_job(notify_db, notify_db_session, job_status='scheduled')
create_job(notify_db, notify_db_session, job_status='cancelled')
path = '/service/{}/job'.format(sample_service.id)
response = client.get(
path,
headers=[create_authorization_header()],
query_string={'statuses': statuses_filter}
)
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
from pprint import pprint
pprint(resp_json)
assert {x['job_status'] for x in resp_json['data']} == set(expected_statuses)
def create_10_jobs(db, session, service, template):
with freeze_time('2015-01-01T00:00:00') as the_time:
for _ in range(10):