import json
import uuid

from flask import url_for

from app.dao.users_dao import save_model_user
from app.dao.services_dao import dao_remove_user_from_service
from app.models import User
from tests import create_authorization_header
from tests.app.conftest import sample_service as create_sample_service
from tests.app.conftest import sample_service_permission as create_sample_service_permission
from tests.app.conftest import sample_user as create_sample_user


def _auth_headers():
    """Return a header list holding only a freshly created authorization header."""
    return [create_authorization_header()]


def _json_headers():
    """Return a JSON content-type header plus a freshly created authorization header."""
    return [('Content-Type', 'application/json'), create_authorization_header()]


def _json_body(response):
    """Decode the response body as text and parse it as JSON."""
    return json.loads(response.get_data(as_text=True))


def _save_invited_user():
    """Create, persist and return the 'Invited User' shared by the add-user tests."""
    user = User(
        name='Invited User',
        email_address='invited@digital.cabinet-office.gov.uk',
        password='password',
        mobile_number='+4477123456'
    )
    # The user must exist in the db before it can be added to a service.
    save_model_user(user)
    return user


def _permission_payload(*permissions):
    """Build the request body for adding a user to a service with the given permissions."""
    return [{"permission": permission} for permission in permissions]


def _get_user_permissions(client, user_id, service_id):
    """GET the user via `user.get_user` and return their permissions for `service_id`."""
    resp = client.get(url_for('user.get_user', user_id=user_id), headers=_json_headers())
    assert resp.status_code == 200
    return _json_body(resp)['data']['permissions'][str(service_id)]


def test_get_service_list(notify_api, service_factory):
    """GET /service lists the three created services as one, two, three."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        for name in ('one', 'two', 'three'):
            service_factory.get(name, email_from=name)
        response = client.get('/service', headers=_auth_headers())
        assert response.status_code == 200
        json_resp = _json_body(response)
        assert len(json_resp['data']) == 3
        assert json_resp['data'][0]['name'] == 'one'
        assert json_resp['data'][1]['name'] == 'two'
        assert json_resp['data'][2]['name'] == 'three'


def test_get_service_list_by_user(notify_api, sample_user, service_factory):
    """GET /service?user_id=<id> lists the services created for that user."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        for name in ('one', 'two', 'three'):
            service_factory.get(name, sample_user, email_from=name)
        # The '{}' placeholder is required; without it the user id is silently
        # dropped and the request degenerates to an unfiltered listing.
        response = client.get(
            '/service?user_id={}'.format(sample_user.id),
            headers=_auth_headers()
        )
        json_resp = _json_body(response)
        assert response.status_code == 200
        assert len(json_resp['data']) == 3
        assert json_resp['data'][0]['name'] == 'one'
        assert json_resp['data'][1]['name'] == 'two'
        assert json_resp['data'][2]['name'] == 'three'


def test_get_service_list_by_user_should_return_empty_list_if_no_services(notify_api, service_factory, sample_user):
    """A user with no services gets an empty list even when other users have services."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        new_user = User(
            name='Test User',
            email_address='new_user@digital.cabinet-office.gov.uk',
            password='password',
            mobile_number='+447700900986'
        )
        save_model_user(new_user)
        for name in ('one', 'two', 'three'):
            service_factory.get(name, sample_user, email_from=name)
        response = client.get(
            '/service?user_id={}'.format(new_user.id),
            headers=_auth_headers()
        )
        json_resp = _json_body(response)
        assert response.status_code == 200
        assert len(json_resp['data']) == 0


def test_get_service_list_should_return_empty_list_if_no_services(notify_api, notify_db, notify_db_session):
    """GET /service returns an empty list when no service was created."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        response = client.get('/service', headers=_auth_headers())
        assert response.status_code == 200
        json_resp = _json_body(response)
        assert len(json_resp['data']) == 0


def test_get_service_by_id(notify_api, sample_service):
    """GET /service/<id> returns the service's name and id with research mode off."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        resp = client.get(
            '/service/{}'.format(sample_service.id),
            headers=_auth_headers()
        )
        assert resp.status_code == 200
        json_resp = _json_body(resp)
        assert json_resp['data']['name'] == sample_service.name
        assert json_resp['data']['id'] == str(sample_service.id)
        assert not json_resp['data']['research_mode']


def test_get_service_by_id_should_404_if_no_service(notify_api, notify_db):
    """GET /service/<random uuid> answers 404 with 'No result found'."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        service_id = str(uuid.uuid4())
        resp = client.get(
            '/service/{}'.format(service_id),
            headers=_auth_headers()
        )
        assert resp.status_code == 404
        json_resp = _json_body(resp)
        assert json_resp['result'] == 'error'
        assert json_resp['message'] == 'No result found'


def test_get_service_by_id_and_user(notify_api, service_factory, sample_user):
    """GET /service/<id>?user_id=<id> returns the service created for that user."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        service = service_factory.get('new service', sample_user, email_from='new.service')
        resp = client.get(
            '/service/{}?user_id={}'.format(service.id, sample_user.id),
            headers=_auth_headers()
        )
        assert resp.status_code == 200
        json_resp = _json_body(resp)
        assert json_resp['data']['name'] == service.name
        assert json_resp['data']['id'] == str(service.id)


def test_get_service_by_id_should_404_if_no_service_for_user(notify_api, sample_user):
    """GET /service/<random uuid>?user_id=<id> answers 404 with 'No result found'."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        service_id = str(uuid.uuid4())
        resp = client.get(
            '/service/{}?user_id={}'.format(service_id, sample_user.id),
            headers=_auth_headers()
        )
        assert resp.status_code == 404
        json_resp = _json_body(resp)
        assert json_resp['result'] == 'error'
        assert json_resp['message'] == 'No result found'


def test_create_service(notify_api, sample_user):
    """POST /service creates a service that can then be fetched for the same user."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        data = {
            'name': 'created service',
            'user_id': str(sample_user.id),
            'message_limit': 1000,
            'restricted': False,
            'active': False,
            'email_from': 'created.service',
            'created_by': str(sample_user.id)
        }
        resp = client.post('/service', data=json.dumps(data), headers=_json_headers())
        json_resp = _json_body(resp)
        assert resp.status_code == 201
        assert json_resp['data']['id']
        assert json_resp['data']['name'] == 'created service'
        assert json_resp['data']['email_from'] == 'created.service'
        assert not json_resp['data']['research_mode']

        resp = client.get(
            '/service/{}?user_id={}'.format(json_resp['data']['id'], sample_user.id),
            headers=_auth_headers()
        )
        assert resp.status_code == 200
        json_resp = _json_body(resp)
        assert json_resp['data']['name'] == 'created service'
        assert not json_resp['data']['research_mode']


def test_should_not_create_service_with_missing_user_id_field(notify_api, fake_uuid):
    """POST /service without `user_id` answers 400 and reports the missing field."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        data = {
            'email_from': 'service',
            'name': 'created service',
            'message_limit': 1000,
            'restricted': False,
            'active': False,
            'created_by': str(fake_uuid)
        }
        resp = client.post('/service', data=json.dumps(data), headers=_json_headers())
        json_resp = _json_body(resp)
        assert resp.status_code == 400
        assert json_resp['result'] == 'error'
        assert 'Missing data for required field.' in json_resp['message']['user_id']


def test_should_error_if_created_by_missing(notify_api, sample_user):
    """POST /service without `created_by` answers 400 and reports the missing field."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        data = {
            'email_from': 'service',
            'name': 'created service',
            'message_limit': 1000,
            'restricted': False,
            'active': False,
            'user_id': str(sample_user.id)
        }
        resp = client.post('/service', data=json.dumps(data), headers=_json_headers())
        json_resp = _json_body(resp)
        assert resp.status_code == 400
        assert json_resp['result'] == 'error'
        assert 'Missing data for required field.' in json_resp['message']['created_by']


def test_should_not_create_service_with_missing_if_user_id_is_not_in_database(notify_api,
                                                                              notify_db,
                                                                              notify_db_session,
                                                                              fake_uuid):
    """POST /service with a `user_id` taken from `fake_uuid` answers 404."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        data = {
            'email_from': 'service',
            'user_id': fake_uuid,
            'name': 'created service',
            'message_limit': 1000,
            'restricted': False,
            'active': False,
            'created_by': str(fake_uuid)
        }
        resp = client.post('/service', data=json.dumps(data), headers=_json_headers())
        json_resp = _json_body(resp)
        assert resp.status_code == 404
        assert json_resp['result'] == 'error'
        assert 'No result found' == json_resp['message']


def test_should_not_create_service_if_missing_data(notify_api, sample_user):
    """POST /service with only `user_id` answers 400 and reports each missing field."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        data = {
            'user_id': str(sample_user.id)
        }
        resp = client.post('/service', data=json.dumps(data), headers=_json_headers())
        json_resp = _json_body(resp)
        assert resp.status_code == 400
        assert json_resp['result'] == 'error'
        assert 'Missing data for required field.' in json_resp['message']['name']
        assert 'Missing data for required field.' in json_resp['message']['active']
        assert 'Missing data for required field.' in json_resp['message']['message_limit']
        assert 'Missing data for required field.' in json_resp['message']['restricted']


def test_should_not_create_service_with_duplicate_name(notify_api,
                                                       notify_db,
                                                       notify_db_session,
                                                       sample_user,
                                                       sample_service):
    """POST /service re-using an existing service name reports a duplicate name error."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        data = {
            'name': sample_service.name,
            'user_id': str(sample_service.users[0].id),
            'message_limit': 1000,
            'restricted': False,
            'active': False,
            'email_from': 'sample.service2',
            'created_by': str(sample_user.id)
        }
        resp = client.post('/service', data=json.dumps(data), headers=_json_headers())
        json_resp = _json_body(resp)
        assert json_resp['result'] == 'error'
        assert "Duplicate service name '{}'".format(sample_service.name) in json_resp['message']['name']


def test_create_service_should_throw_duplicate_key_constraint_for_existing_email_from(notify_api,
                                                                                      notify_db,
                                                                                      notify_db_session,
                                                                                      service_factory,
                                                                                      sample_user):
    """POST /service re-using an existing `email_from` reports a duplicate name error."""
    first_service = service_factory.get('First service', email_from='first.service')
    with notify_api.test_request_context(), notify_api.test_client() as client:
        service_name = 'First SERVICE'
        data = {
            'name': service_name,
            'user_id': str(first_service.users[0].id),
            'message_limit': 1000,
            'restricted': False,
            'active': False,
            'email_from': 'first.service',
            'created_by': str(sample_user.id)
        }
        resp = client.post('/service', data=json.dumps(data), headers=_json_headers())
        json_resp = _json_body(resp)
        assert json_resp['result'] == 'error'
        assert "Duplicate service name '{}'".format(service_name) in json_resp['message']['name']


def test_update_service(notify_api, sample_service):
    """POST /service/<id> updates the service's name and email_from."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        resp = client.get(
            '/service/{}'.format(sample_service.id),
            headers=_auth_headers()
        )
        json_resp = _json_body(resp)
        assert resp.status_code == 200
        assert json_resp['data']['name'] == sample_service.name

        data = {
            'name': 'updated service name',
            'email_from': 'updated.service.name',
            'created_by': str(sample_service.created_by.id)
        }
        resp = client.post(
            '/service/{}'.format(sample_service.id),
            data=json.dumps(data),
            headers=_json_headers()
        )
        result = _json_body(resp)
        assert resp.status_code == 200
        assert result['data']['name'] == 'updated service name'
        assert result['data']['email_from'] == 'updated.service.name'


def test_update_service_research_mode(notify_api, sample_service):
    """POST /service/<id> with `research_mode: True` switches research mode on."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        resp = client.get(
            '/service/{}'.format(sample_service.id),
            headers=_auth_headers()
        )
        json_resp = _json_body(resp)
        assert resp.status_code == 200
        assert json_resp['data']['name'] == sample_service.name
        assert not json_resp['data']['research_mode']

        data = {
            'research_mode': True
        }
        resp = client.post(
            '/service/{}'.format(sample_service.id),
            data=json.dumps(data),
            headers=_json_headers()
        )
        result = _json_body(resp)
        assert resp.status_code == 200
        assert result['data']['research_mode']


def test_update_service_research_mode_throws_validation_error(notify_api, sample_service):
    """POST /service/<id> with a non-boolean `research_mode` answers 400."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        resp = client.get(
            '/service/{}'.format(sample_service.id),
            headers=_auth_headers()
        )
        json_resp = _json_body(resp)
        assert resp.status_code == 200
        assert json_resp['data']['name'] == sample_service.name
        assert not json_resp['data']['research_mode']

        data = {
            'research_mode': "dedede"
        }
        resp = client.post(
            '/service/{}'.format(sample_service.id),
            data=json.dumps(data),
            headers=_json_headers()
        )
        result = _json_body(resp)
        assert resp.status_code == 400
        # This comparison used to be a bare expression, so it verified nothing.
        assert result['message']['research_mode'][0] == "Not a valid boolean."


def test_should_not_update_service_with_duplicate_name(notify_api,
                                                       notify_db,
                                                       notify_db_session,
                                                       sample_user,
                                                       sample_service):
    """Renaming a service to another service's name answers 400 with a duplicate error."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        service_name = "another name"
        service = create_sample_service(
            notify_db,
            notify_db_session,
            service_name=service_name,
            user=sample_user,
            email_from='another.name')
        data = {
            'name': service_name,
            'created_by': str(service.created_by.id)
        }
        resp = client.post(
            '/service/{}'.format(sample_service.id),
            data=json.dumps(data),
            headers=_json_headers()
        )
        assert resp.status_code == 400
        json_resp = _json_body(resp)
        assert json_resp['result'] == 'error'
        assert "Duplicate service name '{}'".format(service_name) in json_resp['message']['name']


def test_should_not_update_service_with_duplicate_email_from(notify_api,
                                                             notify_db,
                                                             notify_db_session,
                                                             sample_user,
                                                             sample_service):
    """Updating a service to another service's name/email_from answers 400."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        email_from = "duplicate.name"
        service_name = "duplicate name"
        service = create_sample_service(
            notify_db,
            notify_db_session,
            service_name=service_name,
            user=sample_user,
            email_from=email_from)
        data = {
            'name': service_name,
            'email_from': email_from,
            'created_by': str(service.created_by.id)
        }
        resp = client.post(
            '/service/{}'.format(sample_service.id),
            data=json.dumps(data),
            headers=_json_headers()
        )
        assert resp.status_code == 400
        json_resp = _json_body(resp)
        assert json_resp['result'] == 'error'
        # Either the name or the email_from may be quoted in the error message.
        assert (
            "Duplicate service name '{}'".format(service_name) in json_resp['message']['name'] or
            "Duplicate service name '{}'".format(email_from) in json_resp['message']['name']
        )


def test_update_service_should_404_if_id_is_invalid(notify_api, notify_db, notify_db_session):
    """POST /service/<random uuid> answers 404."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        data = {
            'name': 'updated service name'
        }
        missing_service_id = uuid.uuid4()
        resp = client.post(
            '/service/{}'.format(missing_service_id),
            data=json.dumps(data),
            headers=_json_headers()
        )
        assert resp.status_code == 404


def test_get_users_by_service(notify_api, notify_db, notify_db_session, sample_service):
    """GET /service/<id>/users returns the single user attached to the sample service."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        user_on_service = sample_service.users[0]
        resp = client.get(
            '/service/{}/users'.format(sample_service.id),
            headers=_json_headers()
        )
        assert resp.status_code == 200
        result = _json_body(resp)
        assert len(result['data']) == 1
        assert result['data'][0]['name'] == user_on_service.name
        assert result['data'][0]['email_address'] == user_on_service.email_address
        assert result['data'][0]['mobile_number'] == user_on_service.mobile_number


def test_get_users_for_service_returns_empty_list_if_no_users_associated_with_service(notify_api,
                                                                                      notify_db,
                                                                                      notify_db_session,
                                                                                      sample_service):
    """GET /service/<id>/users returns [] once the only user has been removed."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        dao_remove_user_from_service(sample_service, sample_service.users[0])
        response = client.get(
            '/service/{}/users'.format(sample_service.id),
            headers=_json_headers()
        )
        result = _json_body(response)
        assert response.status_code == 200
        assert result['data'] == []


def test_get_users_for_service_returns_404_when_service_does_not_exist(notify_api, notify_db, notify_db_session):
    """GET /service/<random uuid>/users answers 404 with 'No result found'."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        service_id = uuid.uuid4()
        response = client.get(
            '/service/{}/users'.format(service_id),
            headers=_json_headers()
        )
        assert response.status_code == 404
        result = _json_body(response)
        assert result['result'] == 'error'
        assert result['message'] == 'No result found'


def test_default_permissions_are_added_for_user_service(notify_api,
                                                        notify_db,
                                                        notify_db_session,
                                                        sample_service,
                                                        sample_user):
    """After creating a service, the user's permissions match the default service permissions."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        data = {
            'name': 'created service',
            'user_id': str(sample_user.id),
            'message_limit': 1000,
            'restricted': False,
            'active': False,
            'email_from': 'created.service',
            'created_by': str(sample_user.id)
        }
        resp = client.post('/service', data=json.dumps(data), headers=_json_headers())
        json_resp = _json_body(resp)
        assert resp.status_code == 201
        assert json_resp['data']['id']
        assert json_resp['data']['name'] == 'created service'
        assert json_resp['data']['email_from'] == 'created.service'

        resp = client.get(
            '/service/{}?user_id={}'.format(json_resp['data']['id'], sample_user.id),
            headers=_auth_headers()
        )
        assert resp.status_code == 200

        response = client.get(
            url_for('user.get_user', user_id=sample_user.id),
            headers=_auth_headers())
        assert response.status_code == 200
        json_resp = _json_body(response)
        # NOTE(review): this reads the permissions keyed by sample_service, not by
        # the service created above - confirm that this is the intended check.
        service_permissions = json_resp['data']['permissions'][str(sample_service.id)]
        from app.dao.permissions_dao import default_service_permissions
        assert sorted(default_service_permissions) == sorted(service_permissions)


def test_add_existing_user_to_another_service_with_all_permissions(notify_api,
                                                                   notify_db,
                                                                   notify_db_session,
                                                                   sample_service,
                                                                   sample_user):
    """Adding a user with every permission attaches them to the service with all of them."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        # check which users are part of the service
        user_already_in_service = sample_service.users[0]
        resp = client.get(
            '/service/{}/users'.format(sample_service.id),
            headers=_json_headers()
        )
        assert resp.status_code == 200
        result = _json_body(resp)
        assert len(result['data']) == 1
        assert result['data'][0]['email_address'] == user_already_in_service.email_address

        # add new user to service
        user_to_add = _save_invited_user()
        data = _permission_payload(
            "send_emails",
            "send_letters",
            "send_texts",
            "manage_users",
            "manage_settings",
            "manage_api_keys",
            "manage_templates",
            "view_activity",
        )
        resp = client.post(
            '/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
            headers=_json_headers(),
            data=json.dumps(data)
        )
        assert resp.status_code == 201

        # check new user added to service
        resp = client.get(
            '/service/{}'.format(sample_service.id),
            headers=_json_headers(),
        )
        assert resp.status_code == 200
        json_resp = _json_body(resp)
        assert str(user_to_add.id) in json_resp['data']['users']

        # check user has all permissions
        permissions = _get_user_permissions(client, user_to_add.id, sample_service.id)
        expected_permissions = ['send_texts', 'send_emails', 'send_letters', 'manage_users',
                                'manage_settings', 'manage_templates', 'manage_api_keys', 'view_activity']
        assert sorted(expected_permissions) == sorted(permissions)


def test_add_existing_user_to_another_service_with_send_permissions(notify_api,
                                                                    notify_db,
                                                                    notify_db_session,
                                                                    sample_service,
                                                                    sample_user):
    """Adding a user with the three send permissions grants exactly those permissions."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        user_to_add = _save_invited_user()
        data = _permission_payload("send_emails", "send_letters", "send_texts")
        resp = client.post(
            '/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
            headers=_json_headers(),
            data=json.dumps(data)
        )
        assert resp.status_code == 201

        # check user has send permissions
        permissions = _get_user_permissions(client, user_to_add.id, sample_service.id)
        expected_permissions = ['send_texts', 'send_emails', 'send_letters']
        assert sorted(expected_permissions) == sorted(permissions)


def test_add_existing_user_to_another_service_with_manage_permissions(notify_api,
                                                                      notify_db,
                                                                      notify_db_session,
                                                                      sample_service,
                                                                      sample_user):
    """Adding a user with three manage permissions grants exactly those permissions."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        user_to_add = _save_invited_user()
        data = _permission_payload("manage_users", "manage_settings", "manage_templates")
        resp = client.post(
            '/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
            headers=_json_headers(),
            data=json.dumps(data)
        )
        assert resp.status_code == 201

        # check user has manage permissions
        permissions = _get_user_permissions(client, user_to_add.id, sample_service.id)
        expected_permissions = ['manage_users', 'manage_settings', 'manage_templates']
        assert sorted(expected_permissions) == sorted(permissions)


def test_add_existing_user_to_another_service_with_manage_api_keys(notify_api,
                                                                   notify_db,
                                                                   notify_db_session,
                                                                   sample_service,
                                                                   sample_user):
    """Adding a user with only `manage_api_keys` grants exactly that permission."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        user_to_add = _save_invited_user()
        data = _permission_payload("manage_api_keys")
        resp = client.post(
            '/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
            headers=_json_headers(),
            data=json.dumps(data)
        )
        assert resp.status_code == 201

        # check user has the manage_api_keys permission
        permissions = _get_user_permissions(client, user_to_add.id, sample_service.id)
        expected_permissions = ['manage_api_keys']
        assert sorted(expected_permissions) == sorted(permissions)


def test_add_existing_user_to_non_existing_service_returns404(notify_api,
                                                              notify_db,
                                                              notify_db_session,
                                                              sample_user):
    """Adding a saved user to a random service id answers 404 with 'No result found'."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        user_to_add = _save_invited_user()
        incorrect_id = uuid.uuid4()
        data = {'permissions': ['send_messages', 'manage_service', 'manage_api_keys']}
        resp = client.post(
            '/service/{}/users/{}'.format(incorrect_id, user_to_add.id),
            headers=_json_headers(),
            data=json.dumps(data)
        )
        result = _json_body(resp)
        expected_message = 'No result found'
        assert resp.status_code == 404
        assert result['result'] == 'error'
        assert result['message'] == expected_message


def test_add_existing_user_of_service_to_service_returns400(notify_api, notify_db, notify_db_session, sample_service):
    """Adding a user who is already on the service answers 400 with an explanatory message."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        existing_user_id = sample_service.users[0].id
        data = {'permissions': ['send_messages', 'manage_service', 'manage_api_keys']}
        resp = client.post(
            '/service/{}/users/{}'.format(sample_service.id, existing_user_id),
            headers=_json_headers(),
            data=json.dumps(data)
        )
        result = _json_body(resp)
        expected_message = 'User id: {} already part of service id: {}'.format(existing_user_id, sample_service.id)
        assert resp.status_code == 400
        assert result['result'] == 'error'
        assert result['message'] == expected_message


def test_add_unknown_user_to_service_returns404(notify_api, notify_db, notify_db_session, sample_service):
    """Adding user id 9876 to the sample service answers 404 with 'No result found'."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        incorrect_id = 9876
        data = {'permissions': ['send_messages', 'manage_service', 'manage_api_keys']}
        resp = client.post(
            '/service/{}/users/{}'.format(sample_service.id, incorrect_id),
            headers=_json_headers(),
            data=json.dumps(data)
        )
        result = _json_body(resp)
        expected_message = 'No result found'
        assert resp.status_code == 404
        assert result['result'] == 'error'
        assert result['message'] == expected_message


def test_remove_user_from_service(notify_api, notify_db, notify_db_session, sample_service_permission):
    """DELETE on the remove-user endpoint answers 204 for a second user given a permission."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        second_user = create_sample_user(
            notify_db,
            notify_db_session,
            email="new@digital.cabinet-office.gov.uk")
        # Simulates successfully adding a user to the service
        second_permission = create_sample_service_permission(
            notify_db,
            notify_db_session,
            user=second_user)
        endpoint = url_for(
            'service.remove_user_from_service',
            service_id=str(second_permission.service.id),
            user_id=str(second_permission.user.id))
        resp = client.delete(endpoint, headers=_json_headers())
        assert resp.status_code == 204


# Previously also named test_remove_user_from_service, which rebound the name
# and prevented the 204 test above from ever being collected.
def test_remove_non_existent_user_from_service(notify_api, notify_db, notify_db_session, sample_service_permission):
    """DELETE answers 404 for a user that was created but given no service permission."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        second_user = create_sample_user(
            notify_db,
            notify_db_session,
            email="new@digital.cabinet-office.gov.uk")
        endpoint = url_for(
            'service.remove_user_from_service',
            service_id=str(sample_service_permission.service.id),
            user_id=str(second_user.id))
        resp = client.delete(endpoint, headers=_json_headers())
        assert resp.status_code == 404


def test_cannot_remove_only_user_from_service(notify_api, notify_db, notify_db_session, sample_service_permission):
    """DELETE answers 400 when the user is the only user of the service."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        endpoint = url_for(
            'service.remove_user_from_service',
            service_id=str(sample_service_permission.service.id),
            user_id=str(sample_service_permission.user.id))
        resp = client.delete(endpoint, headers=_json_headers())
        assert resp.status_code == 400
        result = _json_body(resp)
        assert result['message'] == 'You cannot remove the only user for a service'


# This test is just here to verify get_service_and_api_key_history, which is a
# temporary solution until a proper UI is sorted out on the admin app.
def test_get_service_and_api_key_history(notify_api, notify_db, notify_db_session, sample_service):
    """GET /service/<id>/history returns the service and its API key in the history lists."""
    from tests.app.conftest import sample_api_key as create_sample_api_key
    api_key = create_sample_api_key(notify_db, notify_db_session, service=sample_service)

    with notify_api.test_request_context(), notify_api.test_client() as client:
        response = client.get(
            path='/service/{}/history'.format(sample_service.id),
            headers=_auth_headers()
        )
        assert response.status_code == 200

        json_resp = _json_body(response)
        assert json_resp['data']['service_history'][0]['id'] == str(sample_service.id)
        assert json_resp['data']['api_key_history'][0]['id'] == str(api_key.id)


def test_set_reply_to_email_for_service(notify_api, sample_service):
    """POST /service/<id> with `reply_to_email_address` stores and returns that address."""
    with notify_api.test_request_context(), notify_api.test_client() as client:
        resp = client.get(
            '/service/{}'.format(sample_service.id),
            headers=_auth_headers()
        )
        json_resp = _json_body(resp)
        assert resp.status_code == 200
        assert json_resp['data']['name'] == sample_service.name

        data = {
            'reply_to_email_address': 'reply_test@service.gov.uk',
        }
        resp = client.post(
            '/service/{}'.format(sample_service.id),
            data=json.dumps(data),
            headers=_json_headers()
        )
        result = _json_body(resp)
        assert resp.status_code == 200
        assert result['data']['reply_to_email_address'] == 'reply_test@service.gov.uk'