diff --git a/app/dao/permissions_dao.py b/app/dao/permissions_dao.py
index 601a95293..67015b75c 100644
--- a/app/dao/permissions_dao.py
+++ b/app/dao/permissions_dao.py
@@ -29,7 +29,7 @@ class PermissionDAO(DAOClass):
 query.filter_by(service=Service.query.get(service_ids[0]))
 # TODO the join method for multiple services
 if 'user' in filter_by_dict:
- user_ids = filter_by_dict.getlist('service')
+ user_ids = filter_by_dict.getlist('user')
 if len(user_ids) == 1:
 query = query.filter_by(user=User.query.get(user_ids[0]))
 # TODO the join method for multiple users
diff --git a/tests/app/conftest.py b/tests/app/conftest.py
index b0a9626bc..4d52a8af8 100644
--- a/tests/app/conftest.py
+++ b/tests/app/conftest.py
@@ -237,16 +237,6 @@ def sample_email_job(notify_db,
 return job
-@pytest.fixture(scope='function')
-def sample_admin_service_id(notify_db, notify_db_session):
- admin_user = sample_user(notify_db, notify_db_session, email="notify_admin@digital.cabinet-office.gov.uk")
- admin_service = sample_service(notify_db, notify_db_session, service_name="Sample Admin Service", user=admin_user)
- data = {'service': admin_service, 'name': 'sample admin key'}
- api_key = ApiKey(**data)
- save_model_api_key(api_key)
- return admin_service.id
-
-
 @pytest.fixture(scope='function')
 def mock_secret_code(mocker):
 def _create():
diff --git a/tests/app/user/test_rest.py b/tests/app/user/test_rest.py
index c0553cce9..b64b9d0e2 100644
--- a/tests/app/user/test_rest.py
+++ b/tests/app/user/test_rest.py
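Review bot: With the key corrected to `'user'`, the filter on the new line is still applied only when exactly one id is supplied; for two or more ids the `# TODO` path leaves `query` untouched, so the lookup appears to return permissions with no user restriction at all. In a permissions DAO a silently skipped filter widens the result, so until the join is written it would be safer to constrain on the whole list (for example with `in_(user_ids)` on the user column) or to raise when more than one value is passed. The service branch in the first context line also calls `query.filter_by(...)` without rebinding `query`, which looks like it discards that filter; `query = query.filter_by(service=Service.query.get(service_ids[0]))` would match the user branch. The method starts and ends outside this hunk, so both points should be confirmed against the full body.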
@@ -1,79 +1,69 @@ import json + from flask import url_for -from app.models import (User, Service) -from app.dao.users_dao import save_model_user + +from app.models import (User) from tests import create_authorization_header -from tests.app.conftest import sample_service as create_sample_service -def test_get_user_list(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id): +def test_get_user_list(notify_api, notify_db, notify_db_session, sample_service): """ Tests GET endpoint '/' to retrieve entire user list. """ with notify_api.test_request_context(): with notify_api.test_client() as client: - header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.get_user'), + header = create_authorization_header(path=url_for('user.get_user'), method='GET') response = client.get(url_for('user.get_user'), headers=[header]) assert response.status_code == 200 json_resp = json.loads(response.get_data(as_text=True)) - assert len(json_resp['data']) == 2 - expected = { - "name": "Test User", - "email_address": sample_user.email_address, - "id": sample_user.id, - "mobile_number": "+447700900986", - "password_changed_at": None, - "logged_in_at": None, - "state": "active", - "failed_login_count": 0, - "permissions": { - str(sample_admin_service_id): [ - 'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']} - } - print(json_resp['data']) - assert expected in json_resp['data'] + assert len(json_resp['data']) == 1 + sample_user = sample_service.users[0] + expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates'] + fetched = json_resp['data'][0] + + assert sample_user.id == fetched['id'] + assert sample_user.name == fetched['name'] + assert sample_user.mobile_number == fetched['mobile_number'] + assert sample_user.email_address == fetched['email_address'] + assert sample_user.state == fetched['state'] + assert sorted(expected_permissions) == 
sorted(fetched['permissions'][str(sample_service.id)]) -def test_get_user(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id): +def test_get_user(notify_api, notify_db, notify_db_session, sample_service): """ Tests GET endpoint '/' to retrieve a single service. """ with notify_api.test_request_context(): with notify_api.test_client() as client: - header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.get_user', user_id=sample_user.id), + sample_user = sample_service.users[0] + header = create_authorization_header(path=url_for('user.get_user', user_id=sample_user.id), method='GET') resp = client.get(url_for('user.get_user', user_id=sample_user.id), headers=[header]) assert resp.status_code == 200 json_resp = json.loads(resp.get_data(as_text=True)) - expected = { - "name": "Test User", - "email_address": sample_user.email_address, - "id": sample_user.id, - "mobile_number": "+447700900986", - "password_changed_at": None, - "logged_in_at": None, - "state": "active", - "failed_login_count": 0, - "permissions": { - str(sample_admin_service_id): [ - 'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']} - } - assert json_resp['data'] == expected + + expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates'] + fetched = json_resp['data'] + + assert sample_user.id == fetched['id'] + assert sample_user.name == fetched['name'] + assert sample_user.mobile_number == fetched['mobile_number'] + assert sample_user.email_address == fetched['email_address'] + assert sample_user.state == fetched['state'] + assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) -def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_service_id): +def test_post_user(notify_api, notify_db, notify_db_session): """ Tests POST endpoint '/' to create a user. 
""" with notify_api.test_request_context(): with notify_api.test_client() as client: - assert User.query.count() == 1 + assert User.query.count() == 0 data = { "name": "Test User", "email_address": "user@digital.cabinet-office.gov.uk", @@ -85,8 +75,7 @@ def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_servic "failed_login_count": 0, "permissions": {} } - auth_header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.create_user'), + auth_header = create_authorization_header(path=url_for('user.create_user'), method='POST', request_body=json.dumps(data)) headers = [('Content-Type', 'application/json'), auth_header] @@ -102,13 +91,13 @@ def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_servic assert json_resp['data']['id'] == user.id -def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_session, sample_admin_service_id): +def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_session): """ Tests POST endpoint '/' missing attribute email. """ with notify_api.test_request_context(): with notify_api.test_client() as client: - assert User.query.count() == 1 + assert User.query.count() == 0 data = { "name": "Test User", "password": "password", @@ -119,8 +108,7 @@ def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_sess "failed_login_count": 0, "permissions": {} } - auth_header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.create_user'), + auth_header = create_authorization_header(path=url_for('user.create_user'), method='POST', request_body=json.dumps(data)) headers = [('Content-Type', 'application/json'), auth_header] 
@@ -129,18 +117,18 @@ def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_sess data=json.dumps(data), headers=headers) assert resp.status_code == 400 - assert User.query.count() == 1 + assert User.query.count() == 0 json_resp = json.loads(resp.get_data(as_text=True)) assert {'email_address': ['Missing data for required field.']} == json_resp['message'] -def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_session, sample_admin_service_id): +def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_session): """ Tests POST endpoint '/' missing attribute password. """ with notify_api.test_request_context(): with notify_api.test_client() as client: - assert User.query.count() == 1 + assert User.query.count() == 0 data = { "name": "Test User", "email_address": "user@digital.cabinet-office.gov.uk", @@ -151,8 +139,7 @@ def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_s "failed_login_count": 0, "permissions": {} } - auth_header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.create_user'), + auth_header = create_authorization_header(path=url_for('user.create_user'), method='POST', request_body=json.dumps(data)) headers = [('Content-Type', 'application/json'), auth_header] 
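Review bot: The `create_authorization_header(path=..., method='POST', request_body=...)` call in the `@@ -151,8 +139,7 @@` hunk no longer passes `service_id`, and nothing on the new side says which client the header now authenticates as; the helper is defined outside this diff. A short comment at the call, along the lines of `# no service_id: header is issued for the helper's default client`, would make that trust assumption explicit (adjust the wording to what the helper actually does). None of the visible hunks sends a request to `user.create_user` or `user.update_user` with a missing or altered header, so a test expecting a rejection for those endpoints would be worth adding if one does not already exist elsewhere in the file. The same change is made in every other `create_authorization_header` call in `tests/app/user/test_rest.py`.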
@@ -161,26 +148,26 @@ def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_s data=json.dumps(data), headers=headers) assert resp.status_code == 400 - assert User.query.count() == 1 + assert User.query.count() == 0 json_resp = json.loads(resp.get_data(as_text=True)) assert {'password': ['Missing data for required field.']} == json_resp['message'] -def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id): +def test_put_user(notify_api, notify_db, notify_db_session, sample_service): """ Tests PUT endpoint '/' to update a user. """ with notify_api.test_request_context(): with notify_api.test_client() as client: - assert User.query.count() == 2 + assert User.query.count() == 1 + sample_user = sample_service.users[0] new_email = 'new@digital.cabinet-office.gov.uk' data = { 'name': sample_user.name, 'email_address': new_email, 'mobile_number': sample_user.mobile_number } - auth_header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.update_user', user_id=sample_user.id), + auth_header = create_authorization_header(path=url_for('user.update_user', user_id=sample_user.id), method='PUT', request_body=json.dumps(data)) headers = [('Content-Type', 'application/json'), auth_header] 
@@ -189,37 +176,31 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_ data=json.dumps(data), headers=headers) assert resp.status_code == 200 - assert User.query.count() == 2 - user = User.query.filter_by(email_address=new_email).first() + assert User.query.count() == 1 json_resp = json.loads(resp.get_data(as_text=True)) - expected = { - "name": "Test User", - "email_address": new_email, - "mobile_number": "+447700900986", - "password_changed_at": None, - "id": user.id, - "logged_in_at": None, - "state": "active", - "failed_login_count": 0, - "permissions": { - str(sample_admin_service_id): [ - 'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']} - } - assert json_resp['data'] == expected assert json_resp['data']['email_address'] == new_email + expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates'] + fetched = json_resp['data'] + + assert sample_user.id == fetched['id'] + assert sample_user.name == fetched['name'] + assert sample_user.mobile_number == fetched['mobile_number'] + assert new_email == fetched['email_address'] + assert sample_user.state == fetched['state'] + assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) def test_put_user_update_password(notify_api, notify_db, notify_db_session, - sample_user, - sample_admin_service_id): + sample_service): """ Tests PUT endpoint '/' to update a user including their password. """ with notify_api.test_request_context(): with notify_api.test_client() as client: - assert User.query.count() == 2 + assert User.query.count() == 1 + sample_user = sample_service.users[0] new_password = '1234567890' data = { 'name': sample_user.name, 
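Review bot: Replacing `assert json_resp['data'] == expected` with per-field assertions means this test no longer fails when the serialized user gains an extra key; the old exact comparison implicitly checked that fields such as the password never appear in the response. Keeping the field checks and adding `assert set(fetched) == {'id', 'name', 'email_address', 'mobile_number', 'password_changed_at', 'logged_in_at', 'state', 'failed_login_count', 'permissions'}` restores that guarantee without reintroducing hard-coded values. The same substitution is made in the `test_get_user_list`, `test_get_user` and `test_get_user_by_email` hunks, and `password_changed_at`, `logged_in_at` and `failed_login_count` are no longer asserted in any of them.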
@@ -227,8 +208,7 @@ def test_put_user_update_password(notify_api, 'mobile_number': sample_user.mobile_number, 'password': new_password } - auth_header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.update_user', user_id=sample_user.id), + auth_header = create_authorization_header(path=url_for('user.update_user', user_id=sample_user.id), method='PUT', request_body=json.dumps(data)) headers = [('Content-Type', 'application/json'), auth_header] @@ -237,12 +217,11 @@ def test_put_user_update_password(notify_api, data=json.dumps(data), headers=headers) assert resp.status_code == 200 - assert User.query.count() == 2 + assert User.query.count() == 1 json_resp = json.loads(resp.get_data(as_text=True)) assert json_resp['data']['password_changed_at'] is not None data = {'password': new_password} - auth_header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.verify_user_password', user_id=sample_user.id), + auth_header = create_authorization_header(path=url_for('user.verify_user_password', user_id=sample_user.id), method='POST', request_body=json.dumps(data)) headers = [('Content-Type', 'application/json'), auth_header] 
@@ -253,17 +232,16 @@ def test_put_user_update_password(notify_api, assert resp.status_code == 204 -def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id): +def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_user): """ Tests PUT endpoint '/' to update a user doesn't exist. """ with notify_api.test_request_context(): with notify_api.test_client() as client: - assert User.query.count() == 2 + assert User.query.count() == 1 new_email = 'new@digital.cabinet-office.gov.uk' data = {'email_address': new_email} - auth_header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.update_user', user_id="9999"), + auth_header = create_authorization_header(path=url_for('user.update_user', user_id="9999"), method='PUT', request_body=json.dumps(data)) headers = [('Content-Type', 'application/json'), auth_header] @@ -272,7 +250,7 @@ def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_us data=json.dumps(data), headers=headers) assert resp.status_code == 404 - assert User.query.count() == 2 + assert User.query.count() == 1 user = User.query.filter_by(id=sample_user.id).first() json_resp = json.loads(resp.get_data(as_text=True)) assert json_resp['result'] == "error" 
@@ -282,43 +260,36 @@ def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_us assert user.email_address != new_email -def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id): +def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_service): with notify_api.test_request_context(): with notify_api.test_client() as client: - header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.get_by_email'), method='GET') + sample_user = sample_service.users[0] + header = create_authorization_header(path=url_for('user.get_by_email'), method='GET') url = url_for('user.get_by_email', email=sample_user.email_address) resp = client.get(url, headers=[header]) assert resp.status_code == 200 - json_resp = json.loads(resp.get_data(as_text=True)) - expected = { - "name": "Test User", - "email_address": sample_user.email_address, - "id": sample_user.id, - "mobile_number": "+447700900986", - "password_changed_at": None, - "logged_in_at": None, - "state": "active", - "failed_login_count": 0, - "permissions": { - str(sample_admin_service_id): [ - 'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']} - } - assert json_resp['data'] == expected + json_resp = json.loads(resp.get_data(as_text=True)) + expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates'] + fetched = json_resp['data'] + + assert sample_user.id == fetched['id'] + assert sample_user.name == fetched['name'] + assert sample_user.mobile_number == fetched['mobile_number'] + assert sample_user.email_address == fetched['email_address'] + assert sample_user.state == fetched['state'] + assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) def test_get_user_by_email_not_found_returns_400(notify_api, notify_db, notify_db_session, - sample_user, - sample_admin_service_id): + sample_user): with 
notify_api.test_request_context(): with notify_api.test_client() as client: - header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.get_by_email'), method='GET') + header = create_authorization_header(path=url_for('user.get_by_email'), method='GET') url = url_for('user.get_by_email', email='no_user@digital.gov.uk') resp = client.get(url, headers=[header]) assert resp.status_code == 404 @@ -330,13 +301,11 @@ def test_get_user_by_email_not_found_returns_400(notify_api, def test_get_user_by_email_bad_url_returns_404(notify_api, notify_db, notify_db_session, - sample_user, - sample_admin_service_id): + sample_user): with notify_api.test_request_context(): with notify_api.test_client() as client: - header = create_authorization_header(service_id=sample_admin_service_id, - path=url_for('user.get_by_email'), method='GET') + header = create_authorization_header(path=url_for('user.get_by_email'), method='GET') url = '/user/email' resp = client.get(url, headers=[header]) assert resp.status_code == 400
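Review bot: `test_get_user_by_email_not_found_returns_400` asserts `resp.status_code == 404`, and `test_get_user_by_email_bad_url_returns_404` in the following hunk asserts `resp.status_code == 400`, so each name states the opposite of what is checked. Both signatures are already being edited here, so renaming them to `test_get_user_by_email_not_found_returns_404` and `test_get_user_by_email_bad_url_returns_400` would keep a failure report from being misread. Neither test has a docstring, unlike the other tests in the file; a one-line docstring stating the expected status would also help.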