mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-02-08 20:33:47 -05:00
473 lines
14 KiB
Python
473 lines
14 KiB
Python
from collections import OrderedDict
|
||
from csv import DictReader
|
||
from io import StringIO
|
||
from pathlib import Path
|
||
|
||
import pytest
|
||
from freezegun import freeze_time
|
||
from tests.conftest import fake_uuid
|
||
|
||
from app.utils import (
|
||
AgreementInfo,
|
||
Spreadsheet,
|
||
email_safe,
|
||
generate_next_dict,
|
||
generate_notifications_csv,
|
||
generate_previous_dict,
|
||
get_cdn_domain,
|
||
get_letter_timings,
|
||
)
|
||
|
||
|
||
def _get_notifications_csv(
    service_id,
    page=1,
    row_number=1,
    recipient='foo@bar.com',
    template_name='foo',
    template_type='sms',
    job_name='bar.csv',
    status='Delivered',
    created_at='Thursday 19 April at 12:00',
    rows=1,
    with_links=False,
    job_id=fake_uuid
):
    """Build a stubbed notifications-API response for one page of results.

    Mirrors the shape returned by
    ``notification_api_client.get_notifications_for_service``: a dict with
    ``notifications`` (one entry per row), ``total``, ``page_size`` and
    pagination ``links`` (populated only when ``with_links`` is True).
    """
    # Pagination links point at pages 0/1/2 of the same service's
    # notifications endpoint; an empty dict means "no more pages".
    if with_links:
        links = {
            rel: '/service/{}/notifications?page={}'.format(service_id, page_number)
            for rel, page_number in (('prev', 0), ('next', 1), ('last', 2))
        }
    else:
        links = {}

    # One notification per requested row; row numbers continue from
    # ``row_number`` so multi-page responses can be stitched together.
    notifications = []
    for offset in range(rows):
        notifications.append({
            "row_number": row_number + offset,
            "to": recipient,
            "recipient": recipient,
            "template_name": template_name,
            "template_type": template_type,
            "template": {"name": template_name, "template_type": template_type},
            "job_name": job_name,
            "status": status,
            "created_at": created_at,
            "updated_at": None,
        })

    return {
        'notifications': notifications,
        'total': rows,
        'page_size': 50,
        'links': links,
    }
|
||
|
||
|
||
@pytest.fixture(scope='function')
def _get_notifications_csv_mock(
    mocker,
    api_user_active,
    # NOTE(review): ``job_id`` is never used in this fixture body, and pytest
    # normally resolves fixture parameters by name — confirm this default
    # parameter is intentional and not leftover from copying
    # ``_get_notifications_csv``'s signature.
    job_id=fake_uuid
):
    """Patch the notifications API client so each call returns the stubbed
    response from ``_get_notifications_csv``, and yield the mock so tests
    can assert on call counts/arguments."""
    return mocker.patch(
        'app.notification_api_client.get_notifications_for_service',
        side_effect=_get_notifications_csv
    )
|
||
|
||
|
||
@pytest.mark.parametrize('raw_name, expected_safe_name', [
    ('name with spaces', 'name.with.spaces'),
    ('singleword', 'singleword'),
    ('UPPER CASE', 'upper.case'),
    ('Service - with dash', 'service.with.dash'),
    ('lots of spaces', 'lots.of.spaces'),
    ('name.with.dots', 'name.with.dots'),
    ('name-with-other-delimiters', 'namewithotherdelimiters'),
    ('.leading', 'leading'),
    ('trailing.', 'trailing'),
    ('üńïçödë wördś', 'unicode.words'),
])
def test_email_safe_return_dot_separated_email_domain(raw_name, expected_safe_name):
    """email_safe lowercases, strips diacritics/punctuation and joins words with dots."""
    assert email_safe(raw_name) == expected_safe_name
|
||
|
||
|
||
def test_generate_previous_dict(client):
    """From page 2, the 'previous' pagination dict points back at page 1."""
    previous_page = generate_previous_dict('main.view_jobs', 'foo', 2, {})

    assert 'page=1' in previous_page['url']
    assert previous_page['title'] == 'Previous page'
    assert previous_page['label'] == 'page 1'
|
||
|
||
|
||
def test_generate_next_dict(client):
    """From page 2, the 'next' pagination dict points forward at page 3."""
    next_page = generate_next_dict('main.view_jobs', 'foo', 2, {})

    assert 'page=3' in next_page['url']
    assert next_page['title'] == 'Next page'
    assert next_page['label'] == 'page 3'
|
||
|
||
|
||
def test_generate_previous_next_dict_adds_other_url_args(client):
    """Extra url kwargs (here message_type) are folded into the generated URL."""
    next_page = generate_next_dict('main.view_notifications', 'foo', 2, {'message_type': 'blah'})
    assert 'notifications/blah' in next_page['url']
|
||
|
||
|
||
def test_can_create_spreadsheet_from_large_excel_file():
    """Spreadsheet.from_file can ingest a real Excel 2007 workbook fixture."""
    workbook_path = Path.cwd() / 'tests' / 'spreadsheet_files' / 'excel 2007.xlsx'
    with workbook_path.open('rb') as workbook:
        spreadsheet = Spreadsheet.from_file(workbook, filename='xl.xlsx')
        assert spreadsheet.as_csv_data
|
||
|
||
|
||
def test_can_create_spreadsheet_from_dict():
    """A dict becomes a two-line CSV: header row then value row, CRLF-terminated."""
    spreadsheet = Spreadsheet.from_dict(OrderedDict(
        foo='bar',
        name='Jane',
    ))
    assert spreadsheet.as_csv_data == "foo,name\r\nbar,Jane\r\n"
|
||
|
||
|
||
def test_can_create_spreadsheet_from_dict_with_filename():
    """The filename passed to from_dict is surfaced in the as_dict view."""
    spreadsheet = Spreadsheet.from_dict({}, filename='empty.csv')
    assert spreadsheet.as_dict['file_name'] == "empty.csv"
|
||
|
||
|
||
# Each case: the CSV the user originally uploaded (as returned from S3),
# the column headers the generated report should carry (original columns
# plus Row number / Template / Type / Job / Status / Time), and the
# expected first data row.
@pytest.mark.parametrize('original_file_contents, expected_column_headers, expected_1st_row', [
    # Single-column upload
    (
        """
            phone_number
            07700900123
        """,
        ['Row number', 'phone_number', 'Template', 'Type', 'Job', 'Status', 'Time'],
        ['1', '07700900123', 'foo', 'sms', 'bar.csv', 'Delivered', 'Thursday 19 April at 12:00'],
    ),
    # Extra personalisation columns (non-ASCII values) are carried through
    (
        """
            phone_number, a, b, c
            07700900123, 🐜,🐝,🦀
        """,
        ['Row number', 'phone_number', 'a', 'b', 'c', 'Template', 'Type', 'Job', 'Status', 'Time'],
        ['1', '07700900123', '🐜', '🐝', '🦀', 'foo', 'sms', 'bar.csv', 'Delivered', 'Thursday 19 April at 12:00'],
    ),
    # Quoted cells containing commas must not be split into extra columns
    (
        """
            "phone_number", "a", "b", "c"
            "07700900123","🐜,🐜","🐝,🐝","🦀"
        """,
        ['Row number', 'phone_number', 'a', 'b', 'c', 'Template', 'Type', 'Job', 'Status', 'Time'],
        ['1', '07700900123', '🐜,🐜', '🐝,🐝', '🦀', 'foo', 'sms', 'bar.csv', 'Delivered', 'Thursday 19 April at 12:00'],
    ),
])
def test_generate_notifications_csv_returns_correct_csv_file(
    app_,
    mocker,
    _get_notifications_csv_mock,
    original_file_contents,
    expected_column_headers,
    expected_1st_row,
):
    """The downloadable report merges the uploaded file's columns with
    per-notification metadata from the (mocked) notifications API."""
    # The original upload is fetched from S3; stub it with the case data.
    mocker.patch(
        'app.main.s3_client.s3download',
        return_value=original_file_contents,
    )
    # generate_notifications_csv yields CSV lines; parse them back to verify.
    csv_content = generate_notifications_csv(service_id='1234', job_id=fake_uuid, template_type='sms')
    csv_file = DictReader(StringIO('\n'.join(csv_content)))
    assert csv_file.fieldnames == expected_column_headers
    assert next(csv_file) == dict(zip(expected_column_headers, expected_1st_row))
|
||
|
||
|
||
def test_generate_notifications_csv_only_calls_once_if_no_next_link(
    app_,
    _get_notifications_csv_mock,
):
    """When the API response has no 'next' link, pagination stops after one call."""
    # Drain the generator so every API call it would make actually happens.
    list(generate_notifications_csv(service_id='1234'))
    assert _get_notifications_csv_mock.call_count == 1
|
||
|
||
|
||
@pytest.mark.parametrize("job_id", ["some", None])
|
||
def test_generate_notifications_csv_calls_twice_if_next_link(
|
||
app_,
|
||
mocker,
|
||
job_id,
|
||
):
|
||
|
||
mocker.patch(
|
||
'app.main.s3_client.s3download',
|
||
return_value="""
|
||
phone_number
|
||
07700900000
|
||
07700900001
|
||
07700900002
|
||
07700900003
|
||
07700900004
|
||
07700900005
|
||
07700900006
|
||
07700900007
|
||
07700900008
|
||
07700900009
|
||
"""
|
||
)
|
||
|
||
service_id = '1234'
|
||
response_with_links = _get_notifications_csv(service_id, rows=7, with_links=True)
|
||
response_with_no_links = _get_notifications_csv(service_id, rows=3, row_number=8, with_links=False)
|
||
|
||
mock_get_notifications = mocker.patch(
|
||
'app.notification_api_client.get_notifications_for_service',
|
||
side_effect=[
|
||
response_with_links,
|
||
response_with_no_links,
|
||
]
|
||
)
|
||
|
||
csv_content = generate_notifications_csv(
|
||
service_id=service_id,
|
||
job_id=job_id or fake_uuid,
|
||
template_type='sms',
|
||
)
|
||
csv = list(DictReader(StringIO('\n'.join(csv_content))))
|
||
|
||
assert len(csv) == 10
|
||
assert csv[0]['phone_number'] == '07700900000'
|
||
assert csv[9]['phone_number'] == '07700900009'
|
||
assert mock_get_notifications.call_count == 2
|
||
# mock_calls[0][2] is the kwargs from first call
|
||
assert mock_get_notifications.mock_calls[0][2]['page'] == 1
|
||
assert mock_get_notifications.mock_calls[1][2]['page'] == 2
|
||
|
||
|
||
@freeze_time('2017-07-14 14:59:59')  # Friday, before print deadline
# Each case: UTC upload timestamp, expected "printed by" time (local day +
# 15:00), whether it should already count as printed at the frozen 'now',
# and the earliest/latest expected delivery days.
@pytest.mark.parametrize('upload_time, expected_print_time, is_printed, expected_earliest, expected_latest', [

    # BST
    # ==================================================================
    # First thing Monday
    (
        '2017-07-10 00:00:01',
        'Tuesday 15:00',
        True,
        'Thursday 2017-07-13',
        'Friday 2017-07-14'
    ),
    # Monday at 16:59 BST
    (
        '2017-07-10 15:59:59',
        'Tuesday 15:00',
        True,
        'Thursday 2017-07-13',
        'Friday 2017-07-14'
    ),
    # Monday at 17:00 BST — past the cut-off, so printing slips a day
    (
        '2017-07-10 16:00:01',
        'Wednesday 15:00',
        True,
        'Friday 2017-07-14',
        'Saturday 2017-07-15'
    ),
    # Tuesday before 17:00 BST
    (
        '2017-07-11 12:00:00',
        'Wednesday 15:00',
        True,
        'Friday 2017-07-14',
        'Saturday 2017-07-15'
    ),
    # Wednesday before 17:00 BST
    (
        '2017-07-12 12:00:00',
        'Thursday 15:00',
        True,
        'Saturday 2017-07-15',
        'Monday 2017-07-17'
    ),
    # Thursday before 17:00 BST
    (
        '2017-07-13 12:00:00',
        'Friday 15:00',
        # NOTE(review): flagged WRONG by the author — the frozen 'now'
        # (Friday 14:59:59) is before the Friday 15:00 print time, so
        # is_printed arguably ought to be False here. Confirm against
        # get_letter_timings before changing.
        True,  # WRONG
        'Monday 2017-07-17',
        'Tuesday 2017-07-18'
    ),
    # Friday anytime
    (
        '2017-07-14 00:00:00',
        'Monday 15:00',
        False,
        'Wednesday 2017-07-19',
        'Thursday 2017-07-20'
    ),
    (
        '2017-07-14 12:00:00',
        'Monday 15:00',
        False,
        'Wednesday 2017-07-19',
        'Thursday 2017-07-20'
    ),
    (
        '2017-07-14 22:00:00',
        'Monday 15:00',
        False,
        'Wednesday 2017-07-19',
        'Thursday 2017-07-20'
    ),
    # Saturday anytime
    # NOTE(review): 2017-07-14 is a FRIDAY and duplicates the case above —
    # presumably this was meant to be '2017-07-15 12:00:00' (Saturday).
    # The expectations would be unchanged, but confirm before editing.
    (
        '2017-07-14 12:00:00',
        'Monday 15:00',
        False,
        'Wednesday 2017-07-19',
        'Thursday 2017-07-20'
    ),
    # Sunday before 1700 BST
    # NOTE(review): 2017-07-15 is a SATURDAY — the comment says Sunday, and
    # the next case uses 2017-07-16 (Sunday). Presumably this should be
    # '2017-07-16 15:59:59'; expectations look unchanged either way.
    (
        '2017-07-15 15:59:59',
        'Monday 15:00',
        False,
        'Wednesday 2017-07-19',
        'Thursday 2017-07-20'
    ),
    # Sunday after 17:00 BST
    (
        '2017-07-16 16:00:01',
        'Tuesday 15:00',
        False,
        'Thursday 2017-07-20',
        'Friday 2017-07-21'
    ),

    # GMT
    # ==================================================================
    # Monday at 16:59 GMT
    (
        '2017-01-02 16:59:59',
        'Tuesday 15:00',
        True,
        'Thursday 2017-01-05',
        'Friday 2017-01-06',
    ),
    # Monday at 17:00 GMT
    (
        '2017-01-02 17:00:01',
        'Wednesday 15:00',
        True,
        'Friday 2017-01-06',
        'Saturday 2017-01-07',
    ),

])
def test_get_estimated_delivery_date_for_letter(
    upload_time,
    expected_print_time,
    is_printed,
    expected_earliest,
    expected_latest,
):
    """Letters uploaded before the daily cut-off are printed next day at
    15:00 (local), later uploads slip a day, and delivery estimates follow
    from the print day — across both BST and GMT."""
    timings = get_letter_timings(upload_time)
    assert timings.printed_by.strftime('%A %H:%M') == expected_print_time
    assert timings.is_printed == is_printed
    assert timings.earliest_delivery.strftime('%A %Y-%m-%d') == expected_earliest
    assert timings.latest_delivery.strftime('%A %Y-%m-%d') == expected_latest
|
||
|
||
|
||
def test_get_cdn_domain_on_localhost(client, mocker):
    """A localhost admin URL maps to the notify.tools static-logos CDN."""
    mocker.patch.dict('app.current_app.config', values={'ADMIN_BASE_URL': 'http://localhost:6012'})
    assert get_cdn_domain() == 'static-logos.notify.tools'
|
||
|
||
|
||
def test_get_cdn_domain_on_non_localhost(client, mocker):
    """A real admin URL maps to static-logos on the same parent domain."""
    mocker.patch.dict('app.current_app.config', values={'ADMIN_BASE_URL': 'https://some.admintest.com'})
    assert get_cdn_domain() == 'static-logos.admintest.com'
|
||
|
||
|
||
@pytest.mark.parametrize("domain_or_email_address", (
|
||
"test@dclgdatamart.co.uk", "test@communities.gsi.gov.uk", "test@communities.gov.uk",
|
||
))
|
||
def test_get_valid_agreement_info_known_details(domain_or_email_address):
|
||
agreement_info = AgreementInfo(domain_or_email_address)
|
||
assert agreement_info.crown_status is None
|
||
assert agreement_info.owner == "Ministry of Housing, Communities & Local Government"
|
||
assert agreement_info.agreement_signed is True
|
||
assert agreement_info.as_human_readable == (
|
||
'Yes, on behalf of Ministry of Housing, Communities & Local Government'
|
||
)
|
||
|
||
|
||
@pytest.mark.parametrize("domain_or_email_address", (
|
||
"test@police.gov.uk", "police.gov.uk",
|
||
))
|
||
def test_get_valid_agreement_info_unknown_details(domain_or_email_address):
|
||
government_domain = AgreementInfo(domain_or_email_address)
|
||
assert government_domain.crown_status is None
|
||
assert government_domain.owner is None
|
||
assert government_domain.agreement_signed is None
|
||
assert government_domain.as_human_readable == 'Can’t tell'
|
||
|
||
|
||
def test_get_valid_agreement_info_only_org_known():
    """When only the organisation is known, crown/agreement stay undetermined."""
    info = AgreementInfo('nhs.net')
    # Some parts of the NHS are Crown, some aren’t
    assert info.crown_status is None
    assert info.owner == 'NHS'
    assert info.agreement_signed is None
    assert info.as_human_readable == 'Can’t tell (organisation is NHS, crown status unknown)'
|
||
|
||
|
||
def test_get_valid_agreement_info_some_known_details():
    """A signed organisation with unknown crown status still reads as 'Yes'."""
    info = AgreementInfo("marinemanagement.org.uk")
    assert info.crown_status is None
    assert info.owner == "Marine Management Organisation"
    assert info.agreement_signed is True
    assert info.as_human_readable == (
        'Yes, on behalf of Marine Management Organisation'
    )
|
||
|
||
|
||
def test_get_valid_local_agreement_info_some_known_details():
    """A non-crown council without a signed agreement reads as 'No (…)'."""
    info = AgreementInfo("aberdeenshire.gov.uk")
    assert info.crown_status is False
    assert info.owner == "Aberdeenshire Council"
    assert info.agreement_signed is False
    assert info.as_human_readable == (
        'No (organisation is Aberdeenshire Council, a non-crown body)'
    )
|
||
|
||
|
||
def test_get_valid_government_domain_gets_most_specific_first():
    """Subdomain matches beat their parent: bare gov.uk is unknown, but a
    specific council subdomain resolves to that council's data."""

    # The catch-all parent domain gives no information.
    generic = AgreementInfo("gov.uk")
    assert generic.crown_status is None
    assert generic.owner is None
    assert generic.agreement_signed is None
    assert generic.as_human_readable == 'Can’t tell'

    # A more specific subdomain of gov.uk resolves to its own record.
    specific = AgreementInfo("dacorum.gov.uk")
    assert specific.crown_status is False
    assert specific.owner == 'Dacorum Borough Council'
    assert specific.agreement_signed is True
    assert specific.as_human_readable == (
        'Yes, on behalf of Dacorum Borough Council'
    )
|
||
|
||
|
||
def test_validate_government_domain_data():
    """Sanity-check every entry in the reference data: crown status and
    agreement_signed are tri-state booleans, owner is a string or None."""
    for domain in AgreementInfo.domains:
        info = AgreementInfo(domain)

        assert info.crown_status in (True, False, None)
        assert info.owner is None or isinstance(info.owner, str)
        assert info.agreement_signed in (True, False, None)