diff --git a/tests/app/user/test_rest.py b/tests/app/user/test_rest.py index 24af49935..e5558038c 100644 --- a/tests/app/user/test_rest.py +++ b/tests/app/user/test_rest.py @@ -1,10 +1,9 @@ -import json import uuid from datetime import datetime from unittest import mock import pytest -from flask import current_app, url_for +from flask import current_app from freezegun import freeze_time from app.dao.permissions_dao import default_service_permissions @@ -21,7 +20,6 @@ from app.models import ( Permission, User, ) -from tests import create_admin_authorization_header from tests.app.db import ( create_organisation, create_service, @@ -99,11 +97,11 @@ def test_get_user_doesnt_return_inactive_services_and_orgs(admin_request, sample assert fetched['permissions'] == {} -def test_post_user(client, notify_db, notify_db_session): +def test_post_user(admin_request, notify_db_session): """ Tests POST endpoint '/' to create a user. """ - assert User.query.count() == 0 + User.query.delete() data = { "name": "Test User", "email_address": "user@digital.cabinet-office.gov.uk", @@ -115,22 +113,16 @@ def test_post_user(client, notify_db, notify_db_session): "permissions": {}, "auth_type": EMAIL_AUTH_TYPE } - auth_header = create_admin_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.post( - url_for('user.create_user'), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 201 + json_resp = admin_request.post('user.create_user', _data=data, _expected_status=201) + user = User.query.filter_by(email_address='user@digital.cabinet-office.gov.uk').first() - json_resp = json.loads(resp.get_data(as_text=True)) assert json_resp['data']['email_address'] == user.email_address assert json_resp['data']['id'] == str(user.id) assert user.auth_type == EMAIL_AUTH_TYPE def test_post_user_without_auth_type(admin_request, notify_db_session): - assert User.query.count() == 0 + User.query.delete() data = { "name": "Test User", "email_address": "user@digital.cabinet-office.gov.uk", @@ -146,11 +138,11 @@ def test_post_user_without_auth_type(admin_request, notify_db_session): assert user.auth_type == SMS_AUTH_TYPE -def test_post_user_missing_attribute_email(client, notify_db, notify_db_session): +def test_post_user_missing_attribute_email(admin_request, notify_db_session): """ Tests POST endpoint '/' missing attribute email. """ - assert User.query.count() == 0 + User.query.delete() data = { "name": "Test User", "password": "password", @@ -160,23 +152,17 @@ def test_post_user_missing_attribute_email(client, notify_db, notify_db_session) "failed_login_count": 0, "permissions": {} } - auth_header = create_admin_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.post( - url_for('user.create_user'), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 400 + json_resp = admin_request.post('user.create_user', _data=data, _expected_status=400) + assert User.query.count() == 0 - json_resp = json.loads(resp.get_data(as_text=True)) assert {'email_address': ['Missing data for required field.']} == json_resp['message'] -def test_create_user_missing_attribute_password(client, notify_db, notify_db_session): +def test_create_user_missing_attribute_password(admin_request, notify_db_session): """ Tests POST endpoint '/' missing attribute password. """ - assert User.query.count() == 0 + User.query.delete() data = { "name": "Test User", "email_address": "user@digital.cabinet-office.gov.uk", @@ -186,15 +172,8 @@ def test_create_user_missing_attribute_password(client, notify_db, notify_db_ses "failed_login_count": 0, "permissions": {} } - auth_header = create_admin_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.post( - url_for('user.create_user'), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 400 + json_resp = admin_request.post('user.create_user', _data=data, _expected_status=400) assert User.query.count() == 0 - json_resp = json.loads(resp.get_data(as_text=True)) assert {'password': ['Missing data for required field.']} == json_resp['message'] @@ -252,22 +231,20 @@ def test_cannot_create_user_with_empty_strings(admin_request, notify_db_session) ('email_address', 'newuser@mail.com'), ('mobile_number', '+4407700900460') ]) -def test_post_user_attribute(client, sample_user, user_attribute, user_value): +def test_post_user_attribute(admin_request, sample_user, user_attribute, user_value): assert getattr(sample_user, user_attribute) != user_value update_dict = { user_attribute: user_value } - auth_header = create_admin_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.post( - url_for('user.update_user_attribute', user_id=sample_user.id), - data=json.dumps(update_dict), - headers=headers) + json_resp = admin_request.post( + 'user.update_user_attribute', + user_id=sample_user.id, + _data=update_dict + ) - assert resp.status_code == 200 - json_resp = json.loads(resp.get_data(as_text=True)) assert json_resp['data'][user_attribute] == user_value + assert getattr(sample_user, user_attribute) == user_value @pytest.mark.parametrize('user_attribute, user_value, arguments', [ @@ -292,7 +269,7 @@ def test_post_user_attribute(client, sample_user, user_attribute, user_value): )) ]) def test_post_user_attribute_with_updated_by( - client, mocker, sample_user, user_attribute, + admin_request, mocker, sample_user, user_attribute, user_value, arguments, team_member_email_edit_template, team_member_mobile_edit_template ): updater = create_user(name="Service Manago", email="notify_manago@digital.cabinet-office.gov.uk") @@ -301,17 +278,13 @@ def test_post_user_attribute_with_updated_by( user_attribute: user_value, 'updated_by': str(updater.id) } - auth_header = create_admin_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] mock_persist_notification = mocker.patch('app.user.rest.persist_notification') mocker.patch('app.user.rest.send_notification_to_queue') - resp = client.post( - url_for('user.update_user_attribute', user_id=sample_user.id), - data=json.dumps(update_dict), - headers=headers) - - assert resp.status_code == 200, resp.get_data(as_text=True) - json_resp = json.loads(resp.get_data(as_text=True)) + json_resp = admin_request.post( + 'user.update_user_attribute', + user_id=sample_user.id, + _data=update_dict + ) assert json_resp['data'][user_attribute] == user_value if arguments: mock_persist_notification.assert_called_once_with(**arguments) @@ -320,74 +293,66 @@ def test_post_user_attribute_with_updated_by( def test_post_user_attribute_with_updated_by_sends_notification_to_international_from_number( - client, mocker, sample_user, team_member_mobile_edit_template + admin_request, mocker, sample_user, team_member_mobile_edit_template ): updater = create_user(name="Service Manago") update_dict = { 'mobile_number': '+601117224412', 'updated_by': str(updater.id) } - auth_header = create_admin_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] mocker.patch('app.user.rest.send_notification_to_queue') - resp = client.post( - url_for('user.update_user_attribute', user_id=sample_user.id), - data=json.dumps(update_dict), - headers=headers) - assert resp.status_code == 200, resp.get_data(as_text=True) + admin_request.post( + 'user.update_user_attribute', + user_id=sample_user.id, + _data=update_dict + ) notification = Notification.query.first() assert notification.reply_to_text == current_app.config['NOTIFY_INTERNATIONAL_SMS_SENDER'] -def test_archive_user(mocker, client, sample_user): +def test_archive_user(mocker, admin_request, sample_user): archive_mock = mocker.patch('app.user.rest.dao_archive_user') - response = client.post( - url_for('user.archive_user', user_id=sample_user.id), - headers=[create_admin_authorization_header()] + admin_request.post( + 'user.archive_user', + user_id=sample_user.id, + _expected_status=204 ) - assert response.status_code == 204 archive_mock.assert_called_once_with(sample_user) -def test_archive_user_when_user_does_not_exist_gives_404(mocker, client, fake_uuid, notify_db_session): +def test_archive_user_when_user_does_not_exist_gives_404(mocker, admin_request, fake_uuid, notify_db_session): archive_mock = mocker.patch('app.user.rest.dao_archive_user') - response = client.post( - url_for('user.archive_user', user_id=fake_uuid), - headers=[create_admin_authorization_header()] + admin_request.post( + 'user.archive_user', + user_id=fake_uuid, + _expected_status=404 ) - assert response.status_code == 404 archive_mock.assert_not_called() -def test_archive_user_when_user_cannot_be_archived(mocker, client, sample_user): +def test_archive_user_when_user_cannot_be_archived(mocker, admin_request, sample_user): mocker.patch('app.dao.users_dao.user_can_be_archived', return_value=False) - response = client.post( - url_for('user.archive_user', user_id=sample_user.id), - headers=[create_admin_authorization_header()] + json_resp = admin_request.post( + 'user.archive_user', user_id=sample_user.id, + _expected_status=400 ) - json_resp = json.loads(response.get_data(as_text=True)) - msg = "User can’t be removed from a service - check all services have another team member with manage_settings" - assert response.status_code == 400 assert json_resp['message'] == msg -def test_get_user_by_email(client, sample_service): +def test_get_user_by_email(admin_request, sample_service): sample_user = sample_service.users[0] - header = create_admin_authorization_header() - url = url_for('user.get_by_email', email=sample_user.email_address) - resp = client.get(url, headers=[header]) - assert resp.status_code == 200 - json_resp = json.loads(resp.get_data(as_text=True)) + json_resp = admin_request.get('user.get_by_email', email=sample_user.email_address) + expected_permissions = default_service_permissions fetched = json_resp['data'] @@ -399,22 +364,21 @@ def test_get_user_by_email(client, sample_service): assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) -def test_get_user_by_email_not_found_returns_404(client, sample_user): - header = create_admin_authorization_header() - url = url_for('user.get_by_email', email='no_user@digital.gov.uk') - resp = client.get(url, headers=[header]) - assert resp.status_code == 404 - json_resp = json.loads(resp.get_data(as_text=True)) +def test_get_user_by_email_not_found_returns_404(admin_request, sample_user): + json_resp = admin_request.get( + 'user.get_by_email', + email='no_user@digital.gov.uk', + _expected_status=404 + ) assert json_resp['result'] == 'error' assert json_resp['message'] == 'No result found' -def test_get_user_by_email_bad_url_returns_404(client, sample_user): - header = create_admin_authorization_header() - url = '/user/email' - resp = client.get(url, headers=[header]) - assert resp.status_code == 400 - json_resp = json.loads(resp.get_data(as_text=True)) +def test_get_user_by_email_bad_url_returns_404(admin_request, sample_user): + json_resp = admin_request.get( + 'user.get_by_email', + _expected_status=400 + ) assert json_resp['result'] == 'error' assert json_resp['message'] == 'Invalid request. Email query string param required' @@ -457,47 +421,41 @@ def test_fetch_user_by_email_without_email_returns_400(admin_request, notify_db_ assert resp['message'] == {'email': ['Missing data for required field.']} -def test_get_user_with_permissions(client, sample_user_service_permission): - header = create_admin_authorization_header() - response = client.get(url_for('user.get_user', user_id=str(sample_user_service_permission.user.id)), - headers=[header]) - assert response.status_code == 200 - permissions = json.loads(response.get_data(as_text=True))['data']['permissions'] +def test_get_user_with_permissions(admin_request, sample_user_service_permission): + json_resp = admin_request.get( + 'user.get_user', + user_id=str(sample_user_service_permission.user.id), + + ) + permissions = json_resp['data']['permissions'] assert sample_user_service_permission.permission in permissions[str(sample_user_service_permission.service.id)] -def test_set_user_permissions(client, sample_user, sample_service): - data = json.dumps({'permissions': [{'permission': MANAGE_SETTINGS}]}) - header = create_admin_authorization_header() - headers = [('Content-Type', 'application/json'), header] - response = client.post( - url_for( - 'user.set_permissions', - user_id=str(sample_user.id), - service_id=str(sample_service.id)), - headers=headers, - data=data) +def test_set_user_permissions(admin_request, sample_user, sample_service): + admin_request.post( + 'user.set_permissions', + user_id=str(sample_user.id), + service_id=str(sample_service.id), + _data={'permissions': [{'permission': MANAGE_SETTINGS}]}, + _expected_status=204, + ) - assert response.status_code == 204 permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first() assert permission.user == sample_user assert permission.service == sample_service assert permission.permission == MANAGE_SETTINGS -def test_set_user_permissions_multiple(client, sample_user, sample_service): - data = json.dumps({'permissions': [{'permission': MANAGE_SETTINGS}, {'permission': MANAGE_TEMPLATES}]}) - header = create_admin_authorization_header() - headers = [('Content-Type', 'application/json'), header] - response = client.post( - url_for( - 'user.set_permissions', - user_id=str(sample_user.id), - service_id=str(sample_service.id)), - headers=headers, - data=data) +def test_set_user_permissions_multiple(admin_request, sample_user, sample_service): + data = {'permissions': [{'permission': MANAGE_SETTINGS}, {'permission': MANAGE_TEMPLATES}]} + admin_request.post( + 'user.set_permissions', + user_id=str(sample_user.id), + service_id=str(sample_service.id), + _data=data, + _expected_status=204, + ) - assert response.status_code == 204 permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first() assert permission.user == sample_user assert permission.service == sample_service @@ -508,38 +466,34 @@ def test_set_user_permissions_multiple(client, sample_user, sample_service): assert permission.permission == MANAGE_TEMPLATES -def test_set_user_permissions_remove_old(client, sample_user, sample_service): - data = json.dumps({'permissions': [{'permission': MANAGE_SETTINGS}]}) - header = create_admin_authorization_header() - headers = [('Content-Type', 'application/json'), header] - response = client.post( - url_for( - 'user.set_permissions', - user_id=str(sample_user.id), - service_id=str(sample_service.id)), - headers=headers, - data=data) +def test_set_user_permissions_remove_old(admin_request, sample_user, sample_service): + data = {'permissions': [{'permission': MANAGE_SETTINGS}]} + + admin_request.post( + 'user.set_permissions', + user_id=str(sample_user.id), + service_id=str(sample_service.id), + _data=data, + _expected_status=204, + ) - assert response.status_code == 204 query = Permission.query.filter_by(user=sample_user) assert query.count() == 1 assert query.first().permission == MANAGE_SETTINGS -def test_set_user_folder_permissions(client, sample_user, sample_service): +def test_set_user_folder_permissions(admin_request, sample_user, sample_service): tf1 = create_template_folder(sample_service) tf2 = create_template_folder(sample_service) - data = json.dumps({'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]}) + data = {'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]} - response = client.post( - url_for( - 'user.set_permissions', - user_id=str(sample_user.id), - service_id=str(sample_service.id)), - headers=[('Content-Type', 'application/json'), create_admin_authorization_header()], - data=data) - - assert response.status_code == 204 + admin_request.post( + 'user.set_permissions', + user_id=str(sample_user.id), + service_id=str(sample_service.id), + _data=data, + _expected_status=204, + ) service_user = dao_get_service_user(sample_user.id, sample_service.id) assert len(service_user.folders) == 2 @@ -547,26 +501,24 @@ def test_set_user_folder_permissions(client, sample_user, sample_service): assert tf2 in service_user.folders -def test_set_user_folder_permissions_when_user_does_not_belong_to_service(client, sample_user): +def test_set_user_folder_permissions_when_user_does_not_belong_to_service(admin_request, sample_user): service = create_service() tf1 = create_template_folder(service) tf2 = create_template_folder(service) - data = json.dumps({'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]}) + data = {'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]} - response = client.post( - url_for( - 'user.set_permissions', - user_id=str(sample_user.id), - service_id=str(service.id)), - headers=[('Content-Type', 'application/json'), create_admin_authorization_header()], - data=data) - - assert response.status_code == 404 + admin_request.post( + 'user.set_permissions', + user_id=str(sample_user.id), + service_id=str(service.id), + _data=data, + _expected_status=404, + ) def test_set_user_folder_permissions_does_not_affect_permissions_for_other_services( - client, + admin_request, sample_user, sample_service, ): @@ -584,23 +536,21 @@ def test_set_user_folder_permissions_does_not_affect_permissions_for_other_servi service_2_user.folders = [tf3] dao_update_service_user(service_2_user) - data = json.dumps({'permissions': [], 'folder_permissions': [str(tf2.id)]}) + data = {'permissions': [], 'folder_permissions': [str(tf2.id)]} - response = client.post( - url_for( - 'user.set_permissions', - user_id=str(sample_user.id), - service_id=str(sample_service.id)), - headers=[('Content-Type', 'application/json'), create_admin_authorization_header()], - data=data) - - assert response.status_code == 204 + admin_request.post( + 'user.set_permissions', + user_id=str(sample_user.id), + service_id=str(sample_service.id), + _data=data, + _expected_status=204, + ) assert sample_service_user.folders == [tf2] assert service_2_user.folders == [tf3] -def test_update_user_folder_permissions(client, sample_user, sample_service): +def test_update_user_folder_permissions(admin_request, sample_user, sample_service): tf1 = create_template_folder(sample_service) tf2 = create_template_folder(sample_service) tf3 = create_template_folder(sample_service) @@ -609,23 +559,22 @@ def test_update_user_folder_permissions(client, sample_user, sample_service): service_user.folders = [tf1, tf2] dao_update_service_user(service_user) - data = json.dumps({'permissions': [], 'folder_permissions': [str(tf2.id), str(tf3.id)]}) + data = {'permissions': [], 'folder_permissions': [str(tf2.id), str(tf3.id)]} - response = client.post( - url_for( - 'user.set_permissions', - user_id=str(sample_user.id), - service_id=str(sample_service.id)), - headers=[('Content-Type', 'application/json'), create_admin_authorization_header()], - data=data) + admin_request.post( + 'user.set_permissions', + user_id=str(sample_user.id), + service_id=str(sample_service.id), + _data=data, + _expected_status=204, + ) - assert response.status_code == 204 assert len(service_user.folders) == 2 assert tf2 in service_user.folders assert tf3 in service_user.folders -def test_remove_user_folder_permissions(client, sample_user, sample_service): +def test_remove_user_folder_permissions(admin_request, sample_user, sample_service): tf1 = create_template_folder(sample_service) tf2 = create_template_folder(sample_service) @@ -633,35 +582,34 @@ def test_remove_user_folder_permissions(client, sample_user, sample_service): service_user.folders = [tf1, tf2] dao_update_service_user(service_user) - data = json.dumps({'permissions': [], 'folder_permissions': []}) + data = {'permissions': [], 'folder_permissions': []} - response = client.post( - url_for( - 'user.set_permissions', - user_id=str(sample_user.id), - service_id=str(sample_service.id)), - headers=[('Content-Type', 'application/json'), create_admin_authorization_header()], - data=data) + admin_request.post( + 'user.set_permissions', + user_id=str(sample_user.id), + service_id=str(sample_service.id), + _data=data, + _expected_status=204, + ) - assert response.status_code == 204 assert service_user.folders == [] @freeze_time("2016-01-01 11:09:00.061258") -def test_send_user_reset_password_should_send_reset_password_link(client, +def test_send_user_reset_password_should_send_reset_password_link(admin_request, sample_user, mocker, password_reset_email_template): mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') - data = json.dumps({'email': sample_user.email_address}) - auth_header = create_admin_authorization_header() + data = {'email': sample_user.email_address} notify_service = password_reset_email_template.service - resp = client.post( - url_for('user.send_user_reset_password'), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 204 + admin_request.post( + 'user.send_user_reset_password', + _data=data, + _expected_status=204, + ) + notification = Notification.query.first() mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks") assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() @@ -702,109 +650,112 @@ def test_send_user_reset_password_should_use_provided_base_url( @freeze_time("2016-01-01 11:09:00.061258") def test_send_user_reset_password_reset_password_link_contains_redirect_link_if_present_in_request( - client, sample_user, mocker, password_reset_email_template + admin_request, sample_user, mocker, password_reset_email_template ): mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') - data = json.dumps({'email': sample_user.email_address, "next": "blob"}) - auth_header = create_admin_authorization_header() - response = client.post( - url_for('user.send_user_reset_password'), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) + data = {'email': sample_user.email_address, "next": "blob"} + + admin_request.post( + 'user.send_user_reset_password', + _data=data, + _expected_status=204, + ) - assert response.status_code == 204 notification = Notification.query.first() assert "?next=blob" in notification.content mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks") -def test_send_user_reset_password_should_return_400_when_email_is_missing(client, mocker): +def test_send_user_reset_password_should_return_400_when_email_is_missing(admin_request, mocker): mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') - data = json.dumps({}) - auth_header = create_admin_authorization_header() + data = {} - resp = client.post( - url_for('user.send_user_reset_password'), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - - assert resp.status_code == 400 - assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']} + json_resp = admin_request.post( + 'user.send_user_reset_password', + _data=data, + _expected_status=400, + ) + assert json_resp['message'] == {'email': ['Missing data for required field.']} assert mocked.call_count == 0 -def test_send_user_reset_password_should_return_400_when_user_doesnot_exist(client, mocker): +def test_send_user_reset_password_should_return_400_when_user_doesnot_exist(admin_request, mocker): mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') bad_email_address = 'bad@email.gov.uk' - data = json.dumps({'email': bad_email_address}) - auth_header = create_admin_authorization_header() + data = {'email': bad_email_address} - resp = client.post( - url_for('user.send_user_reset_password'), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) + json_resp = admin_request.post( + 'user.send_user_reset_password', + _data=data, + _expected_status=404, + ) - assert resp.status_code == 404 - assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found' + assert json_resp['message'] == 'No result found' assert mocked.call_count == 0 -def test_send_user_reset_password_should_return_400_when_data_is_not_email_address(client, mocker): +def test_send_user_reset_password_should_return_400_when_data_is_not_email_address(admin_request, mocker): mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') bad_email_address = 'bad.email.gov.uk' - data = json.dumps({'email': bad_email_address}) - auth_header = create_admin_authorization_header() + data = {'email': bad_email_address} - resp = client.post( - url_for('user.send_user_reset_password'), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) + json_resp = admin_request.post( + 'user.send_user_reset_password', + _data=data, + _expected_status=400, + ) - assert resp.status_code == 400 - assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Not a valid email address']} + assert json_resp['message'] == {'email': ['Not a valid email address']} assert mocked.call_count == 0 -def test_send_already_registered_email(client, sample_user, already_registered_template, mocker): - data = json.dumps({'email': sample_user.email_address}) - auth_header = create_admin_authorization_header() +def test_send_already_registered_email(admin_request, sample_user, already_registered_template, mocker): + data = {'email': sample_user.email_address} mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') notify_service = already_registered_template.service - resp = client.post( - url_for('user.send_already_registered_email', user_id=str(sample_user.id)), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 204 + admin_request.post( + 'user.send_already_registered_email', + user_id=str(sample_user.id), + _data=data, + _expected_status=204, + ) notification = Notification.query.first() mocked.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks") assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() -def test_send_already_registered_email_returns_400_when_data_is_missing(client, sample_user): - data = json.dumps({}) - auth_header = create_admin_authorization_header() +def test_send_already_registered_email_returns_400_when_data_is_missing(admin_request, sample_user): + data = {} - resp = client.post( - url_for('user.send_already_registered_email', user_id=str(sample_user.id)), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 400 - assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']} + json_resp = admin_request.post( + 'user.send_already_registered_email', + user_id=str(sample_user.id), + _data=data, + _expected_status=400, + ) + assert json_resp['message'] == {'email': ['Missing data for required field.']} -def test_send_user_confirm_new_email_returns_204(client, sample_user, change_email_confirmation_template, mocker): +def test_send_user_confirm_new_email_returns_204( + admin_request, + sample_user, + change_email_confirmation_template, + mocker +): mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') new_email = 'new_address@dig.gov.uk' - data = json.dumps({'email': new_email}) - auth_header = create_admin_authorization_header() + data = {'email': new_email} notify_service = change_email_confirmation_template.service - resp = client.post(url_for('user.send_user_confirm_new_email', user_id=str(sample_user.id)), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 204 + admin_request.post( + 'user.send_user_confirm_new_email', + user_id=str(sample_user.id), + _data=data, + _expected_status=204, + ) + notification = Notification.query.first() mocked.assert_called_once_with( ([str(notification.id)]), @@ -812,41 +763,41 @@ def test_send_user_confirm_new_email_returns_204(client, sample_user, change_ema assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() -def test_send_user_confirm_new_email_returns_400_when_email_missing(client, sample_user, mocker): +def test_send_user_confirm_new_email_returns_400_when_email_missing(admin_request, sample_user, mocker): mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') - data = json.dumps({}) - auth_header = create_admin_authorization_header() - resp = client.post(url_for('user.send_user_confirm_new_email', user_id=str(sample_user.id)), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 400 - assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']} + data = {} + + json_resp = admin_request.post( + 'user.send_user_confirm_new_email', + user_id=str(sample_user.id), + _data=data, + _expected_status=400, + ) + assert json_resp['message'] == {'email': ['Missing data for required field.']} mocked.assert_not_called() @freeze_time('2020-02-14T12:00:00') -def test_update_user_password_saves_correctly(client, sample_service): +def test_update_user_password_saves_correctly(admin_request, sample_service): sample_user = sample_service.users[0] new_password = '1234567890' - auth_header = create_admin_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] data = {'_password': '1234567890'} - resp = client.post( - url_for('user.update_password', user_id=sample_user.id), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 200 - json_resp = json.loads(resp.get_data(as_text=True)) + json_resp = admin_request.post( + 'user.update_password', + user_id=str(sample_user.id), + _data=data + ) + assert json_resp['data']['password_changed_at'] is not None data = {'password': new_password} - auth_header = create_admin_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.post( - url_for('user.verify_user_password', user_id=str(sample_user.id)), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 204 + + admin_request.post( + 'user.verify_user_password', + user_id=str(sample_user.id), + _data=data, + _expected_status=204 + ) def test_activate_user(admin_request, sample_user): @@ -1088,72 +1039,58 @@ def test_get_orgs_and_services_only_shows_users_orgs_and_services(admin_request, ] -def test_find_users_by_email_finds_user_by_partial_email(notify_db, client): +def test_find_users_by_email_finds_user_by_partial_email(notify_db_session, admin_request): create_user(email='findel.mestro@foo.com') create_user(email='me.ignorra@foo.com') - data = json.dumps({"email": "findel"}) - auth_header = create_admin_authorization_header() + data = {"email": "findel"} - response = client.post( - url_for("user.find_users_by_email"), - data=data, - headers=[('Content-Type', 'application/json'), auth_header] + users = admin_request.post( + "user.find_users_by_email", + _data=data, ) - users = json.loads(response.get_data(as_text=True)) - assert response.status_code == 200 assert len(users['data']) == 1 assert users['data'][0]['email_address'] == 'findel.mestro@foo.com' -def test_find_users_by_email_finds_user_by_full_email(notify_db, client): +def test_find_users_by_email_finds_user_by_full_email(notify_db_session, admin_request): create_user(email='findel.mestro@foo.com') create_user(email='me.ignorra@foo.com') - data = json.dumps({"email": "findel.mestro@foo.com"}) - auth_header = create_admin_authorization_header() + data = {"email": "findel.mestro@foo.com"} - response = client.post( - url_for("user.find_users_by_email"), - data=data, - headers=[('Content-Type', 'application/json'), auth_header] + users = admin_request.post( + "user.find_users_by_email", + _data=data, ) - users = json.loads(response.get_data(as_text=True)) - assert response.status_code == 200 assert len(users['data']) == 1 assert users['data'][0]['email_address'] == 'findel.mestro@foo.com' -def test_find_users_by_email_handles_no_results(notify_db, client): +def test_find_users_by_email_handles_no_results(notify_db_session, admin_request): create_user(email='findel.mestro@foo.com') create_user(email='me.ignorra@foo.com') - data = json.dumps({"email": "rogue"}) - auth_header = create_admin_authorization_header() + data = {"email": "rogue"} - response = client.post( - url_for("user.find_users_by_email"), - data=data, - headers=[('Content-Type', 'application/json'), auth_header] + users = admin_request.post( + "user.find_users_by_email", + _data=data, ) - users = json.loads(response.get_data(as_text=True)) - assert response.status_code == 200 assert users['data'] == [] -def test_search_for_users_by_email_handles_incorrect_data_format(notify_db, client): +def test_search_for_users_by_email_handles_incorrect_data_format(notify_db_session, admin_request): create_user(email='findel.mestro@foo.com') - data = json.dumps({"email": 1}) - auth_header = create_admin_authorization_header() + data = {"email": 1} - response = client.post( - url_for("user.find_users_by_email"), - data=data, - headers=[('Content-Type', 'application/json'), auth_header] + json = admin_request.post( + "user.find_users_by_email", + _data=data, + _expected_status=400 ) - assert response.status_code == 400 - assert json.loads(response.get_data(as_text=True))['message'] == {'email': ['Not a valid string.']} + assert json['message'] == {'email': ['Not a valid string.']} @pytest.mark.parametrize('number, expected_reply_to',