notify-api-332 rename organisation

This commit is contained in:
Kenneth Kehl
2023-07-10 11:06:29 -07:00
parent 8be3864d7a
commit 4940d5e93b
42 changed files with 1301 additions and 1274 deletions

View File

@@ -0,0 +1,267 @@
import uuid
import pytest
from flask import current_app, json
from freezegun import freeze_time
from notifications_utils.url_safe_token import generate_token
from app.models import INVITE_PENDING, Notification
from tests import create_admin_authorization_header
from tests.app.db import create_invited_org_user
@pytest.mark.parametrize('platform_admin, expected_invited_by', (
(True, 'The GOV.UK Notify team'),
(False, 'Test User')
))
@pytest.mark.parametrize('extra_args, expected_start_of_invite_url', [
(
{},
'http://localhost:6012/organization-invitation/'
),
(
{'invite_link_host': 'https://www.example.com'},
'https://www.example.com/organization-invitation/'
),
])
def test_create_invited_org_user(
admin_request,
sample_organization,
sample_user,
mocker,
org_invite_email_template,
extra_args,
expected_start_of_invite_url,
platform_admin,
expected_invited_by,
):
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
email_address = 'invited_user@example.com'
sample_user.platform_admin = platform_admin
data = dict(
organization=str(sample_organization.id),
email_address=email_address,
invited_by=str(sample_user.id),
**extra_args
)
json_resp = admin_request.post(
'organization_invite.invite_user_to_org',
organization_id=sample_organization.id,
_data=data,
_expected_status=201
)
assert json_resp['data']['organization'] == str(sample_organization.id)
assert json_resp['data']['email_address'] == email_address
assert json_resp['data']['invited_by'] == str(sample_user.id)
assert json_resp['data']['status'] == INVITE_PENDING
assert json_resp['data']['id']
notification = Notification.query.first()
assert notification.reply_to_text == sample_user.email_address
assert len(notification.personalisation.keys()) == 3
assert notification.personalisation['organization_name'] == 'sample organization'
assert notification.personalisation['user_name'] == expected_invited_by
assert notification.personalisation['url'].startswith(expected_start_of_invite_url)
assert len(notification.personalisation['url']) > len(expected_start_of_invite_url)
mocked.assert_called_once_with([(str(notification.id))], queue="notify-internal-tasks")
def test_create_invited_user_invalid_email(admin_request, sample_organization, sample_user, mocker):
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
email_address = 'notanemail'
data = {
'service': str(sample_organization.id),
'email_address': email_address,
'invited_by': str(sample_user.id),
}
json_resp = admin_request.post(
'organization_invite.invite_user_to_org',
organization_id=sample_organization.id,
_data=data,
_expected_status=400
)
assert json_resp['errors'][0]['message'] == 'email_address Not a valid email address'
assert mocked.call_count == 0
def test_get_all_invited_users_by_service(admin_request, sample_organization, sample_user):
for i in range(5):
create_invited_org_user(
sample_organization,
sample_user,
email_address='invited_user_{}@service.gov.uk'.format(i)
)
json_resp = admin_request.get(
'organization_invite.get_invited_org_users_by_organization',
organization_id=sample_organization.id
)
assert len(json_resp['data']) == 5
for invite in json_resp['data']:
assert invite['organization'] == str(sample_organization.id)
assert invite['invited_by'] == str(sample_user.id)
assert invite['id']
def test_get_invited_users_by_service_with_no_invites(admin_request, sample_organization):
json_resp = admin_request.get(
'organization_invite.get_invited_org_users_by_organization',
organization_id=sample_organization.id
)
assert len(json_resp['data']) == 0
def test_get_invited_user_by_organization(admin_request, sample_invited_org_user):
json_resp = admin_request.get(
'organization_invite.get_invited_org_user_by_organization',
organization_id=sample_invited_org_user.organization.id,
invited_org_user_id=sample_invited_org_user.id
)
assert json_resp['data']['email_address'] == sample_invited_org_user.email_address
def test_get_invited_user_by_organization_when_user_does_not_belong_to_the_org(
admin_request,
sample_invited_org_user,
fake_uuid,
):
json_resp = admin_request.get(
'organization_invite.get_invited_org_user_by_organization',
organization_id=fake_uuid,
invited_org_user_id=sample_invited_org_user.id,
_expected_status=404
)
assert json_resp['result'] == 'error'
def test_update_org_invited_user_set_status_to_cancelled(admin_request, sample_invited_org_user):
data = {'status': 'cancelled'}
json_resp = admin_request.post(
'organization_invite.update_org_invite_status',
organization_id=sample_invited_org_user.organization_id,
invited_org_user_id=sample_invited_org_user.id,
_data=data
)
assert json_resp['data']['status'] == 'cancelled'
def test_update_org_invited_user_for_wrong_service_returns_404(admin_request, sample_invited_org_user, fake_uuid):
data = {'status': 'cancelled'}
json_resp = admin_request.post(
'organization_invite.update_org_invite_status',
organization_id=fake_uuid,
invited_org_user_id=sample_invited_org_user.id,
_data=data,
_expected_status=404
)
assert json_resp['message'] == 'No result found'
def test_update_org_invited_user_for_invalid_data_returns_400(admin_request, sample_invited_org_user):
data = {'status': 'garbage'}
json_resp = admin_request.post(
'organization_invite.update_org_invite_status',
organization_id=sample_invited_org_user.organization_id,
invited_org_user_id=sample_invited_org_user.id,
_data=data,
_expected_status=400
)
assert len(json_resp['errors']) == 1
assert json_resp['errors'][0]['message'] == 'status garbage is not one of [pending, accepted, cancelled]'
@pytest.mark.parametrize('endpoint_format_str', [
'/invite/organization/{}',
'/invite/organization/check/{}',
])
def test_validate_invitation_token_returns_200_when_token_valid(client, sample_invited_org_user, endpoint_format_str):
token = generate_token(str(sample_invited_org_user.id), current_app.config['SECRET_KEY'],
current_app.config['DANGEROUS_SALT'])
url = endpoint_format_str.format(token)
auth_header = create_admin_authorization_header()
response = client.get(url, headers=[('Content-Type', 'application/json'), auth_header])
assert response.status_code == 200
json_resp = json.loads(response.get_data(as_text=True))
assert json_resp['data'] == sample_invited_org_user.serialize()
def test_validate_invitation_token_for_expired_token_returns_400(client):
with freeze_time('2016-01-01T12:00:00'):
token = generate_token(str(uuid.uuid4()), current_app.config['SECRET_KEY'],
current_app.config['DANGEROUS_SALT'])
url = '/invite/organization/{}'.format(token)
auth_header = create_admin_authorization_header()
response = client.get(url, headers=[('Content-Type', 'application/json'), auth_header])
assert response.status_code == 400
json_resp = json.loads(response.get_data(as_text=True))
assert json_resp['result'] == 'error'
assert json_resp['message'] == {
'invitation': 'Your invitation to GOV.UK Notify has expired. '
'Please ask the person that invited you to send you another one'}
def test_validate_invitation_token_returns_400_when_invited_user_does_not_exist(client):
token = generate_token(str(uuid.uuid4()), current_app.config['SECRET_KEY'],
current_app.config['DANGEROUS_SALT'])
url = '/invite/organization/{}'.format(token)
auth_header = create_admin_authorization_header()
response = client.get(url, headers=[('Content-Type', 'application/json'), auth_header])
assert response.status_code == 404
json_resp = json.loads(response.get_data(as_text=True))
assert json_resp['result'] == 'error'
assert json_resp['message'] == 'No result found'
def test_validate_invitation_token_returns_400_when_token_is_malformed(client):
token = generate_token(
str(uuid.uuid4()),
current_app.config['SECRET_KEY'],
current_app.config['DANGEROUS_SALT']
)[:-2]
url = '/invite/organization/{}'.format(token)
auth_header = create_admin_authorization_header()
response = client.get(url, headers=[('Content-Type', 'application/json'), auth_header])
assert response.status_code == 400
json_resp = json.loads(response.get_data(as_text=True))
assert json_resp['result'] == 'error'
assert json_resp['message'] == {
'invitation': 'Somethings wrong with this link. Make sure youve copied the whole thing.'
}
def test_get_invited_org_user(admin_request, sample_invited_org_user):
json_resp = admin_request.get(
'organization_invite.get_invited_org_user',
invited_org_user_id=sample_invited_org_user.id
)
assert json_resp['data']['id'] == str(sample_invited_org_user.id)
assert json_resp['data']['email_address'] == sample_invited_org_user.email_address
assert json_resp['data']['organization'] == str(sample_invited_org_user.organization_id)
def test_get_invited_org_user_404s_if_invite_doesnt_exist(admin_request, sample_invited_org_user, fake_uuid):
json_resp = admin_request.get(
'organization_invite.get_invited_org_user',
invited_org_user_id=fake_uuid,
_expected_status=404
)
assert json_resp['result'] == 'error'

View File

@@ -0,0 +1,876 @@
import uuid
from datetime import datetime
import pytest
from flask import current_app
from freezegun import freeze_time
from sqlalchemy.exc import SQLAlchemyError
from app.dao.organization_dao import (
dao_add_service_to_organization,
dao_add_user_to_organization,
)
from app.dao.services_dao import dao_archive_service
from app.models import AnnualBilling, Organization
from tests.app.db import (
create_annual_billing,
create_domain,
create_email_branding,
create_ft_billing,
create_organization,
create_service,
create_template,
create_user,
)
def test_get_all_organizations(admin_request, notify_db_session):
create_organization(name='inactive org', active=False, organization_type='federal')
create_organization(name='active org', domains=['example.com'])
response = admin_request.get(
'organization.get_organizations',
_expected_status=200
)
assert len(response) == 2
assert set(response[0].keys()) == set(response[1].keys()) == {
'name',
'id',
'active',
'count_of_live_services',
'domains',
'organization_type',
}
assert response[0]['name'] == 'active org'
assert response[0]['active'] is True
assert response[0]['count_of_live_services'] == 0
assert response[0]['domains'] == ['example.com']
assert response[0]['organization_type'] is None
assert response[1]['name'] == 'inactive org'
assert response[1]['active'] is False
assert response[1]['count_of_live_services'] == 0
assert response[1]['domains'] == []
assert response[1]['organization_type'] == 'federal'
def test_get_organization_by_id(admin_request, notify_db_session):
org = create_organization()
response = admin_request.get(
'organization.get_organization_by_id',
_expected_status=200,
organization_id=org.id
)
assert set(response.keys()) == {
'id',
'name',
'active',
'organization_type',
'agreement_signed',
'agreement_signed_at',
'agreement_signed_by_id',
'agreement_signed_version',
'agreement_signed_on_behalf_of_name',
'agreement_signed_on_behalf_of_email_address',
'email_branding_id',
'domains',
'request_to_go_live_notes',
'count_of_live_services',
'notes',
'billing_contact_names',
'billing_contact_email_addresses',
'billing_reference',
'purchase_order_number'
}
assert response['id'] == str(org.id)
assert response['name'] == 'test_org_1'
assert response['active'] is True
assert response['organization_type'] is None
assert response['agreement_signed'] is None
assert response['agreement_signed_by_id'] is None
assert response['agreement_signed_version'] is None
assert response['email_branding_id'] is None
assert response['domains'] == []
assert response['request_to_go_live_notes'] is None
assert response['count_of_live_services'] == 0
assert response['agreement_signed_on_behalf_of_name'] is None
assert response['agreement_signed_on_behalf_of_email_address'] is None
assert response['notes'] is None
assert response['billing_contact_names'] is None
assert response['billing_contact_email_addresses'] is None
assert response['billing_reference'] is None
assert response['purchase_order_number'] is None
def test_get_organization_by_id_returns_domains(admin_request, notify_db_session):
org = create_organization(domains=[
'foo.gov.uk',
'bar.gov.uk',
])
response = admin_request.get(
'organization.get_organization_by_id',
_expected_status=200,
organization_id=org.id
)
assert set(response['domains']) == {
'foo.gov.uk',
'bar.gov.uk',
}
@pytest.mark.parametrize('domain, expected_status', (
('foo.gov.uk', 200),
('bar.gov.uk', 200),
('oof.gov.uk', 404),
('rab.gov.uk', 200),
(None, 400),
('personally.identifying.information@example.com', 400),
))
def test_get_organization_by_domain(
admin_request,
notify_db_session,
domain,
expected_status
):
org = create_organization()
other_org = create_organization('Other organization')
create_domain('foo.gov.uk', org.id)
create_domain('bar.gov.uk', org.id)
create_domain('rab.gov.uk', other_org.id)
response = admin_request.get(
'organization.get_organization_by_domain',
_expected_status=expected_status,
domain=domain,
)
if domain == 'rab.gov.uk' and expected_status == 200:
assert response['id'] == str(other_org.id)
elif expected_status == 200:
assert response['id'] == str(org.id)
else:
assert response['result'] == 'error'
def test_post_create_organization(admin_request, notify_db_session):
data = {
'name': 'test organization',
'active': True,
'organization_type': 'state',
}
response = admin_request.post(
'organization.create_organization',
_data=data,
_expected_status=201
)
organizations = Organization.query.all()
assert data['name'] == response['name']
assert data['active'] == response['active']
assert data['organization_type'] == response['organization_type']
assert len(organizations) == 1
# check that for non-nhs orgs, default branding is not set
assert organizations[0].email_branding_id is None
@pytest.mark.parametrize('org_type', ["nhs_central", "nhs_local", "nhs_gp"])
@pytest.mark.skip(reason='Update for TTS')
def test_post_create_organization_sets_default_nhs_branding_for_nhs_orgs(
admin_request, notify_db_session, nhs_email_branding, org_type
):
data = {
'name': 'test organization',
'active': True,
'organization_type': org_type,
}
admin_request.post(
'organization.create_organization',
_data=data,
_expected_status=201
)
organizations = Organization.query.all()
assert len(organizations) == 1
assert organizations[0].email_branding_id == uuid.UUID(current_app.config['NHS_EMAIL_BRANDING_ID'])
def test_post_create_organization_existing_name_raises_400(admin_request, sample_organization):
data = {
'name': sample_organization.name,
'active': True,
'organization_type': 'federal',
}
response = admin_request.post(
'organization.create_organization',
_data=data,
_expected_status=400
)
organization = Organization.query.all()
assert len(organization) == 1
assert response['message'] == 'Organization name already exists'
@pytest.mark.parametrize('data, expected_error', (
({
'active': False,
'organization_type': 'federal',
}, 'name is a required property'),
({
'active': False,
'name': 'Service name',
}, 'organization_type is a required property'),
({
'active': False,
'name': 'Service name',
'organization_type': 'foo',
}, (
'organization_type foo is not one of '
'[federal, state, other]'
)),
))
def test_post_create_organization_with_missing_data_gives_validation_error(
admin_request,
notify_db_session,
data,
expected_error,
):
response = admin_request.post(
'organization.create_organization',
_data=data,
_expected_status=400
)
assert len(response['errors']) == 1
assert response['errors'][0]['error'] == 'ValidationError'
assert response['errors'][0]['message'] == expected_error
def test_post_update_organization_updates_fields(
admin_request,
notify_db_session,
):
org = create_organization()
data = {
'name': 'new organization name',
'active': False,
'organization_type': 'federal',
}
admin_request.post(
'organization.update_organization',
_data=data,
organization_id=org.id,
_expected_status=204
)
organization = Organization.query.all()
assert len(organization) == 1
assert organization[0].id == org.id
assert organization[0].name == data['name']
assert organization[0].active == data['active']
assert organization[0].domains == []
assert organization[0].organization_type == 'federal'
@pytest.mark.parametrize('domain_list', (
['example.com'],
['example.com', 'example.org', 'example.net'],
[],
))
def test_post_update_organization_updates_domains(
admin_request,
notify_db_session,
domain_list,
):
org = create_organization(name='test_org_2')
data = {
'domains': domain_list
}
admin_request.post(
'organization.update_organization',
_data=data,
organization_id=org.id,
_expected_status=204
)
organization = Organization.query.all()
assert len(organization) == 1
assert [
domain.domain for domain in organization[0].domains
] == domain_list
def test_update_other_organization_attributes_doesnt_clear_domains(
admin_request,
notify_db_session,
):
org = create_organization(name='test_org_2')
create_domain('example.gov.uk', org.id)
admin_request.post(
'organization.update_organization',
_data={'domains': ['example.gov.uk']},
organization_id=org.id,
_expected_status=204
)
assert [
domain.domain for domain in org.domains
] == [
'example.gov.uk'
]
@pytest.mark.parametrize('new_org_type', ["nhs_central", "nhs_local", "nhs_gp"])
@pytest.mark.skip(reason='Update for TTS')
def test_post_update_organization_to_nhs_type_updates_branding_if_none_present(
admin_request,
nhs_email_branding,
notify_db_session,
new_org_type
):
org = create_organization(organization_type='central')
data = {
'organization_type': new_org_type,
}
admin_request.post(
'organization.update_organization',
_data=data,
organization_id=org.id,
_expected_status=204
)
organization = Organization.query.all()
assert len(organization) == 1
assert organization[0].id == org.id
assert organization[0].organization_type == new_org_type
assert organization[0].email_branding_id == uuid.UUID(current_app.config['NHS_EMAIL_BRANDING_ID'])
@pytest.mark.parametrize('new_org_type', ["nhs_central", "nhs_local", "nhs_gp"])
@pytest.mark.skip(reason='Update for TTS')
def test_post_update_organization_to_nhs_type_does_not_update_branding_if_default_branding_set(
admin_request,
nhs_email_branding,
notify_db_session,
new_org_type
):
current_branding = create_email_branding(
logo='example.png',
name='custom branding'
)
org = create_organization(organization_type='central', email_branding_id=current_branding.id)
data = {
'organization_type': new_org_type,
}
admin_request.post(
'organization.update_organization',
_data=data,
organization_id=org.id,
_expected_status=204
)
organization = Organization.query.all()
assert len(organization) == 1
assert organization[0].id == org.id
assert organization[0].organization_type == new_org_type
assert organization[0].email_branding_id == current_branding.id
def test_update_organization_default_branding(
admin_request,
notify_db_session,
):
org = create_organization(name='Test Organization')
email_branding = create_email_branding()
assert org.email_branding is None
admin_request.post(
'organization.update_organization',
_data={
'email_branding_id': str(email_branding.id),
},
organization_id=org.id,
_expected_status=204
)
assert org.email_branding == email_branding
def test_post_update_organization_raises_400_on_existing_org_name(
admin_request, sample_organization):
org = create_organization()
data = {
'name': sample_organization.name,
'active': False
}
response = admin_request.post(
'organization.update_organization',
_data=data,
organization_id=org.id,
_expected_status=400
)
assert response['message'] == 'Organization name already exists'
def test_post_update_organization_gives_404_status_if_org_does_not_exist(admin_request, notify_db_session):
data = {'name': 'new organization name'}
admin_request.post(
'organization.update_organization',
_data=data,
organization_id='31d42ce6-3dac-45a7-95cb-94423d5ca03c',
_expected_status=404
)
organization = Organization.query.all()
assert not organization
def test_post_update_organization_returns_400_if_domain_is_duplicate(admin_request, notify_db_session):
org = create_organization()
org2 = create_organization(name='Second org')
create_domain('same.com', org.id)
data = {'domains': ['new.com', 'same.com']}
response = admin_request.post(
'organization.update_organization',
_data=data,
organization_id=org2.id,
_expected_status=400
)
assert response['message'] == 'Domain already exists'
def test_post_update_organization_set_mou_doesnt_email_if_no_signed_by(
sample_organization,
admin_request,
mocker
):
queue_mock = mocker.patch('app.organization.rest.send_notification_to_queue')
data = {'agreement_signed': True}
admin_request.post(
'organization.update_organization',
_data=data,
organization_id=sample_organization.id,
_expected_status=204
)
assert queue_mock.called is False
@pytest.mark.parametrize('on_behalf_of_name, on_behalf_of_email_address, templates_and_recipients', [
(
None,
None,
{
'MOU_SIGNER_RECEIPT_TEMPLATE_ID': 'notify@digital.cabinet-office.gov.uk',
}
),
(
'Important Person',
'important@person.com',
{
'MOU_SIGNED_ON_BEHALF_ON_BEHALF_RECEIPT_TEMPLATE_ID': 'important@person.com',
'MOU_SIGNED_ON_BEHALF_SIGNER_RECEIPT_TEMPLATE_ID': 'notify@digital.cabinet-office.gov.uk',
}
),
])
def test_post_update_organization_set_mou_emails_signed_by(
sample_organization,
admin_request,
mou_signed_templates,
mocker,
sample_user,
on_behalf_of_name,
on_behalf_of_email_address,
templates_and_recipients
):
queue_mock = mocker.patch('app.organization.rest.send_notification_to_queue')
sample_organization.agreement_signed_on_behalf_of_name = on_behalf_of_name
sample_organization.agreement_signed_on_behalf_of_email_address = on_behalf_of_email_address
admin_request.post(
'organization.update_organization',
_data={'agreement_signed': True, 'agreement_signed_by_id': str(sample_user.id)},
organization_id=sample_organization.id,
_expected_status=204
)
notifications = [x[0][0] for x in queue_mock.call_args_list]
assert {n.template.name: n.to for n in notifications} == templates_and_recipients
for n in notifications:
# we pass in the same personalisation for all templates (though some templates don't use all fields)
assert n.personalisation == {
'mou_link': 'http://localhost:6012/agreement/agreement.pdf',
'org_name': 'sample organization',
'org_dashboard_link': 'http://localhost:6012/organizations/{}'.format(sample_organization.id),
'signed_by_name': 'Test User',
'on_behalf_of_name': on_behalf_of_name
}
def test_post_link_service_to_organization(admin_request, sample_service):
data = {
'service_id': str(sample_service.id)
}
organization = create_organization(organization_type='federal')
admin_request.post(
'organization.link_service_to_organization',
_data=data,
organization_id=organization.id,
_expected_status=204
)
assert len(organization.services) == 1
assert sample_service.organization_type == 'federal'
@freeze_time('2021-09-24 13:30')
def test_post_link_service_to_organization_inserts_annual_billing(admin_request, sample_service):
data = {
'service_id': str(sample_service.id)
}
organization = create_organization(organization_type='federal')
assert len(organization.services) == 0
assert len(AnnualBilling.query.all()) == 0
admin_request.post(
'organization.link_service_to_organization',
_data=data,
organization_id=organization.id,
_expected_status=204
)
annual_billing = AnnualBilling.query.all()
assert len(annual_billing) == 1
assert annual_billing[0].free_sms_fragment_limit == 150000
def test_post_link_service_to_organization_rollback_service_if_annual_billing_update_fails(
admin_request, sample_service, mocker
):
mocker.patch('app.dao.annual_billing_dao.dao_create_or_update_annual_billing_for_year',
side_effect=SQLAlchemyError)
data = {
'service_id': str(sample_service.id)
}
assert not sample_service.organization_type
organization = create_organization(organization_type='federal')
assert len(organization.services) == 0
assert len(AnnualBilling.query.all()) == 0
with pytest.raises(expected_exception=SQLAlchemyError):
admin_request.post(
'organization.link_service_to_organization',
_data=data,
organization_id=organization.id
)
assert not sample_service.organization_type
assert len(organization.services) == 0
assert len(AnnualBilling.query.all()) == 0
@freeze_time('2021-09-24 13:30')
def test_post_link_service_to_another_org(
admin_request, sample_service, sample_organization):
data = {
'service_id': str(sample_service.id)
}
assert len(sample_organization.services) == 0
assert not sample_service.organization_type
admin_request.post(
'organization.link_service_to_organization',
_data=data,
organization_id=sample_organization.id,
_expected_status=204
)
assert len(sample_organization.services) == 1
assert not sample_service.organization_type
new_org = create_organization(organization_type='federal')
admin_request.post(
'organization.link_service_to_organization',
_data=data,
organization_id=new_org.id,
_expected_status=204
)
assert not sample_organization.services
assert len(new_org.services) == 1
assert sample_service.organization_type == 'federal'
annual_billing = AnnualBilling.query.all()
assert len(annual_billing) == 1
assert annual_billing[0].free_sms_fragment_limit == 150000
def test_post_link_service_to_organization_nonexistent_organization(
admin_request, sample_service, fake_uuid):
data = {
'service_id': str(sample_service.id)
}
admin_request.post(
'organization.link_service_to_organization',
_data=data,
organization_id=fake_uuid,
_expected_status=404
)
def test_post_link_service_to_organization_nonexistent_service(
admin_request, sample_organization, fake_uuid):
data = {
'service_id': fake_uuid
}
admin_request.post(
'organization.link_service_to_organization',
_data=data,
organization_id=str(sample_organization.id),
_expected_status=404
)
def test_post_link_service_to_organization_missing_payload(
admin_request, sample_organization, fake_uuid):
admin_request.post(
'organization.link_service_to_organization',
organization_id=str(sample_organization.id),
_expected_status=400
)
def test_rest_get_organization_services(
admin_request, sample_organization, sample_service):
dao_add_service_to_organization(sample_service, sample_organization.id)
response = admin_request.get(
'organization.get_organization_services',
organization_id=str(sample_organization.id),
_expected_status=200
)
assert response == [sample_service.serialize_for_org_dashboard()]
def test_rest_get_organization_services_is_ordered_by_name(
admin_request, sample_organization, sample_service):
service_2 = create_service(service_name='service 2')
service_1 = create_service(service_name='service 1')
dao_add_service_to_organization(service_1, sample_organization.id)
dao_add_service_to_organization(service_2, sample_organization.id)
dao_add_service_to_organization(sample_service, sample_organization.id)
response = admin_request.get(
'organization.get_organization_services',
organization_id=str(sample_organization.id),
_expected_status=200
)
assert response[0]['name'] == sample_service.name
assert response[1]['name'] == service_1.name
assert response[2]['name'] == service_2.name
def test_rest_get_organization_services_inactive_services_at_end(
admin_request, sample_organization):
inactive_service = create_service(service_name='inactive service', active=False)
service = create_service()
inactive_service_1 = create_service(service_name='inactive service 1', active=False)
dao_add_service_to_organization(inactive_service, sample_organization.id)
dao_add_service_to_organization(service, sample_organization.id)
dao_add_service_to_organization(inactive_service_1, sample_organization.id)
response = admin_request.get(
'organization.get_organization_services',
organization_id=str(sample_organization.id),
_expected_status=200
)
assert response[0]['name'] == service.name
assert response[1]['name'] == inactive_service.name
assert response[2]['name'] == inactive_service_1.name
def test_add_user_to_organization_returns_added_user(admin_request, sample_organization, sample_user):
response = admin_request.post(
'organization.add_user_to_organization',
organization_id=str(sample_organization.id),
user_id=str(sample_user.id),
_expected_status=200
)
assert response['data']['id'] == str(sample_user.id)
assert len(response['data']['organizations']) == 1
assert response['data']['organizations'][0] == str(sample_organization.id)
def test_add_user_to_organization_returns_404_if_user_does_not_exist(admin_request, sample_organization):
admin_request.post(
'organization.add_user_to_organization',
organization_id=str(sample_organization.id),
user_id=str(uuid.uuid4()),
_expected_status=404
)
def test_remove_user_from_organization(admin_request, sample_organization, sample_user):
dao_add_user_to_organization(organization_id=sample_organization.id, user_id=sample_user.id)
admin_request.delete(
'organization.remove_user_from_organization',
organization_id=sample_organization.id,
user_id=sample_user.id
)
assert sample_organization.users == []
def test_remove_user_from_organization_when_user_is_not_an_org_member(admin_request, sample_organization, sample_user):
resp = admin_request.delete(
'organization.remove_user_from_organization',
organization_id=sample_organization.id,
user_id=sample_user.id,
_expected_status=404
)
assert resp == {
'result': 'error',
'message': 'User not found'
}
def test_get_organization_users_returns_users_for_organization(admin_request, sample_organization):
first = create_user(email='first@invited.com')
second = create_user(email='another@invited.com')
dao_add_user_to_organization(organization_id=sample_organization.id, user_id=first.id)
dao_add_user_to_organization(organization_id=sample_organization.id, user_id=second.id)
response = admin_request.get(
'organization.get_organization_users',
organization_id=sample_organization.id,
_expected_status=200
)
assert len(response['data']) == 2
assert response['data'][0]['id'] == str(first.id)
@freeze_time('2019-12-24 13:30')
def test_get_organization_services_usage(admin_request, notify_db_session):
org = create_organization(name='Organization without live services')
service = create_service()
template = create_template(service=service)
dao_add_service_to_organization(service=service, organization_id=org.id)
create_annual_billing(service_id=service.id, free_sms_fragment_limit=10, financial_year_start=2019)
create_ft_billing(local_date=datetime.utcnow().date(), template=template, billable_unit=19, rate=0.060,
notifications_sent=19)
response = admin_request.get(
'organization.get_organization_services_usage',
organization_id=org.id,
**{"year": 2019}
)
assert len(response) == 1
assert len(response['services']) == 1
service_usage = response['services'][0]
assert service_usage['service_id'] == str(service.id)
assert service_usage['service_name'] == service.name
assert service_usage['chargeable_billable_sms'] == 9.0
assert service_usage['emails_sent'] == 0
assert service_usage['free_sms_limit'] == 10
assert service_usage['sms_billable_units'] == 19
assert service_usage['sms_remainder'] == 0
assert service_usage['sms_cost'] == 0.54
@freeze_time('2020-02-24 13:30')
def test_get_organization_services_usage_sort_active_first(admin_request, notify_db_session):
org = create_organization(name='Organization without live services')
service = create_service(service_name='live service')
archived_service = create_service(service_name='archived_service')
template = create_template(service=service)
dao_add_service_to_organization(service=service, organization_id=org.id)
dao_add_service_to_organization(service=archived_service, organization_id=org.id)
create_annual_billing(service_id=service.id, free_sms_fragment_limit=10, financial_year_start=2019)
create_ft_billing(local_date=datetime.utcnow().date(), template=template, billable_unit=19, rate=0.060,
notifications_sent=19)
response = admin_request.get(
'organization.get_organization_services_usage',
organization_id=org.id,
**{"year": 2019}
)
assert len(response) == 1
assert len(response['services']) == 2
first_service = response['services'][0]
assert first_service['service_id'] == str(archived_service.id)
assert first_service['service_name'] == archived_service.name
assert first_service['active'] is True
last_service = response['services'][1]
assert last_service['service_id'] == str(service.id)
assert last_service['service_name'] == service.name
assert last_service['active'] is True
dao_archive_service(service_id=archived_service.id)
response_after_archive = admin_request.get(
'organization.get_organization_services_usage',
organization_id=org.id,
**{"year": 2019}
)
first_service = response_after_archive['services'][0]
assert first_service['service_id'] == str(service.id)
assert first_service['service_name'] == service.name
assert first_service['active'] is True
last_service = response_after_archive['services'][1]
assert last_service['service_id'] == str(archived_service.id)
assert last_service['service_name'] == archived_service.name
assert last_service['active'] is False
def test_get_organization_services_usage_returns_400_if_year_is_invalid(admin_request):
response = admin_request.get(
'organization.get_organization_services_usage',
organization_id=uuid.uuid4(),
**{"year": 'not-a-valid-year'},
_expected_status=400
)
assert response['message'] == 'No valid year provided'
def test_get_organization_services_usage_returns_400_if_year_is_empty(admin_request):
response = admin_request.get(
'organization.get_organization_services_usage',
organization_id=uuid.uuid4(),
_expected_status=400
)
assert response['message'] == 'No valid year provided'