Merge pull request #3531 from alphagov/cleanup-user-tests

clean up user tests
This commit is contained in:
Leo Hemsted
2022-05-04 10:29:19 +01:00
committed by GitHub

View File

@@ -1,10 +1,9 @@
import json
import uuid
from datetime import datetime
from unittest import mock
import pytest
from flask import current_app, url_for
from flask import current_app
from freezegun import freeze_time
from app.dao.permissions_dao import default_service_permissions
@@ -21,7 +20,6 @@ from app.models import (
Permission,
User,
)
from tests import create_admin_authorization_header
from tests.app.db import (
create_organisation,
create_service,
@@ -99,11 +97,11 @@ def test_get_user_doesnt_return_inactive_services_and_orgs(admin_request, sample
assert fetched['permissions'] == {}
def test_post_user(client, notify_db, notify_db_session):
def test_post_user(admin_request, notify_db_session):
"""
Tests POST endpoint '/' to create a user.
"""
assert User.query.count() == 0
User.query.delete()
data = {
"name": "Test User",
"email_address": "user@digital.cabinet-office.gov.uk",
@@ -115,22 +113,16 @@ def test_post_user(client, notify_db, notify_db_session):
"permissions": {},
"auth_type": EMAIL_AUTH_TYPE
}
auth_header = create_admin_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
resp = client.post(
url_for('user.create_user'),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 201
json_resp = admin_request.post('user.create_user', _data=data, _expected_status=201)
user = User.query.filter_by(email_address='user@digital.cabinet-office.gov.uk').first()
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['email_address'] == user.email_address
assert json_resp['data']['id'] == str(user.id)
assert user.auth_type == EMAIL_AUTH_TYPE
def test_post_user_without_auth_type(admin_request, notify_db_session):
assert User.query.count() == 0
User.query.delete()
data = {
"name": "Test User",
"email_address": "user@digital.cabinet-office.gov.uk",
@@ -146,11 +138,11 @@ def test_post_user_without_auth_type(admin_request, notify_db_session):
assert user.auth_type == SMS_AUTH_TYPE
def test_post_user_missing_attribute_email(client, notify_db, notify_db_session):
def test_post_user_missing_attribute_email(admin_request, notify_db_session):
"""
Tests POST endpoint '/' missing attribute email.
"""
assert User.query.count() == 0
User.query.delete()
data = {
"name": "Test User",
"password": "password",
@@ -160,23 +152,17 @@ def test_post_user_missing_attribute_email(client, notify_db, notify_db_session)
"failed_login_count": 0,
"permissions": {}
}
auth_header = create_admin_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
resp = client.post(
url_for('user.create_user'),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 400
json_resp = admin_request.post('user.create_user', _data=data, _expected_status=400)
assert User.query.count() == 0
json_resp = json.loads(resp.get_data(as_text=True))
assert {'email_address': ['Missing data for required field.']} == json_resp['message']
def test_create_user_missing_attribute_password(client, notify_db, notify_db_session):
def test_create_user_missing_attribute_password(admin_request, notify_db_session):
"""
Tests POST endpoint '/' missing attribute password.
"""
assert User.query.count() == 0
User.query.delete()
data = {
"name": "Test User",
"email_address": "user@digital.cabinet-office.gov.uk",
@@ -186,15 +172,8 @@ def test_create_user_missing_attribute_password(client, notify_db, notify_db_ses
"failed_login_count": 0,
"permissions": {}
}
auth_header = create_admin_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
resp = client.post(
url_for('user.create_user'),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 400
json_resp = admin_request.post('user.create_user', _data=data, _expected_status=400)
assert User.query.count() == 0
json_resp = json.loads(resp.get_data(as_text=True))
assert {'password': ['Missing data for required field.']} == json_resp['message']
@@ -252,22 +231,20 @@ def test_cannot_create_user_with_empty_strings(admin_request, notify_db_session)
('email_address', 'newuser@mail.com'),
('mobile_number', '+4407700900460')
])
def test_post_user_attribute(client, sample_user, user_attribute, user_value):
def test_post_user_attribute(admin_request, sample_user, user_attribute, user_value):
assert getattr(sample_user, user_attribute) != user_value
update_dict = {
user_attribute: user_value
}
auth_header = create_admin_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
resp = client.post(
url_for('user.update_user_attribute', user_id=sample_user.id),
data=json.dumps(update_dict),
headers=headers)
json_resp = admin_request.post(
'user.update_user_attribute',
user_id=sample_user.id,
_data=update_dict
)
assert resp.status_code == 200
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data'][user_attribute] == user_value
assert getattr(sample_user, user_attribute) == user_value
@pytest.mark.parametrize('user_attribute, user_value, arguments', [
@@ -292,7 +269,7 @@ def test_post_user_attribute(client, sample_user, user_attribute, user_value):
))
])
def test_post_user_attribute_with_updated_by(
client, mocker, sample_user, user_attribute,
admin_request, mocker, sample_user, user_attribute,
user_value, arguments, team_member_email_edit_template, team_member_mobile_edit_template
):
updater = create_user(name="Service Manago", email="notify_manago@digital.cabinet-office.gov.uk")
@@ -301,17 +278,13 @@ def test_post_user_attribute_with_updated_by(
user_attribute: user_value,
'updated_by': str(updater.id)
}
auth_header = create_admin_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
mock_persist_notification = mocker.patch('app.user.rest.persist_notification')
mocker.patch('app.user.rest.send_notification_to_queue')
resp = client.post(
url_for('user.update_user_attribute', user_id=sample_user.id),
data=json.dumps(update_dict),
headers=headers)
assert resp.status_code == 200, resp.get_data(as_text=True)
json_resp = json.loads(resp.get_data(as_text=True))
json_resp = admin_request.post(
'user.update_user_attribute',
user_id=sample_user.id,
_data=update_dict
)
assert json_resp['data'][user_attribute] == user_value
if arguments:
mock_persist_notification.assert_called_once_with(**arguments)
@@ -320,74 +293,66 @@ def test_post_user_attribute_with_updated_by(
def test_post_user_attribute_with_updated_by_sends_notification_to_international_from_number(
client, mocker, sample_user, team_member_mobile_edit_template
admin_request, mocker, sample_user, team_member_mobile_edit_template
):
updater = create_user(name="Service Manago")
update_dict = {
'mobile_number': '+601117224412',
'updated_by': str(updater.id)
}
auth_header = create_admin_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
mocker.patch('app.user.rest.send_notification_to_queue')
resp = client.post(
url_for('user.update_user_attribute', user_id=sample_user.id),
data=json.dumps(update_dict),
headers=headers)
assert resp.status_code == 200, resp.get_data(as_text=True)
admin_request.post(
'user.update_user_attribute',
user_id=sample_user.id,
_data=update_dict
)
notification = Notification.query.first()
assert notification.reply_to_text == current_app.config['NOTIFY_INTERNATIONAL_SMS_SENDER']
def test_archive_user(mocker, client, sample_user):
def test_archive_user(mocker, admin_request, sample_user):
archive_mock = mocker.patch('app.user.rest.dao_archive_user')
response = client.post(
url_for('user.archive_user', user_id=sample_user.id),
headers=[create_admin_authorization_header()]
admin_request.post(
'user.archive_user',
user_id=sample_user.id,
_expected_status=204
)
assert response.status_code == 204
archive_mock.assert_called_once_with(sample_user)
def test_archive_user_when_user_does_not_exist_gives_404(mocker, client, fake_uuid, notify_db_session):
def test_archive_user_when_user_does_not_exist_gives_404(mocker, admin_request, fake_uuid, notify_db_session):
archive_mock = mocker.patch('app.user.rest.dao_archive_user')
response = client.post(
url_for('user.archive_user', user_id=fake_uuid),
headers=[create_admin_authorization_header()]
admin_request.post(
'user.archive_user',
user_id=fake_uuid,
_expected_status=404
)
assert response.status_code == 404
archive_mock.assert_not_called()
def test_archive_user_when_user_cannot_be_archived(mocker, client, sample_user):
def test_archive_user_when_user_cannot_be_archived(mocker, admin_request, sample_user):
mocker.patch('app.dao.users_dao.user_can_be_archived', return_value=False)
response = client.post(
url_for('user.archive_user', user_id=sample_user.id),
headers=[create_admin_authorization_header()]
json_resp = admin_request.post(
'user.archive_user', user_id=sample_user.id,
_expected_status=400
)
json_resp = json.loads(response.get_data(as_text=True))
msg = "User cant be removed from a service - check all services have another team member with manage_settings"
assert response.status_code == 400
assert json_resp['message'] == msg
def test_get_user_by_email(client, sample_service):
def test_get_user_by_email(admin_request, sample_service):
sample_user = sample_service.users[0]
header = create_admin_authorization_header()
url = url_for('user.get_by_email', email=sample_user.email_address)
resp = client.get(url, headers=[header])
assert resp.status_code == 200
json_resp = json.loads(resp.get_data(as_text=True))
json_resp = admin_request.get('user.get_by_email', email=sample_user.email_address)
expected_permissions = default_service_permissions
fetched = json_resp['data']
@@ -399,22 +364,21 @@ def test_get_user_by_email(client, sample_service):
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
def test_get_user_by_email_not_found_returns_404(client, sample_user):
header = create_admin_authorization_header()
url = url_for('user.get_by_email', email='no_user@digital.gov.uk')
resp = client.get(url, headers=[header])
assert resp.status_code == 404
json_resp = json.loads(resp.get_data(as_text=True))
def test_get_user_by_email_not_found_returns_404(admin_request, sample_user):
json_resp = admin_request.get(
'user.get_by_email',
email='no_user@digital.gov.uk',
_expected_status=404
)
assert json_resp['result'] == 'error'
assert json_resp['message'] == 'No result found'
def test_get_user_by_email_bad_url_returns_404(client, sample_user):
header = create_admin_authorization_header()
url = '/user/email'
resp = client.get(url, headers=[header])
assert resp.status_code == 400
json_resp = json.loads(resp.get_data(as_text=True))
def test_get_user_by_email_bad_url_returns_404(admin_request, sample_user):
json_resp = admin_request.get(
'user.get_by_email',
_expected_status=400
)
assert json_resp['result'] == 'error'
assert json_resp['message'] == 'Invalid request. Email query string param required'
@@ -457,47 +421,41 @@ def test_fetch_user_by_email_without_email_returns_400(admin_request, notify_db_
assert resp['message'] == {'email': ['Missing data for required field.']}
def test_get_user_with_permissions(client, sample_user_service_permission):
header = create_admin_authorization_header()
response = client.get(url_for('user.get_user', user_id=str(sample_user_service_permission.user.id)),
headers=[header])
assert response.status_code == 200
permissions = json.loads(response.get_data(as_text=True))['data']['permissions']
def test_get_user_with_permissions(admin_request, sample_user_service_permission):
json_resp = admin_request.get(
'user.get_user',
user_id=str(sample_user_service_permission.user.id),
)
permissions = json_resp['data']['permissions']
assert sample_user_service_permission.permission in permissions[str(sample_user_service_permission.service.id)]
def test_set_user_permissions(client, sample_user, sample_service):
data = json.dumps({'permissions': [{'permission': MANAGE_SETTINGS}]})
header = create_admin_authorization_header()
headers = [('Content-Type', 'application/json'), header]
response = client.post(
url_for(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id)),
headers=headers,
data=data)
def test_set_user_permissions(admin_request, sample_user, sample_service):
admin_request.post(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id),
_data={'permissions': [{'permission': MANAGE_SETTINGS}]},
_expected_status=204,
)
assert response.status_code == 204
permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first()
assert permission.user == sample_user
assert permission.service == sample_service
assert permission.permission == MANAGE_SETTINGS
def test_set_user_permissions_multiple(client, sample_user, sample_service):
data = json.dumps({'permissions': [{'permission': MANAGE_SETTINGS}, {'permission': MANAGE_TEMPLATES}]})
header = create_admin_authorization_header()
headers = [('Content-Type', 'application/json'), header]
response = client.post(
url_for(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id)),
headers=headers,
data=data)
def test_set_user_permissions_multiple(admin_request, sample_user, sample_service):
data = {'permissions': [{'permission': MANAGE_SETTINGS}, {'permission': MANAGE_TEMPLATES}]}
admin_request.post(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id),
_data=data,
_expected_status=204,
)
assert response.status_code == 204
permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first()
assert permission.user == sample_user
assert permission.service == sample_service
@@ -508,38 +466,34 @@ def test_set_user_permissions_multiple(client, sample_user, sample_service):
assert permission.permission == MANAGE_TEMPLATES
def test_set_user_permissions_remove_old(client, sample_user, sample_service):
data = json.dumps({'permissions': [{'permission': MANAGE_SETTINGS}]})
header = create_admin_authorization_header()
headers = [('Content-Type', 'application/json'), header]
response = client.post(
url_for(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id)),
headers=headers,
data=data)
def test_set_user_permissions_remove_old(admin_request, sample_user, sample_service):
data = {'permissions': [{'permission': MANAGE_SETTINGS}]}
admin_request.post(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id),
_data=data,
_expected_status=204,
)
assert response.status_code == 204
query = Permission.query.filter_by(user=sample_user)
assert query.count() == 1
assert query.first().permission == MANAGE_SETTINGS
def test_set_user_folder_permissions(client, sample_user, sample_service):
def test_set_user_folder_permissions(admin_request, sample_user, sample_service):
tf1 = create_template_folder(sample_service)
tf2 = create_template_folder(sample_service)
data = json.dumps({'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]})
data = {'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]}
response = client.post(
url_for(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id)),
headers=[('Content-Type', 'application/json'), create_admin_authorization_header()],
data=data)
assert response.status_code == 204
admin_request.post(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id),
_data=data,
_expected_status=204,
)
service_user = dao_get_service_user(sample_user.id, sample_service.id)
assert len(service_user.folders) == 2
@@ -547,26 +501,24 @@ def test_set_user_folder_permissions(client, sample_user, sample_service):
assert tf2 in service_user.folders
def test_set_user_folder_permissions_when_user_does_not_belong_to_service(client, sample_user):
def test_set_user_folder_permissions_when_user_does_not_belong_to_service(admin_request, sample_user):
service = create_service()
tf1 = create_template_folder(service)
tf2 = create_template_folder(service)
data = json.dumps({'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]})
data = {'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]}
response = client.post(
url_for(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(service.id)),
headers=[('Content-Type', 'application/json'), create_admin_authorization_header()],
data=data)
assert response.status_code == 404
admin_request.post(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(service.id),
_data=data,
_expected_status=404,
)
def test_set_user_folder_permissions_does_not_affect_permissions_for_other_services(
client,
admin_request,
sample_user,
sample_service,
):
@@ -584,23 +536,21 @@ def test_set_user_folder_permissions_does_not_affect_permissions_for_other_servi
service_2_user.folders = [tf3]
dao_update_service_user(service_2_user)
data = json.dumps({'permissions': [], 'folder_permissions': [str(tf2.id)]})
data = {'permissions': [], 'folder_permissions': [str(tf2.id)]}
response = client.post(
url_for(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id)),
headers=[('Content-Type', 'application/json'), create_admin_authorization_header()],
data=data)
assert response.status_code == 204
admin_request.post(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id),
_data=data,
_expected_status=204,
)
assert sample_service_user.folders == [tf2]
assert service_2_user.folders == [tf3]
def test_update_user_folder_permissions(client, sample_user, sample_service):
def test_update_user_folder_permissions(admin_request, sample_user, sample_service):
tf1 = create_template_folder(sample_service)
tf2 = create_template_folder(sample_service)
tf3 = create_template_folder(sample_service)
@@ -609,23 +559,22 @@ def test_update_user_folder_permissions(client, sample_user, sample_service):
service_user.folders = [tf1, tf2]
dao_update_service_user(service_user)
data = json.dumps({'permissions': [], 'folder_permissions': [str(tf2.id), str(tf3.id)]})
data = {'permissions': [], 'folder_permissions': [str(tf2.id), str(tf3.id)]}
response = client.post(
url_for(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id)),
headers=[('Content-Type', 'application/json'), create_admin_authorization_header()],
data=data)
admin_request.post(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id),
_data=data,
_expected_status=204,
)
assert response.status_code == 204
assert len(service_user.folders) == 2
assert tf2 in service_user.folders
assert tf3 in service_user.folders
def test_remove_user_folder_permissions(client, sample_user, sample_service):
def test_remove_user_folder_permissions(admin_request, sample_user, sample_service):
tf1 = create_template_folder(sample_service)
tf2 = create_template_folder(sample_service)
@@ -633,35 +582,34 @@ def test_remove_user_folder_permissions(client, sample_user, sample_service):
service_user.folders = [tf1, tf2]
dao_update_service_user(service_user)
data = json.dumps({'permissions': [], 'folder_permissions': []})
data = {'permissions': [], 'folder_permissions': []}
response = client.post(
url_for(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id)),
headers=[('Content-Type', 'application/json'), create_admin_authorization_header()],
data=data)
admin_request.post(
'user.set_permissions',
user_id=str(sample_user.id),
service_id=str(sample_service.id),
_data=data,
_expected_status=204,
)
assert response.status_code == 204
assert service_user.folders == []
@freeze_time("2016-01-01 11:09:00.061258")
def test_send_user_reset_password_should_send_reset_password_link(client,
def test_send_user_reset_password_should_send_reset_password_link(admin_request,
sample_user,
mocker,
password_reset_email_template):
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
data = json.dumps({'email': sample_user.email_address})
auth_header = create_admin_authorization_header()
data = {'email': sample_user.email_address}
notify_service = password_reset_email_template.service
resp = client.post(
url_for('user.send_user_reset_password'),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
admin_request.post(
'user.send_user_reset_password',
_data=data,
_expected_status=204,
)
notification = Notification.query.first()
mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks")
assert notification.reply_to_text == notify_service.get_default_reply_to_email_address()
@@ -702,109 +650,112 @@ def test_send_user_reset_password_should_use_provided_base_url(
@freeze_time("2016-01-01 11:09:00.061258")
def test_send_user_reset_password_reset_password_link_contains_redirect_link_if_present_in_request(
client, sample_user, mocker, password_reset_email_template
admin_request, sample_user, mocker, password_reset_email_template
):
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
data = json.dumps({'email': sample_user.email_address, "next": "blob"})
auth_header = create_admin_authorization_header()
response = client.post(
url_for('user.send_user_reset_password'),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
data = {'email': sample_user.email_address, "next": "blob"}
admin_request.post(
'user.send_user_reset_password',
_data=data,
_expected_status=204,
)
assert response.status_code == 204
notification = Notification.query.first()
assert "?next=blob" in notification.content
mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks")
def test_send_user_reset_password_should_return_400_when_email_is_missing(client, mocker):
def test_send_user_reset_password_should_return_400_when_email_is_missing(admin_request, mocker):
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
data = json.dumps({})
auth_header = create_admin_authorization_header()
data = {}
resp = client.post(
url_for('user.send_user_reset_password'),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']}
json_resp = admin_request.post(
'user.send_user_reset_password',
_data=data,
_expected_status=400,
)
assert json_resp['message'] == {'email': ['Missing data for required field.']}
assert mocked.call_count == 0
def test_send_user_reset_password_should_return_400_when_user_doesnot_exist(client, mocker):
def test_send_user_reset_password_should_return_400_when_user_doesnot_exist(admin_request, mocker):
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
bad_email_address = 'bad@email.gov.uk'
data = json.dumps({'email': bad_email_address})
auth_header = create_admin_authorization_header()
data = {'email': bad_email_address}
resp = client.post(
url_for('user.send_user_reset_password'),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
json_resp = admin_request.post(
'user.send_user_reset_password',
_data=data,
_expected_status=404,
)
assert resp.status_code == 404
assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found'
assert json_resp['message'] == 'No result found'
assert mocked.call_count == 0
def test_send_user_reset_password_should_return_400_when_data_is_not_email_address(client, mocker):
def test_send_user_reset_password_should_return_400_when_data_is_not_email_address(admin_request, mocker):
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
bad_email_address = 'bad.email.gov.uk'
data = json.dumps({'email': bad_email_address})
auth_header = create_admin_authorization_header()
data = {'email': bad_email_address}
resp = client.post(
url_for('user.send_user_reset_password'),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
json_resp = admin_request.post(
'user.send_user_reset_password',
_data=data,
_expected_status=400,
)
assert resp.status_code == 400
assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Not a valid email address']}
assert json_resp['message'] == {'email': ['Not a valid email address']}
assert mocked.call_count == 0
def test_send_already_registered_email(client, sample_user, already_registered_template, mocker):
data = json.dumps({'email': sample_user.email_address})
auth_header = create_admin_authorization_header()
def test_send_already_registered_email(admin_request, sample_user, already_registered_template, mocker):
data = {'email': sample_user.email_address}
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
notify_service = already_registered_template.service
resp = client.post(
url_for('user.send_already_registered_email', user_id=str(sample_user.id)),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
admin_request.post(
'user.send_already_registered_email',
user_id=str(sample_user.id),
_data=data,
_expected_status=204,
)
notification = Notification.query.first()
mocked.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks")
assert notification.reply_to_text == notify_service.get_default_reply_to_email_address()
def test_send_already_registered_email_returns_400_when_data_is_missing(client, sample_user):
data = json.dumps({})
auth_header = create_admin_authorization_header()
def test_send_already_registered_email_returns_400_when_data_is_missing(admin_request, sample_user):
data = {}
resp = client.post(
url_for('user.send_already_registered_email', user_id=str(sample_user.id)),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']}
json_resp = admin_request.post(
'user.send_already_registered_email',
user_id=str(sample_user.id),
_data=data,
_expected_status=400,
)
assert json_resp['message'] == {'email': ['Missing data for required field.']}
def test_send_user_confirm_new_email_returns_204(client, sample_user, change_email_confirmation_template, mocker):
def test_send_user_confirm_new_email_returns_204(
admin_request,
sample_user,
change_email_confirmation_template,
mocker
):
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
new_email = 'new_address@dig.gov.uk'
data = json.dumps({'email': new_email})
auth_header = create_admin_authorization_header()
data = {'email': new_email}
notify_service = change_email_confirmation_template.service
resp = client.post(url_for('user.send_user_confirm_new_email', user_id=str(sample_user.id)),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
admin_request.post(
'user.send_user_confirm_new_email',
user_id=str(sample_user.id),
_data=data,
_expected_status=204,
)
notification = Notification.query.first()
mocked.assert_called_once_with(
([str(notification.id)]),
@@ -812,41 +763,41 @@ def test_send_user_confirm_new_email_returns_204(client, sample_user, change_ema
assert notification.reply_to_text == notify_service.get_default_reply_to_email_address()
def test_send_user_confirm_new_email_returns_400_when_email_missing(client, sample_user, mocker):
def test_send_user_confirm_new_email_returns_400_when_email_missing(admin_request, sample_user, mocker):
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
data = json.dumps({})
auth_header = create_admin_authorization_header()
resp = client.post(url_for('user.send_user_confirm_new_email', user_id=str(sample_user.id)),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']}
data = {}
json_resp = admin_request.post(
'user.send_user_confirm_new_email',
user_id=str(sample_user.id),
_data=data,
_expected_status=400,
)
assert json_resp['message'] == {'email': ['Missing data for required field.']}
mocked.assert_not_called()
@freeze_time('2020-02-14T12:00:00')
def test_update_user_password_saves_correctly(client, sample_service):
def test_update_user_password_saves_correctly(admin_request, sample_service):
sample_user = sample_service.users[0]
new_password = '1234567890'
auth_header = create_admin_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
data = {'_password': '1234567890'}
resp = client.post(
url_for('user.update_password', user_id=sample_user.id),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 200
json_resp = json.loads(resp.get_data(as_text=True))
json_resp = admin_request.post(
'user.update_password',
user_id=str(sample_user.id),
_data=data
)
assert json_resp['data']['password_changed_at'] is not None
data = {'password': new_password}
auth_header = create_admin_authorization_header()
headers = [('Content-Type', 'application/json'), auth_header]
resp = client.post(
url_for('user.verify_user_password', user_id=str(sample_user.id)),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 204
admin_request.post(
'user.verify_user_password',
user_id=str(sample_user.id),
_data=data,
_expected_status=204
)
def test_activate_user(admin_request, sample_user):
@@ -1088,72 +1039,58 @@ def test_get_orgs_and_services_only_shows_users_orgs_and_services(admin_request,
]
def test_find_users_by_email_finds_user_by_partial_email(notify_db, client):
def test_find_users_by_email_finds_user_by_partial_email(notify_db_session, admin_request):
create_user(email='findel.mestro@foo.com')
create_user(email='me.ignorra@foo.com')
data = json.dumps({"email": "findel"})
auth_header = create_admin_authorization_header()
data = {"email": "findel"}
response = client.post(
url_for("user.find_users_by_email"),
data=data,
headers=[('Content-Type', 'application/json'), auth_header]
users = admin_request.post(
"user.find_users_by_email",
_data=data,
)
users = json.loads(response.get_data(as_text=True))
assert response.status_code == 200
assert len(users['data']) == 1
assert users['data'][0]['email_address'] == 'findel.mestro@foo.com'
def test_find_users_by_email_finds_user_by_full_email(notify_db, client):
def test_find_users_by_email_finds_user_by_full_email(notify_db_session, admin_request):
create_user(email='findel.mestro@foo.com')
create_user(email='me.ignorra@foo.com')
data = json.dumps({"email": "findel.mestro@foo.com"})
auth_header = create_admin_authorization_header()
data = {"email": "findel.mestro@foo.com"}
response = client.post(
url_for("user.find_users_by_email"),
data=data,
headers=[('Content-Type', 'application/json'), auth_header]
users = admin_request.post(
"user.find_users_by_email",
_data=data,
)
users = json.loads(response.get_data(as_text=True))
assert response.status_code == 200
assert len(users['data']) == 1
assert users['data'][0]['email_address'] == 'findel.mestro@foo.com'
def test_find_users_by_email_handles_no_results(notify_db, client):
def test_find_users_by_email_handles_no_results(notify_db_session, admin_request):
create_user(email='findel.mestro@foo.com')
create_user(email='me.ignorra@foo.com')
data = json.dumps({"email": "rogue"})
auth_header = create_admin_authorization_header()
data = {"email": "rogue"}
response = client.post(
url_for("user.find_users_by_email"),
data=data,
headers=[('Content-Type', 'application/json'), auth_header]
users = admin_request.post(
"user.find_users_by_email",
_data=data,
)
users = json.loads(response.get_data(as_text=True))
assert response.status_code == 200
assert users['data'] == []
def test_search_for_users_by_email_handles_incorrect_data_format(notify_db, client):
def test_search_for_users_by_email_handles_incorrect_data_format(notify_db_session, admin_request):
create_user(email='findel.mestro@foo.com')
data = json.dumps({"email": 1})
auth_header = create_admin_authorization_header()
data = {"email": 1}
response = client.post(
url_for("user.find_users_by_email"),
data=data,
headers=[('Content-Type', 'application/json'), auth_header]
json = admin_request.post(
"user.find_users_by_email",
_data=data,
_expected_status=400
)
assert response.status_code == 400
assert json.loads(response.get_data(as_text=True))['message'] == {'email': ['Not a valid string.']}
assert json['message'] == {'email': ['Not a valid string.']}
@pytest.mark.parametrize('number, expected_reply_to',