mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-31 06:52:06 -05:00
@@ -53,7 +53,6 @@ from tests.app.conftest import (
|
||||
sample_job as create_sample_job,
|
||||
sample_email_template as create_sample_email_template,
|
||||
sample_notification as create_sample_notification,
|
||||
sample_letter_job
|
||||
)
|
||||
from tests.app.db import (
|
||||
create_inbound_sms,
|
||||
@@ -683,7 +682,7 @@ def test_should_update_job_to_sent_to_dvla_in_research_mode_for_letter_job(
|
||||
mocker.patch('app.celery.tasks.save_letter.apply_async')
|
||||
mocker.patch('app.celery.tasks.create_uuid', return_value=fake_uuid)
|
||||
mocker.patch('app.celery.tasks.encryption.encrypt', return_value=test_encrypted_data)
|
||||
mock_dvla_file_task = mocker.patch('app.celery.tasks.build_dvla_file.apply_async')
|
||||
mocker.patch('app.celery.tasks.build_dvla_file.apply_async')
|
||||
|
||||
process_job(sample_letter_job.id)
|
||||
|
||||
@@ -1071,7 +1070,7 @@ def test_build_dvla_file(sample_letter_template, mocker):
|
||||
create_notification(template=job.template, job=job)
|
||||
create_notification(template=job.template, job=job)
|
||||
mocked_upload = mocker.patch("app.celery.tasks.s3upload")
|
||||
mocked_send_task = mocker.patch("app.celery.tasks.notify_celery.send_task")
|
||||
mocker.patch("app.celery.tasks.notify_celery.send_task")
|
||||
mocker.patch("app.celery.tasks.LetterDVLATemplate", return_value='dvla|string')
|
||||
build_dvla_file(job.id)
|
||||
|
||||
@@ -1211,17 +1210,15 @@ def test_send_inbound_sms_to_service_retries_if_request_returns_500(notify_api,
|
||||
status_code=500)
|
||||
send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id)
|
||||
|
||||
exc_msg = 'Unable to send_inbound_sms_to_service for service_id: {} and url: {url}'.format(
|
||||
sample_service.id,
|
||||
url=inbound_api.url
|
||||
)
|
||||
assert mocked.call_count == 1
|
||||
assert mocked.call_args[1]['queue'] == 'retry-tasks'
|
||||
|
||||
|
||||
def test_send_inbound_sms_to_service_retries_if_request_throws_unknown(notify_api, sample_service, mocker):
|
||||
inbound_api = create_service_inbound_api(service=sample_service, url="https://some.service.gov.uk/",
|
||||
bearer_token="something_unique")
|
||||
create_service_inbound_api(
|
||||
service=sample_service,
|
||||
url="https://some.service.gov.uk/",
|
||||
bearer_token="something_unique")
|
||||
inbound_sms = create_inbound_sms(service=sample_service, notify_number="0751421", user_number="447700900111",
|
||||
provider_date=datetime(2017, 6, 20), content="Here is some content")
|
||||
|
||||
@@ -1230,10 +1227,6 @@ def test_send_inbound_sms_to_service_retries_if_request_throws_unknown(notify_ap
|
||||
|
||||
send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id)
|
||||
|
||||
exc_msg = 'Unable to send_inbound_sms_to_service for service_id: {} and url: {url}'.format(
|
||||
sample_service.id,
|
||||
url=inbound_api.url
|
||||
)
|
||||
assert mocked.call_count == 1
|
||||
assert mocked.call_args[1]['queue'] == 'retry-tasks'
|
||||
|
||||
@@ -1255,15 +1248,19 @@ def test_send_inbound_sms_to_service_does_not_retries_if_request_returns_404(not
|
||||
|
||||
|
||||
def test_check_job_status_task_does_not_raise_error(sample_template):
|
||||
job = create_job(template=sample_template, notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_FINISHED)
|
||||
job_2 = create_job(template=sample_template, notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_FINISHED)
|
||||
create_job(
|
||||
template=sample_template,
|
||||
notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_FINISHED)
|
||||
create_job(
|
||||
template=sample_template,
|
||||
notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_FINISHED)
|
||||
|
||||
check_job_status()
|
||||
|
||||
@@ -1406,7 +1403,7 @@ def test_process_incomplete_job_no_job_in_database(mocker, fake_uuid):
|
||||
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
|
||||
mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async')
|
||||
|
||||
with pytest.raises(expected_exception=Exception) as e:
|
||||
with pytest.raises(expected_exception=Exception):
|
||||
process_incomplete_job(fake_uuid)
|
||||
|
||||
assert mock_save_sms.call_count == 0 # There is no job in the db it will not have been called
|
||||
|
||||
Reference in New Issue
Block a user