notify-api-412 use black to enforce python style standards

This commit is contained in:
Kenneth Kehl
2023-08-23 10:35:43 -07:00
parent a7898118d7
commit 026dc14021
586 changed files with 33990 additions and 23461 deletions

View File

@@ -30,228 +30,273 @@ from tests.app import load_example_csv
from tests.app.db import create_job, create_notification, create_template
def test_should_call_delete_codes_on_delete_verify_codes_task(notify_db_session, mocker):
mocker.patch('app.celery.scheduled_tasks.delete_codes_older_created_more_than_a_day_ago')
def test_should_call_delete_codes_on_delete_verify_codes_task(
notify_db_session, mocker
):
mocker.patch(
"app.celery.scheduled_tasks.delete_codes_older_created_more_than_a_day_ago"
)
delete_verify_codes()
assert scheduled_tasks.delete_codes_older_created_more_than_a_day_ago.call_count == 1
assert (
scheduled_tasks.delete_codes_older_created_more_than_a_day_ago.call_count == 1
)
def test_should_call_delete_invotations_on_delete_invitations_task(notify_db_session, mocker):
mocker.patch('app.celery.scheduled_tasks.delete_invitations_created_more_than_two_days_ago')
def test_should_call_delete_invotations_on_delete_invitations_task(
notify_db_session, mocker
):
mocker.patch(
"app.celery.scheduled_tasks.delete_invitations_created_more_than_two_days_ago"
)
delete_invitations()
assert scheduled_tasks.delete_invitations_created_more_than_two_days_ago.call_count == 1
assert (
scheduled_tasks.delete_invitations_created_more_than_two_days_ago.call_count
== 1
)
def test_should_update_scheduled_jobs_and_put_on_queue(mocker, sample_template):
mocked = mocker.patch('app.celery.tasks.process_job.apply_async')
mocked = mocker.patch("app.celery.tasks.process_job.apply_async")
one_minute_in_the_past = datetime.utcnow() - timedelta(minutes=1)
job = create_job(sample_template, job_status='scheduled', scheduled_for=one_minute_in_the_past)
job = create_job(
sample_template, job_status="scheduled", scheduled_for=one_minute_in_the_past
)
run_scheduled_jobs()
updated_job = dao_get_job_by_id(job.id)
assert updated_job.job_status == 'pending'
assert updated_job.job_status == "pending"
mocked.assert_called_with([str(job.id)], queue="job-tasks")
def test_should_update_all_scheduled_jobs_and_put_on_queue(sample_template, mocker):
mocked = mocker.patch('app.celery.tasks.process_job.apply_async')
mocked = mocker.patch("app.celery.tasks.process_job.apply_async")
one_minute_in_the_past = datetime.utcnow() - timedelta(minutes=1)
ten_minutes_in_the_past = datetime.utcnow() - timedelta(minutes=10)
twenty_minutes_in_the_past = datetime.utcnow() - timedelta(minutes=20)
job_1 = create_job(sample_template, job_status='scheduled', scheduled_for=one_minute_in_the_past)
job_2 = create_job(sample_template, job_status='scheduled', scheduled_for=ten_minutes_in_the_past)
job_3 = create_job(sample_template, job_status='scheduled', scheduled_for=twenty_minutes_in_the_past)
job_1 = create_job(
sample_template, job_status="scheduled", scheduled_for=one_minute_in_the_past
)
job_2 = create_job(
sample_template, job_status="scheduled", scheduled_for=ten_minutes_in_the_past
)
job_3 = create_job(
sample_template,
job_status="scheduled",
scheduled_for=twenty_minutes_in_the_past,
)
run_scheduled_jobs()
assert dao_get_job_by_id(job_1.id).job_status == 'pending'
assert dao_get_job_by_id(job_2.id).job_status == 'pending'
assert dao_get_job_by_id(job_2.id).job_status == 'pending'
assert dao_get_job_by_id(job_1.id).job_status == "pending"
assert dao_get_job_by_id(job_2.id).job_status == "pending"
assert dao_get_job_by_id(job_2.id).job_status == "pending"
mocked.assert_has_calls([
call([str(job_3.id)], queue="job-tasks"),
call([str(job_2.id)], queue="job-tasks"),
call([str(job_1.id)], queue="job-tasks")
])
mocked.assert_has_calls(
[
call([str(job_3.id)], queue="job-tasks"),
call([str(job_2.id)], queue="job-tasks"),
call([str(job_1.id)], queue="job-tasks"),
]
)
def test_check_job_status_task_calls_process_incomplete_jobs(mocker, sample_template):
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
job = create_job(template=sample_template, notification_count=3,
created_at=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
mock_celery = mocker.patch("app.celery.tasks.process_incomplete_jobs.apply_async")
job = create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS,
)
create_notification(template=sample_template, job=job)
check_job_status()
mock_celery.assert_called_once_with(
[[str(job.id)]],
queue=QueueNames.JOBS
)
mock_celery.assert_called_once_with([[str(job.id)]], queue=QueueNames.JOBS)
def test_check_job_status_task_calls_process_incomplete_jobs_when_scheduled_job_is_not_complete(
mocker, sample_template
):
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
job = create_job(template=sample_template, notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
mock_celery = mocker.patch("app.celery.tasks.process_incomplete_jobs.apply_async")
job = create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS,
)
check_job_status()
mock_celery.assert_called_once_with(
[[str(job.id)]],
queue=QueueNames.JOBS
)
mock_celery.assert_called_once_with([[str(job.id)]], queue=QueueNames.JOBS)
def test_check_job_status_task_calls_process_incomplete_jobs_for_pending_scheduled_jobs(
mocker, sample_template
):
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
job = create_job(template=sample_template, notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_PENDING)
mock_celery = mocker.patch("app.celery.tasks.process_incomplete_jobs.apply_async")
job = create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_PENDING,
)
check_job_status()
mock_celery.assert_called_once_with(
[[str(job.id)]],
queue=QueueNames.JOBS
)
mock_celery.assert_called_once_with([[str(job.id)]], queue=QueueNames.JOBS)
def test_check_job_status_task_does_not_call_process_incomplete_jobs_for_non_scheduled_pending_jobs(
mocker,
sample_template,
):
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
mock_celery = mocker.patch("app.celery.tasks.process_incomplete_jobs.apply_async")
create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
job_status=JOB_STATUS_PENDING
job_status=JOB_STATUS_PENDING,
)
check_job_status()
assert not mock_celery.called
def test_check_job_status_task_calls_process_incomplete_jobs_for_multiple_jobs(mocker, sample_template):
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
job = create_job(template=sample_template, notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
job_2 = create_job(template=sample_template, notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
check_job_status()
mock_celery.assert_called_once_with(
[[str(job.id), str(job_2.id)]],
queue=QueueNames.JOBS
)
def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template):
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
def test_check_job_status_task_calls_process_incomplete_jobs_for_multiple_jobs(
mocker, sample_template
):
mock_celery = mocker.patch("app.celery.tasks.process_incomplete_jobs.apply_async")
job = create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS
job_status=JOB_STATUS_IN_PROGRESS,
)
job_2 = create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS,
)
check_job_status()
mock_celery.assert_called_once_with(
[[str(job.id), str(job_2.id)]], queue=QueueNames.JOBS
)
def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template):
mock_celery = mocker.patch("app.celery.tasks.process_incomplete_jobs.apply_async")
job = create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS,
)
create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=29),
job_status=JOB_STATUS_IN_PROGRESS
job_status=JOB_STATUS_IN_PROGRESS,
)
create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(minutes=50),
scheduled_for=datetime.utcnow() - timedelta(minutes=29),
job_status=JOB_STATUS_PENDING
job_status=JOB_STATUS_PENDING,
)
check_job_status()
# jobs 2 and 3 were created less than 30 minutes ago, so are not sent to Celery task
mock_celery.assert_called_once_with(
[[str(job.id)]],
queue=QueueNames.JOBS
)
mock_celery.assert_called_once_with([[str(job.id)]], queue=QueueNames.JOBS)
def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template):
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
mock_celery = mocker.patch("app.celery.tasks.process_incomplete_jobs.apply_async")
job = create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS
job_status=JOB_STATUS_IN_PROGRESS,
)
job_2 = create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=29),
job_status=JOB_STATUS_IN_PROGRESS
job_status=JOB_STATUS_IN_PROGRESS,
)
check_job_status()
# job 2 not in celery task
mock_celery.assert_called_once_with(
[[str(job.id)]],
queue=QueueNames.JOBS
)
mock_celery.assert_called_once_with([[str(job.id)]], queue=QueueNames.JOBS)
assert job.job_status == JOB_STATUS_ERROR
assert job_2.job_status == JOB_STATUS_IN_PROGRESS
def test_replay_created_notifications(notify_db_session, sample_service, mocker):
email_delivery_queue = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
sms_delivery_queue = mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async')
email_delivery_queue = mocker.patch(
"app.celery.provider_tasks.deliver_email.apply_async"
)
sms_delivery_queue = mocker.patch(
"app.celery.provider_tasks.deliver_sms.apply_async"
)
sms_template = create_template(service=sample_service, template_type='sms')
email_template = create_template(service=sample_service, template_type='email')
sms_template = create_template(service=sample_service, template_type="sms")
email_template = create_template(service=sample_service, template_type="email")
older_than = (60 * 60) + (60 * 15) # 1 hour 15 minutes
# notifications expected to be resent
old_sms = create_notification(template=sms_template, created_at=datetime.utcnow() - timedelta(seconds=older_than),
status='created')
old_email = create_notification(template=email_template,
created_at=datetime.utcnow() - timedelta(seconds=older_than),
status='created')
old_sms = create_notification(
template=sms_template,
created_at=datetime.utcnow() - timedelta(seconds=older_than),
status="created",
)
old_email = create_notification(
template=email_template,
created_at=datetime.utcnow() - timedelta(seconds=older_than),
status="created",
)
# notifications that are not to be resent
create_notification(template=sms_template, created_at=datetime.utcnow() - timedelta(seconds=older_than),
status='sending')
create_notification(template=email_template, created_at=datetime.utcnow() - timedelta(seconds=older_than),
status='delivered')
create_notification(template=sms_template, created_at=datetime.utcnow(),
status='created')
create_notification(template=email_template, created_at=datetime.utcnow(),
status='created')
create_notification(
template=sms_template,
created_at=datetime.utcnow() - timedelta(seconds=older_than),
status="sending",
)
create_notification(
template=email_template,
created_at=datetime.utcnow() - timedelta(seconds=older_than),
status="delivered",
)
create_notification(
template=sms_template, created_at=datetime.utcnow(), status="created"
)
create_notification(
template=email_template, created_at=datetime.utcnow(), status="created"
)
replay_created_notifications()
email_delivery_queue.assert_called_once_with([str(old_email.id)],
queue='send-email-tasks')
sms_delivery_queue.assert_called_once_with([str(old_sms.id)],
queue="send-sms-tasks")
email_delivery_queue.assert_called_once_with(
[str(old_email.id)], queue="send-email-tasks"
)
sms_delivery_queue.assert_called_once_with(
[str(old_sms.id)], queue="send-sms-tasks"
)
def test_check_job_status_task_does_not_raise_error(sample_template):
@@ -261,32 +306,39 @@ def test_check_job_status_task_does_not_raise_error(sample_template):
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_FINISHED)
job_status=JOB_STATUS_FINISHED,
)
create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_FINISHED)
job_status=JOB_STATUS_FINISHED,
)
check_job_status()
@pytest.mark.parametrize('offset', (
timedelta(days=1),
pytest.param(timedelta(hours=23, minutes=59), marks=pytest.mark.xfail),
pytest.param(timedelta(minutes=20), marks=pytest.mark.xfail),
timedelta(minutes=19),
))
@pytest.mark.parametrize(
"offset",
(
timedelta(days=1),
pytest.param(timedelta(hours=23, minutes=59), marks=pytest.mark.xfail),
pytest.param(timedelta(minutes=20), marks=pytest.mark.xfail),
timedelta(minutes=19),
),
)
def test_check_for_missing_rows_in_completed_jobs_ignores_old_and_new_jobs(
mocker,
sample_email_template,
offset,
):
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_email'), {"sender_id": None}))
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
process_row = mocker.patch('app.celery.scheduled_tasks.process_row')
mocker.patch(
"app.celery.tasks.s3.get_job_and_metadata_from_s3",
return_value=(load_example_csv("multiple_email"), {"sender_id": None}),
)
mocker.patch("app.encryption.encrypt", return_value="something_encrypted")
process_row = mocker.patch("app.celery.scheduled_tasks.process_row")
job = create_job(
template=sample_email_template,
@@ -303,15 +355,19 @@ def test_check_for_missing_rows_in_completed_jobs_ignores_old_and_new_jobs(
def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template):
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_email'), {"sender_id": None}))
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
process_row = mocker.patch('app.celery.scheduled_tasks.process_row')
mocker.patch(
"app.celery.tasks.s3.get_job_and_metadata_from_s3",
return_value=(load_example_csv("multiple_email"), {"sender_id": None}),
)
mocker.patch("app.encryption.encrypt", return_value="something_encrypted")
process_row = mocker.patch("app.celery.scheduled_tasks.process_row")
job = create_job(template=sample_email_template,
notification_count=5,
job_status=JOB_STATUS_FINISHED,
processing_finished=datetime.utcnow() - timedelta(minutes=20))
job = create_job(
template=sample_email_template,
notification_count=5,
job_status=JOB_STATUS_FINISHED,
processing_finished=datetime.utcnow() - timedelta(minutes=20),
)
for i in range(0, 4):
create_notification(job=job, job_row_number=i)
@@ -322,17 +378,23 @@ def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template)
)
def test_check_for_missing_rows_in_completed_jobs_calls_save_email(mocker, sample_email_template):
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_email'), {'sender_id': None}))
save_email_task = mocker.patch('app.celery.tasks.save_email.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
mocker.patch('app.celery.tasks.create_uuid', return_value='uuid')
def test_check_for_missing_rows_in_completed_jobs_calls_save_email(
mocker, sample_email_template
):
mocker.patch(
"app.celery.tasks.s3.get_job_and_metadata_from_s3",
return_value=(load_example_csv("multiple_email"), {"sender_id": None}),
)
save_email_task = mocker.patch("app.celery.tasks.save_email.apply_async")
mocker.patch("app.encryption.encrypt", return_value="something_encrypted")
mocker.patch("app.celery.tasks.create_uuid", return_value="uuid")
job = create_job(template=sample_email_template,
notification_count=5,
job_status=JOB_STATUS_FINISHED,
processing_finished=datetime.utcnow() - timedelta(minutes=20))
job = create_job(
template=sample_email_template,
notification_count=5,
job_status=JOB_STATUS_FINISHED,
processing_finished=datetime.utcnow() - timedelta(minutes=20),
)
for i in range(0, 4):
create_notification(job=job, job_row_number=i)
@@ -344,19 +406,25 @@ def test_check_for_missing_rows_in_completed_jobs_calls_save_email(mocker, sampl
"something_encrypted",
),
{},
queue="database-tasks"
queue="database-tasks",
)
def test_check_for_missing_rows_in_completed_jobs_uses_sender_id(mocker, sample_email_template, fake_uuid):
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_email'), {'sender_id': fake_uuid}))
mock_process_row = mocker.patch('app.celery.scheduled_tasks.process_row')
def test_check_for_missing_rows_in_completed_jobs_uses_sender_id(
mocker, sample_email_template, fake_uuid
):
mocker.patch(
"app.celery.tasks.s3.get_job_and_metadata_from_s3",
return_value=(load_example_csv("multiple_email"), {"sender_id": fake_uuid}),
)
mock_process_row = mocker.patch("app.celery.scheduled_tasks.process_row")
job = create_job(template=sample_email_template,
notification_count=5,
job_status=JOB_STATUS_FINISHED,
processing_finished=datetime.utcnow() - timedelta(minutes=20))
job = create_job(
template=sample_email_template,
notification_count=5,
job_status=JOB_STATUS_FINISHED,
processing_finished=datetime.utcnow() - timedelta(minutes=20),
)
for i in range(0, 4):
create_notification(job=job, job_row_number=i)
@@ -367,53 +435,58 @@ def test_check_for_missing_rows_in_completed_jobs_uses_sender_id(mocker, sample_
MockServicesSendingToTVNumbers = namedtuple(
'ServicesSendingToTVNumbers',
"ServicesSendingToTVNumbers",
[
'service_id',
'notification_count',
]
"service_id",
"notification_count",
],
)
MockServicesWithHighFailureRate = namedtuple(
'ServicesWithHighFailureRate',
"ServicesWithHighFailureRate",
[
'service_id',
'permanent_failure_rate',
]
"service_id",
"permanent_failure_rate",
],
)
@pytest.mark.parametrize("failure_rates, sms_to_tv_numbers, expected_message", [
@pytest.mark.parametrize(
"failure_rates, sms_to_tv_numbers, expected_message",
[
[MockServicesWithHighFailureRate("123", 0.3)],
[],
"1 service(s) have had high permanent-failure rates for sms messages in last "
"24 hours:\nservice: {}/services/{} failure rate: 0.3,\n".format(
Test.ADMIN_BASE_URL, "123"
)
[
[MockServicesWithHighFailureRate("123", 0.3)],
[],
"1 service(s) have had high permanent-failure rates for sms messages in last "
"24 hours:\nservice: {}/services/{} failure rate: 0.3,\n".format(
Test.ADMIN_BASE_URL, "123"
),
],
[
[],
[MockServicesSendingToTVNumbers("123", 300)],
"1 service(s) have sent over 500 sms messages to tv numbers in last 24 hours:\n"
"service: {}/services/{} count of sms to tv numbers: 300,\n".format(
Test.ADMIN_BASE_URL, "123"
),
],
],
[
[],
[MockServicesSendingToTVNumbers("123", 300)],
"1 service(s) have sent over 500 sms messages to tv numbers in last 24 hours:\n"
"service: {}/services/{} count of sms to tv numbers: 300,\n".format(
Test.ADMIN_BASE_URL, "123"
)
]
])
)
def test_check_for_services_with_high_failure_rates_or_sending_to_tv_numbers(
mocker, notify_db_session, failure_rates, sms_to_tv_numbers, expected_message
):
mock_logger = mocker.patch('app.celery.tasks.current_app.logger.warning')
mock_create_ticket = mocker.spy(NotifySupportTicket, '__init__')
mock_logger = mocker.patch("app.celery.tasks.current_app.logger.warning")
mock_create_ticket = mocker.spy(NotifySupportTicket, "__init__")
mock_send_ticket_to_zendesk = mocker.patch(
'app.celery.scheduled_tasks.zendesk_client.send_ticket_to_zendesk',
"app.celery.scheduled_tasks.zendesk_client.send_ticket_to_zendesk",
autospec=True,
)
mock_failure_rates = mocker.patch(
'app.celery.scheduled_tasks.dao_find_services_with_high_failure_rates', return_value=failure_rates
"app.celery.scheduled_tasks.dao_find_services_with_high_failure_rates",
return_value=failure_rates,
)
mock_sms_to_tv_numbers = mocker.patch(
'app.celery.scheduled_tasks.dao_find_services_sending_to_tv_numbers', return_value=sms_to_tv_numbers
"app.celery.scheduled_tasks.dao_find_services_sending_to_tv_numbers",
return_value=sms_to_tv_numbers,
)
zendesk_actions = "\nYou can find instructions for this ticket in our manual:\nhttps://github.com/alphagov/notifications-manuals/wiki/Support-Runbook#Deal-with-services-with-high-failure-rates-or-sending-sms-to-tv-numbers" # noqa
@@ -427,7 +500,7 @@ def test_check_for_services_with_high_failure_rates_or_sending_to_tv_numbers(
ANY,
message=expected_message + zendesk_actions,
subject="[test] High failure rates for sms spotted for services",
ticket_type='incident',
technical_ticket=True
ticket_type="incident",
technical_ticket=True,
)
mock_send_ticket_to_zendesk.assert_called_once()