Merge pull request #347 from alphagov/get-jobs-created-in-last-7-days

Add a limit days query param for get all jobs.
This commit is contained in:
Rebecca Law
2016-05-25 11:36:31 +01:00
6 changed files with 97 additions and 11 deletions

View File

@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import (datetime, timedelta)
import uuid
from app.dao.jobs_dao import (
@@ -66,6 +66,46 @@ def test_get_jobs_for_service(notify_db, notify_db_session, sample_template):
assert one_job_from_db != other_job_from_db
def test_get_jobs_for_service_with_limit_days_param(notify_db, notify_db_session, sample_template):
from tests.app.conftest import sample_job as create_job
one_job = create_job(notify_db, notify_db_session, sample_template.service, sample_template)
old_job = create_job(notify_db, notify_db_session, sample_template.service, sample_template,
created_at=datetime.now() - timedelta(days=8))
jobs = dao_get_jobs_by_service_id(one_job.service_id)
assert len(jobs) == 2
assert one_job in jobs
assert old_job in jobs
jobs_limit_days = dao_get_jobs_by_service_id(one_job.service_id, limit_days=7)
assert len(jobs_limit_days) == 1
assert one_job in jobs_limit_days
assert old_job not in jobs_limit_days
def test_get_jobs_for_service_with_limit_days_edge_case(notify_db, notify_db_session, sample_template):
from tests.app.conftest import sample_job as create_job
one_job = create_job(notify_db, notify_db_session, sample_template.service, sample_template)
job_two = create_job(notify_db, notify_db_session, sample_template.service, sample_template,
created_at=(datetime.now() - timedelta(days=7)).date())
one_second_after_midnight = datetime.combine((datetime.now() - timedelta(days=7)).date(),
datetime.strptime("000001", "%H%M%S").time())
just_after_midnight_job = create_job(notify_db, notify_db_session, sample_template.service, sample_template,
created_at=one_second_after_midnight)
job_eight_days_old = create_job(notify_db, notify_db_session, sample_template.service, sample_template,
created_at=datetime.now() - timedelta(days=8))
jobs_limit_days = dao_get_jobs_by_service_id(one_job.service_id, limit_days=7)
assert len(jobs_limit_days) == 3
assert one_job in jobs_limit_days
assert job_two in jobs_limit_days
assert just_after_midnight_job in jobs_limit_days
assert job_eight_days_old not in jobs_limit_days
def test_get_jobs_for_service_in_created_at_order(notify_db, notify_db_session, sample_template):
from tests.app.conftest import sample_job as create_job

View File

@@ -1,5 +1,7 @@
import json
import uuid
from datetime import datetime, timedelta
import app.celery.tasks
from tests import create_authorization_header
@@ -22,6 +24,32 @@ def test_get_jobs(notify_api, notify_db, notify_db_session, sample_template):
assert len(resp_json['data']) == 5
def test_get_jobs_with_limit_days(notify_api, notify_db, notify_db_session, sample_template):
create_job(
notify_db,
notify_db_session,
service=sample_template.service,
template=sample_template,
)
create_job(
notify_db,
notify_db_session,
service=sample_template.service,
template=sample_template,
created_at=datetime.now() - timedelta(days=7))
service_id = sample_template.service.id
with notify_api.test_request_context():
with notify_api.test_client() as client:
path = '/service/{}/job'.format(service_id)
auth_header = create_authorization_header(service_id=service_id)
response = client.get(path, headers=[auth_header], query_string={'limit_days': 5})
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
assert len(resp_json['data']) == 1
def test_get_job_with_invalid_service_id_returns404(notify_api, sample_api_key, sample_service):
with notify_api.test_request_context():
with notify_api.test_client() as client: