More stuff done.

Signed-off-by: Cliff Hill <Clifford.hill@gsa.gov>
This commit is contained in:
Cliff Hill
2024-02-13 16:21:37 -05:00
parent 3624cd812b
commit 64adb09182
2 changed files with 81 additions and 81 deletions

View File

@@ -206,10 +206,10 @@ def test_get_inbound_sms_by_id_with_invalid_service_id_returns_404(
@pytest.mark.parametrize(
"page_given, expected_rows, has_next_link", [(True, 10, False), (False, 50, True)]
"page_given, expected_rows, has_next_link", [(True, 10, False), (False, 50, True)],
)
def test_get_most_recent_inbound_sms_for_service(
admin_request, page_given, sample_service, expected_rows, has_next_link
admin_request, page_given, sample_service, expected_rows, has_next_link,
):
for i in range(60):
create_inbound_sms(
@@ -229,13 +229,13 @@ def test_get_most_recent_inbound_sms_for_service(
@freeze_time("Monday 10th April 2017 12:00")
def test_get_most_recent_inbound_sms_for_service_respects_data_retention(
admin_request, sample_service
admin_request, sample_service,
):
create_service_data_retention(sample_service, "sms", 5)
create_service_data_retention(sample_service, NotificationType.SMS, 5)
for i in range(10):
created = datetime.utcnow() - timedelta(days=i)
create_inbound_sms(
sample_service, user_number="44770090000{}".format(i), created_at=created
sample_service, user_number="44770090000{}".format(i), created_at=created,
)
response = admin_request.get(
@@ -258,7 +258,7 @@ def test_get_most_recent_inbound_sms_for_service_respects_data_retention(
def test_get_most_recent_inbound_sms_for_service_respects_data_retention_if_older_than_a_week(
admin_request, sample_service
):
create_service_data_retention(sample_service, "sms", 14)
create_service_data_retention(sample_service, NotificationType.SMS, 14)
create_inbound_sms(sample_service, created_at=datetime(2017, 4, 1, 12, 0))
response = admin_request.get(
@@ -274,7 +274,7 @@ def test_get_most_recent_inbound_sms_for_service_respects_data_retention_if_olde
def test_get_inbound_sms_for_service_respects_data_retention(
admin_request, sample_service
):
create_service_data_retention(sample_service, "sms", 5)
create_service_data_retention(sample_service, NotificationType.SMS, 5)
for i in range(10):
created = datetime.utcnow() - timedelta(days=i)
create_inbound_sms(

View File

@@ -9,7 +9,7 @@ from freezegun import freeze_time
import app.celery.tasks
from app.dao.templates_dao import dao_update_template
from app.enums import JobStatus
from app.enums import JobStatus, KeyType, NotificationStatus, NotificationType, TemplateType
from tests import create_admin_authorization_header
from tests.app.db import (
create_ft_notification_status,
@@ -60,7 +60,7 @@ def test_cancel_job(client, sample_scheduled_job):
assert response.status_code == 200
resp_json = json.loads(response.get_data(as_text=True))
assert resp_json["data"]["id"] == job_id
assert resp_json["data"]["job_status"] == "cancelled"
assert resp_json["data"]["job_status"] == JobStatus.CANCELLED
def test_cant_cancel_normal_job(client, sample_job, mocker):
@@ -104,9 +104,9 @@ def test_create_unscheduled_job(client, sample_template, mocker, fake_uuid):
assert resp_json["data"]["id"] == fake_uuid
assert resp_json["data"]["statistics"] == []
assert resp_json["data"]["job_status"] == "pending"
assert resp_json["data"]["job_status"] == JobStatus.PENDING
assert not resp_json["data"]["scheduled_for"]
assert resp_json["data"]["job_status"] == "pending"
assert resp_json["data"]["job_status"] == JobStatus.PENDING
assert resp_json["data"]["template"] == str(sample_template.id)
assert resp_json["data"]["original_file_name"] == "thisisatest.csv"
assert resp_json["data"]["notification_count"] == 1
@@ -138,7 +138,7 @@ def test_create_unscheduled_job_with_sender_id_in_metadata(
assert response.status_code == 201
app.celery.tasks.process_job.apply_async.assert_called_once_with(
([str(fake_uuid)]), {"sender_id": fake_uuid}, queue="job-tasks"
([str(fake_uuid)]), {"sender_id": fake_uuid}, queue="job-tasks",
)
@@ -176,7 +176,7 @@ def test_create_scheduled_job(client, sample_template, mocker, fake_uuid):
resp_json["data"]["scheduled_for"]
== datetime(2016, 1, 5, 11, 59, 0, tzinfo=pytz.UTC).isoformat()
)
assert resp_json["data"]["job_status"] == "scheduled"
assert resp_json["data"]["job_status"] == JobStatus.SCHEDULED
assert resp_json["data"]["template"] == str(sample_template.id)
assert resp_json["data"]["original_file_name"] == "thisisatest.csv"
assert resp_json["data"]["notification_count"] == 1
@@ -479,19 +479,19 @@ def test_get_all_notifications_for_job_in_order_of_job_number(
@pytest.mark.parametrize(
"expected_notification_count, status_args",
[
(1, ["created"]),
(0, ["sending"]),
(1, ["created", "sending"]),
(0, ["sending", "delivered"]),
(1, [NotificationStatus.CREATED]),
(0, [NotificationStatus.SENDING]),
(1, [NotificationStatus.CREATED, NotificationStatus.SENDING]),
(0, [NotificationStatus.SENDING, NotificationStatus.DELIVERED]),
],
)
def test_get_all_notifications_for_job_filtered_by_status(
admin_request, sample_job, expected_notification_count, status_args, mocker
admin_request, sample_job, expected_notification_count, status_args, mocker,
):
mock_s3 = mocker.patch("app.job.rest.get_phone_number_from_s3")
mock_s3.return_value = "15555555555"
create_notification(job=sample_job, to_field="1", status="created")
create_notification(job=sample_job, to_field="1", status=NotificationStatus.CREATED)
resp = admin_request.get(
"job.get_all_notifications_for_service_job",
@@ -578,29 +578,29 @@ def test_get_job_by_id_should_return_summed_statistics(admin_request, sample_job
job_id = str(sample_job.id)
service_id = sample_job.service.id
create_notification(job=sample_job, status="created")
create_notification(job=sample_job, status="created")
create_notification(job=sample_job, status="created")
create_notification(job=sample_job, status="sending")
create_notification(job=sample_job, status="failed")
create_notification(job=sample_job, status="failed")
create_notification(job=sample_job, status="failed")
create_notification(job=sample_job, status="technical-failure")
create_notification(job=sample_job, status="temporary-failure")
create_notification(job=sample_job, status="temporary-failure")
create_notification(job=sample_job, status=NotificationStatus.CREATED)
create_notification(job=sample_job, status=NotificationStatus.CREATED)
create_notification(job=sample_job, status=NotificationStatus.CREATED)
create_notification(job=sample_job, status=NotificationStatus.SENDING)
create_notification(job=sample_job, status=NotificationStatus.FAILED)
create_notification(job=sample_job, status=NotificationStatus.FAILED)
create_notification(job=sample_job, status=NotificationStatus.FAILED)
create_notification(job=sample_job, status=NotificationStatus.TECHNICAL_FAILURE)
create_notification(job=sample_job, status=NotificationStatus.TEMPORARY_FAILURE)
create_notification(job=sample_job, status=NotificationStatus.TEMPORARY_FAILURE)
resp_json = admin_request.get(
"job.get_job_by_service_and_job_id", service_id=service_id, job_id=job_id
"job.get_job_by_service_and_job_id", service_id=service_id, job_id=job_id,
)
assert resp_json["data"]["id"] == job_id
assert {"status": "created", "count": 3} in resp_json["data"]["statistics"]
assert {"status": "sending", "count": 1} in resp_json["data"]["statistics"]
assert {"status": "failed", "count": 3} in resp_json["data"]["statistics"]
assert {"status": "technical-failure", "count": 1} in resp_json["data"][
assert {"status": NotificationStatus.CREATED, "count": 3,} in resp_json["data"]["statistics"]
assert {"status": NotificationStatus.SENDING, "count": 1,} in resp_json["data"]["statistics"]
assert {"status": NotificationStatus.FAILED, "count": 3,} in resp_json["data"]["statistics"]
assert {"status": NotificationStatus.TECHNICAL_FAILURE, "count": 1,} in resp_json["data"][
"statistics"
]
assert {"status": "temporary-failure", "count": 2} in resp_json["data"][
assert {"status": NotificationStatus.TEMPORARY_FAILURE, "count": 2,} in resp_json["data"][
"statistics"
]
assert resp_json["data"]["created_by"]["name"] == "Test User"
@@ -613,26 +613,26 @@ def test_get_job_by_id_with_stats_for_old_job_where_notifications_have_been_purg
sample_template,
notification_count=10,
created_at=datetime.utcnow() - timedelta(days=9),
job_status="finished",
job_status=JobStatus.FINISHED,
)
def __create_ft_status(job, status, count):
create_ft_notification_status(
local_date=job.created_at.date(),
notification_type="sms",
notification_type=NotificationType.SMS,
service=job.service,
job=job,
template=job.template,
key_type="normal",
key_type=KeyType.NORMAL,
notification_status=status,
count=count,
)
__create_ft_status(old_job, "created", 3)
__create_ft_status(old_job, "sending", 1)
__create_ft_status(old_job, "failed", 3)
__create_ft_status(old_job, "technical-failure", 1)
__create_ft_status(old_job, "temporary-failure", 2)
__create_ft_status(old_job, NotificationStatus.CREATED, 3)
__create_ft_status(old_job, NotificationStatus.SENDING, 1)
__create_ft_status(old_job, NotificationStatus.FAILED, 3)
__create_ft_status(old_job, NotificationStatus.TECHNICAL_FAILURE, 1)
__create_ft_status(old_job, NotificationStatus.TEMPORARY_FAILURE, 2)
resp_json = admin_request.get(
"job.get_job_by_service_and_job_id",
@@ -641,13 +641,13 @@ def test_get_job_by_id_with_stats_for_old_job_where_notifications_have_been_purg
)
assert resp_json["data"]["id"] == str(old_job.id)
assert {"status": "created", "count": 3} in resp_json["data"]["statistics"]
assert {"status": "sending", "count": 1} in resp_json["data"]["statistics"]
assert {"status": "failed", "count": 3} in resp_json["data"]["statistics"]
assert {"status": "technical-failure", "count": 1} in resp_json["data"][
assert {"status": NotificationStatus.CREATED, "count": 3,} in resp_json["data"]["statistics"]
assert {"status": NotificationStatus.SENDING, "count": 1,} in resp_json["data"]["statistics"]
assert {"status": NotificationStatus.FAILED, "count": 3,} in resp_json["data"]["statistics"]
assert {"status": NotificationStatus.TECHNICAL_FAILURE, "count": 1,} in resp_json["data"][
"statistics"
]
assert {"status": "temporary-failure", "count": 2} in resp_json["data"][
assert {"status": NotificationStatus.TEMPORARY_FAILURE, "count": 2,} in resp_json["data"][
"statistics"
]
assert resp_json["data"]["created_by"]["name"] == "Test User"
@@ -669,7 +669,7 @@ def test_get_jobs(admin_request, sample_template):
"name": "Test User",
},
"id": ANY,
"job_status": "pending",
"job_status": JobStatus.PENDING,
"notification_count": 1,
"original_file_name": "some.csv",
"processing_finished": None,
@@ -680,7 +680,7 @@ def test_get_jobs(admin_request, sample_template):
"statistics": [],
"template": str(sample_template.id),
"template_name": sample_template.name,
"template_type": "sms",
"template_type": TemplateType.SMS,
"template_version": 1,
"updated_at": None,
}
@@ -710,12 +710,12 @@ def test_get_jobs_should_return_statistics(admin_request, sample_template):
earlier = datetime.utcnow() - timedelta(days=1)
job_1 = create_job(sample_template, processing_started=earlier)
job_2 = create_job(sample_template, processing_started=now)
create_notification(job=job_1, status="created")
create_notification(job=job_1, status="created")
create_notification(job=job_1, status="created")
create_notification(job=job_2, status="sending")
create_notification(job=job_2, status="sending")
create_notification(job=job_2, status="sending")
create_notification(job=job_1, status=NotificationStatus.CREATED)
create_notification(job=job_1, status=NotificationStatus.CREATED)
create_notification(job=job_1, status=NotificationStatus.CREATED)
create_notification(job=job_2, status=NotificationStatus.SENDING)
create_notification(job=job_2, status=NotificationStatus.SENDING)
create_notification(job=job_2, status=NotificationStatus.SENDING)
resp_json = admin_request.get(
"job.get_jobs_by_service", service_id=sample_template.service_id
@@ -723,13 +723,13 @@ def test_get_jobs_should_return_statistics(admin_request, sample_template):
assert len(resp_json["data"]) == 2
assert resp_json["data"][0]["id"] == str(job_2.id)
assert {"status": "sending", "count": 3} in resp_json["data"][0]["statistics"]
assert {"status": NotificationStatus.SENDING, "count": 3,} in resp_json["data"][0]["statistics"]
assert resp_json["data"][1]["id"] == str(job_1.id)
assert {"status": "created", "count": 3} in resp_json["data"][1]["statistics"]
assert {"status": NotificationStatus.CREATED, "count": 3,} in resp_json["data"][1]["statistics"]
def test_get_jobs_should_return_no_stats_if_no_rows_in_notifications(
admin_request, sample_template
admin_request, sample_template,
):
now = datetime.utcnow()
earlier = datetime.utcnow() - timedelta(days=1)
@@ -795,15 +795,15 @@ def test_get_jobs_accepts_page_parameter(admin_request, sample_template):
def test_get_jobs_can_filter_on_statuses(
admin_request, sample_template, statuses_filter, expected_statuses
):
create_job(sample_template, job_status="pending")
create_job(sample_template, job_status="in progress")
create_job(sample_template, job_status="finished")
create_job(sample_template, job_status="sending limits exceeded")
create_job(sample_template, job_status="scheduled")
create_job(sample_template, job_status="cancelled")
create_job(sample_template, job_status="ready to send")
create_job(sample_template, job_status="sent to dvla")
create_job(sample_template, job_status="error")
create_job(sample_template, job_status=JobStatus.PENDING)
create_job(sample_template, job_status=JobStatus.IN_PROGRESS)
create_job(sample_template, job_status=JobStatus.FINISHED)
create_job(sample_template, job_status=JobStatus.SENDING_LIMITS_EXCEEDED)
create_job(sample_template, job_status=JobStatus.SCHEDULED)
create_job(sample_template, job_status=JobStatus.CANCELLED)
create_job(sample_template, job_status=JobStatus.READY_TO_SEND)
create_job(sample_template, job_status=JobStatus.SENT_TO_DVLA)
create_job(sample_template, job_status=JobStatus.ERROR)
resp_json = admin_request.get(
"job.get_jobs_by_service",
@@ -876,32 +876,32 @@ def test_get_jobs_should_retrieve_from_ft_notification_status_for_old_jobs(
# some notifications created more than three days ago, some created after the midnight cutoff
create_ft_notification_status(
date(2017, 6, 6), job=job_1, notification_status="delivered", count=2
date(2017, 6, 6), job=job_1, notification_status=NotificationStatus.DELIVERED, count=2,
)
create_ft_notification_status(
date(2017, 6, 7), job=job_1, notification_status="delivered", count=4
date(2017, 6, 7), job=job_1, notification_status=NotificationStatus.DELIVERED, count=4,
)
# job2's new enough
create_notification(
job=job_2, status="created", created_at=not_quite_three_days_ago
job=job_2, status=NotificationStatus.CREATED, created_at=not_quite_three_days_ago,
)
# this isn't picked up because the job is too new
create_ft_notification_status(
date(2017, 6, 7), job=job_2, notification_status="delivered", count=8
date(2017, 6, 7), job=job_2, notification_status=NotificationStatus.DELIVERED, count=8,
)
# this isn't picked up - while the job is old, it started in last 3 days so we look at notification table instead
create_ft_notification_status(
date(2017, 6, 7), job=job_3, notification_status="delivered", count=16
date(2017, 6, 7), job=job_3, notification_status=NotificationStatus.DELIVERED, count=16,
)
# this isn't picked up because we're using the ft status table for job_1 as it's old
create_notification(
job=job_1, status="created", created_at=not_quite_three_days_ago
job=job_1, status=NotificationStatus.CREATED, created_at=not_quite_three_days_ago,
)
resp_json = admin_request.get(
"job.get_jobs_by_service", service_id=sample_template.service_id
"job.get_jobs_by_service", service_id=sample_template.service_id,
)
assert resp_json["data"][0]["id"] == str(job_3.id)
@@ -935,26 +935,26 @@ def test_get_scheduled_job_stats(admin_request):
# Shouldnt be counted wrong status
create_job(
service_1_template, job_status="finished", scheduled_for="2017-07-17 07:00"
service_1_template, job_status=JobStatus.FINISHED, scheduled_for="2017-07-17 07:00",
)
create_job(
service_1_template, job_status="in progress", scheduled_for="2017-07-17 08:00"
service_1_template, job_status=JobStatus.IN_PROGRESS, scheduled_for="2017-07-17 08:00",
)
# Should be counted service 1
create_job(
service_1_template, job_status="scheduled", scheduled_for="2017-07-17 09:00"
service_1_template, job_status=JobStatus.SCHEDULED, scheduled_for="2017-07-17 09:00",
)
create_job(
service_1_template, job_status="scheduled", scheduled_for="2017-07-17 10:00"
service_1_template, job_status=JobStatus.SCHEDULED, scheduled_for="2017-07-17 10:00",
)
create_job(
service_1_template, job_status="scheduled", scheduled_for="2017-07-17 11:00"
service_1_template, job_status=JobStatus.SCHEDULED, scheduled_for="2017-07-17 11:00",
)
# Should be counted service 2
create_job(
service_2_template, job_status="scheduled", scheduled_for="2017-07-17 11:00"
service_2_template, job_status=JobStatus.SCHEDULED, scheduled_for="2017-07-17 11:00",
)
assert admin_request.get(