mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-31 23:26:23 -05:00
clean up user tests
an API build failed because one of the tests expected the database to be empty, but it actually wasn't. This is probably because another test run earlier on that worker did not clear down properly. I took the opportunity to refresh all of these tests to ensure they all correctly tear down, and also use the more modern admin_request fixture ratehr than the old client one that required us to specify headers and do json parsing etc.
This commit is contained in:
@@ -1,10 +1,9 @@
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from flask import current_app, url_for
|
||||
from flask import current_app
|
||||
from freezegun import freeze_time
|
||||
|
||||
from app.dao.permissions_dao import default_service_permissions
|
||||
@@ -21,7 +20,6 @@ from app.models import (
|
||||
Permission,
|
||||
User,
|
||||
)
|
||||
from tests import create_admin_authorization_header
|
||||
from tests.app.db import (
|
||||
create_organisation,
|
||||
create_service,
|
||||
@@ -99,11 +97,11 @@ def test_get_user_doesnt_return_inactive_services_and_orgs(admin_request, sample
|
||||
assert fetched['permissions'] == {}
|
||||
|
||||
|
||||
def test_post_user(client, notify_db, notify_db_session):
|
||||
def test_post_user(admin_request, notify_db_session):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a user.
|
||||
"""
|
||||
assert User.query.count() == 0
|
||||
User.query.delete()
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"email_address": "user@digital.cabinet-office.gov.uk",
|
||||
@@ -115,22 +113,16 @@ def test_post_user(client, notify_db, notify_db_session):
|
||||
"permissions": {},
|
||||
"auth_type": EMAIL_AUTH_TYPE
|
||||
}
|
||||
auth_header = create_admin_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.create_user'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 201
|
||||
json_resp = admin_request.post('user.create_user', _data=data, _expected_status=201)
|
||||
|
||||
user = User.query.filter_by(email_address='user@digital.cabinet-office.gov.uk').first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['email_address'] == user.email_address
|
||||
assert json_resp['data']['id'] == str(user.id)
|
||||
assert user.auth_type == EMAIL_AUTH_TYPE
|
||||
|
||||
|
||||
def test_post_user_without_auth_type(admin_request, notify_db_session):
|
||||
assert User.query.count() == 0
|
||||
User.query.delete()
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"email_address": "user@digital.cabinet-office.gov.uk",
|
||||
@@ -146,11 +138,11 @@ def test_post_user_without_auth_type(admin_request, notify_db_session):
|
||||
assert user.auth_type == SMS_AUTH_TYPE
|
||||
|
||||
|
||||
def test_post_user_missing_attribute_email(client, notify_db, notify_db_session):
|
||||
def test_post_user_missing_attribute_email(admin_request, notify_db_session):
|
||||
"""
|
||||
Tests POST endpoint '/' missing attribute email.
|
||||
"""
|
||||
assert User.query.count() == 0
|
||||
User.query.delete()
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"password": "password",
|
||||
@@ -160,23 +152,17 @@ def test_post_user_missing_attribute_email(client, notify_db, notify_db_session)
|
||||
"failed_login_count": 0,
|
||||
"permissions": {}
|
||||
}
|
||||
auth_header = create_admin_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.create_user'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 400
|
||||
json_resp = admin_request.post('user.create_user', _data=data, _expected_status=400)
|
||||
|
||||
assert User.query.count() == 0
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert {'email_address': ['Missing data for required field.']} == json_resp['message']
|
||||
|
||||
|
||||
def test_create_user_missing_attribute_password(client, notify_db, notify_db_session):
|
||||
def test_create_user_missing_attribute_password(admin_request, notify_db_session):
|
||||
"""
|
||||
Tests POST endpoint '/' missing attribute password.
|
||||
"""
|
||||
assert User.query.count() == 0
|
||||
User.query.delete()
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"email_address": "user@digital.cabinet-office.gov.uk",
|
||||
@@ -186,15 +172,8 @@ def test_create_user_missing_attribute_password(client, notify_db, notify_db_ses
|
||||
"failed_login_count": 0,
|
||||
"permissions": {}
|
||||
}
|
||||
auth_header = create_admin_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.create_user'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 400
|
||||
json_resp = admin_request.post('user.create_user', _data=data, _expected_status=400)
|
||||
assert User.query.count() == 0
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert {'password': ['Missing data for required field.']} == json_resp['message']
|
||||
|
||||
|
||||
@@ -252,22 +231,20 @@ def test_cannot_create_user_with_empty_strings(admin_request, notify_db_session)
|
||||
('email_address', 'newuser@mail.com'),
|
||||
('mobile_number', '+4407700900460')
|
||||
])
|
||||
def test_post_user_attribute(client, sample_user, user_attribute, user_value):
|
||||
def test_post_user_attribute(admin_request, sample_user, user_attribute, user_value):
|
||||
assert getattr(sample_user, user_attribute) != user_value
|
||||
update_dict = {
|
||||
user_attribute: user_value
|
||||
}
|
||||
auth_header = create_admin_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.update_user_attribute', user_id=sample_user.id),
|
||||
data=json.dumps(update_dict),
|
||||
headers=headers)
|
||||
json_resp = admin_request.post(
|
||||
'user.update_user_attribute',
|
||||
user_id=sample_user.id,
|
||||
_data=update_dict
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data'][user_attribute] == user_value
|
||||
assert getattr(sample_user, user_attribute) == user_value
|
||||
|
||||
|
||||
@pytest.mark.parametrize('user_attribute, user_value, arguments', [
|
||||
@@ -292,7 +269,7 @@ def test_post_user_attribute(client, sample_user, user_attribute, user_value):
|
||||
))
|
||||
])
|
||||
def test_post_user_attribute_with_updated_by(
|
||||
client, mocker, sample_user, user_attribute,
|
||||
admin_request, mocker, sample_user, user_attribute,
|
||||
user_value, arguments, team_member_email_edit_template, team_member_mobile_edit_template
|
||||
):
|
||||
updater = create_user(name="Service Manago", email="notify_manago@digital.cabinet-office.gov.uk")
|
||||
@@ -301,17 +278,13 @@ def test_post_user_attribute_with_updated_by(
|
||||
user_attribute: user_value,
|
||||
'updated_by': str(updater.id)
|
||||
}
|
||||
auth_header = create_admin_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
mock_persist_notification = mocker.patch('app.user.rest.persist_notification')
|
||||
mocker.patch('app.user.rest.send_notification_to_queue')
|
||||
resp = client.post(
|
||||
url_for('user.update_user_attribute', user_id=sample_user.id),
|
||||
data=json.dumps(update_dict),
|
||||
headers=headers)
|
||||
|
||||
assert resp.status_code == 200, resp.get_data(as_text=True)
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
json_resp = admin_request.post(
|
||||
'user.update_user_attribute',
|
||||
user_id=sample_user.id,
|
||||
_data=update_dict
|
||||
)
|
||||
assert json_resp['data'][user_attribute] == user_value
|
||||
if arguments:
|
||||
mock_persist_notification.assert_called_once_with(**arguments)
|
||||
@@ -320,74 +293,66 @@ def test_post_user_attribute_with_updated_by(
|
||||
|
||||
|
||||
def test_post_user_attribute_with_updated_by_sends_notification_to_international_from_number(
|
||||
client, mocker, sample_user, team_member_mobile_edit_template
|
||||
admin_request, mocker, sample_user, team_member_mobile_edit_template
|
||||
):
|
||||
updater = create_user(name="Service Manago")
|
||||
update_dict = {
|
||||
'mobile_number': '+601117224412',
|
||||
'updated_by': str(updater.id)
|
||||
}
|
||||
auth_header = create_admin_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
mocker.patch('app.user.rest.send_notification_to_queue')
|
||||
resp = client.post(
|
||||
url_for('user.update_user_attribute', user_id=sample_user.id),
|
||||
data=json.dumps(update_dict),
|
||||
headers=headers)
|
||||
|
||||
assert resp.status_code == 200, resp.get_data(as_text=True)
|
||||
admin_request.post(
|
||||
'user.update_user_attribute',
|
||||
user_id=sample_user.id,
|
||||
_data=update_dict
|
||||
)
|
||||
|
||||
notification = Notification.query.first()
|
||||
assert notification.reply_to_text == current_app.config['NOTIFY_INTERNATIONAL_SMS_SENDER']
|
||||
|
||||
|
||||
def test_archive_user(mocker, client, sample_user):
|
||||
def test_archive_user(mocker, admin_request, sample_user):
|
||||
archive_mock = mocker.patch('app.user.rest.dao_archive_user')
|
||||
|
||||
response = client.post(
|
||||
url_for('user.archive_user', user_id=sample_user.id),
|
||||
headers=[create_admin_authorization_header()]
|
||||
admin_request.post(
|
||||
'user.archive_user',
|
||||
user_id=sample_user.id,
|
||||
_expected_status=204
|
||||
)
|
||||
|
||||
assert response.status_code == 204
|
||||
archive_mock.assert_called_once_with(sample_user)
|
||||
|
||||
|
||||
def test_archive_user_when_user_does_not_exist_gives_404(mocker, client, fake_uuid, notify_db_session):
|
||||
def test_archive_user_when_user_does_not_exist_gives_404(mocker, admin_request, fake_uuid, notify_db_session):
|
||||
archive_mock = mocker.patch('app.user.rest.dao_archive_user')
|
||||
|
||||
response = client.post(
|
||||
url_for('user.archive_user', user_id=fake_uuid),
|
||||
headers=[create_admin_authorization_header()]
|
||||
admin_request.post(
|
||||
'user.archive_user',
|
||||
user_id=fake_uuid,
|
||||
_expected_status=404
|
||||
)
|
||||
|
||||
assert response.status_code == 404
|
||||
archive_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_archive_user_when_user_cannot_be_archived(mocker, client, sample_user):
|
||||
def test_archive_user_when_user_cannot_be_archived(mocker, admin_request, sample_user):
|
||||
mocker.patch('app.dao.users_dao.user_can_be_archived', return_value=False)
|
||||
|
||||
response = client.post(
|
||||
url_for('user.archive_user', user_id=sample_user.id),
|
||||
headers=[create_admin_authorization_header()]
|
||||
json_resp = admin_request.post(
|
||||
'user.archive_user', user_id=sample_user.id,
|
||||
_expected_status=400
|
||||
)
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
|
||||
msg = "User can’t be removed from a service - check all services have another team member with manage_settings"
|
||||
|
||||
assert response.status_code == 400
|
||||
assert json_resp['message'] == msg
|
||||
|
||||
|
||||
def test_get_user_by_email(client, sample_service):
|
||||
def test_get_user_by_email(admin_request, sample_service):
|
||||
sample_user = sample_service.users[0]
|
||||
header = create_admin_authorization_header()
|
||||
url = url_for('user.get_by_email', email=sample_user.email_address)
|
||||
resp = client.get(url, headers=[header])
|
||||
assert resp.status_code == 200
|
||||
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
json_resp = admin_request.get('user.get_by_email', email=sample_user.email_address)
|
||||
|
||||
expected_permissions = default_service_permissions
|
||||
fetched = json_resp['data']
|
||||
|
||||
@@ -399,22 +364,21 @@ def test_get_user_by_email(client, sample_service):
|
||||
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
|
||||
|
||||
|
||||
def test_get_user_by_email_not_found_returns_404(client, sample_user):
|
||||
header = create_admin_authorization_header()
|
||||
url = url_for('user.get_by_email', email='no_user@digital.gov.uk')
|
||||
resp = client.get(url, headers=[header])
|
||||
assert resp.status_code == 404
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
def test_get_user_by_email_not_found_returns_404(admin_request, sample_user):
|
||||
json_resp = admin_request.get(
|
||||
'user.get_by_email',
|
||||
email='no_user@digital.gov.uk',
|
||||
_expected_status=404
|
||||
)
|
||||
assert json_resp['result'] == 'error'
|
||||
assert json_resp['message'] == 'No result found'
|
||||
|
||||
|
||||
def test_get_user_by_email_bad_url_returns_404(client, sample_user):
|
||||
header = create_admin_authorization_header()
|
||||
url = '/user/email'
|
||||
resp = client.get(url, headers=[header])
|
||||
assert resp.status_code == 400
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
def test_get_user_by_email_bad_url_returns_404(admin_request, sample_user):
|
||||
json_resp = admin_request.get(
|
||||
'user.get_by_email',
|
||||
_expected_status=400
|
||||
)
|
||||
assert json_resp['result'] == 'error'
|
||||
assert json_resp['message'] == 'Invalid request. Email query string param required'
|
||||
|
||||
@@ -457,47 +421,41 @@ def test_fetch_user_by_email_without_email_returns_400(admin_request, notify_db_
|
||||
assert resp['message'] == {'email': ['Missing data for required field.']}
|
||||
|
||||
|
||||
def test_get_user_with_permissions(client, sample_user_service_permission):
|
||||
header = create_admin_authorization_header()
|
||||
response = client.get(url_for('user.get_user', user_id=str(sample_user_service_permission.user.id)),
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
permissions = json.loads(response.get_data(as_text=True))['data']['permissions']
|
||||
def test_get_user_with_permissions(admin_request, sample_user_service_permission):
|
||||
json_resp = admin_request.get(
|
||||
'user.get_user',
|
||||
user_id=str(sample_user_service_permission.user.id),
|
||||
|
||||
)
|
||||
permissions = json_resp['data']['permissions']
|
||||
assert sample_user_service_permission.permission in permissions[str(sample_user_service_permission.service.id)]
|
||||
|
||||
|
||||
def test_set_user_permissions(client, sample_user, sample_service):
|
||||
data = json.dumps({'permissions': [{'permission': MANAGE_SETTINGS}]})
|
||||
header = create_admin_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
def test_set_user_permissions(admin_request, sample_user, sample_service):
|
||||
admin_request.post(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id),
|
||||
_data={'permissions': [{'permission': MANAGE_SETTINGS}]},
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
assert response.status_code == 204
|
||||
permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first()
|
||||
assert permission.user == sample_user
|
||||
assert permission.service == sample_service
|
||||
assert permission.permission == MANAGE_SETTINGS
|
||||
|
||||
|
||||
def test_set_user_permissions_multiple(client, sample_user, sample_service):
|
||||
data = json.dumps({'permissions': [{'permission': MANAGE_SETTINGS}, {'permission': MANAGE_TEMPLATES}]})
|
||||
header = create_admin_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
def test_set_user_permissions_multiple(admin_request, sample_user, sample_service):
|
||||
data = {'permissions': [{'permission': MANAGE_SETTINGS}, {'permission': MANAGE_TEMPLATES}]}
|
||||
admin_request.post(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id),
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
assert response.status_code == 204
|
||||
permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first()
|
||||
assert permission.user == sample_user
|
||||
assert permission.service == sample_service
|
||||
@@ -508,38 +466,34 @@ def test_set_user_permissions_multiple(client, sample_user, sample_service):
|
||||
assert permission.permission == MANAGE_TEMPLATES
|
||||
|
||||
|
||||
def test_set_user_permissions_remove_old(client, sample_user, sample_service):
|
||||
data = json.dumps({'permissions': [{'permission': MANAGE_SETTINGS}]})
|
||||
header = create_admin_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
def test_set_user_permissions_remove_old(admin_request, sample_user, sample_service):
|
||||
data = {'permissions': [{'permission': MANAGE_SETTINGS}]}
|
||||
|
||||
admin_request.post(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id),
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
assert response.status_code == 204
|
||||
query = Permission.query.filter_by(user=sample_user)
|
||||
assert query.count() == 1
|
||||
assert query.first().permission == MANAGE_SETTINGS
|
||||
|
||||
|
||||
def test_set_user_folder_permissions(client, sample_user, sample_service):
|
||||
def test_set_user_folder_permissions(admin_request, sample_user, sample_service):
|
||||
tf1 = create_template_folder(sample_service)
|
||||
tf2 = create_template_folder(sample_service)
|
||||
data = json.dumps({'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]})
|
||||
data = {'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]}
|
||||
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=[('Content-Type', 'application/json'), create_admin_authorization_header()],
|
||||
data=data)
|
||||
|
||||
assert response.status_code == 204
|
||||
admin_request.post(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id),
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
service_user = dao_get_service_user(sample_user.id, sample_service.id)
|
||||
assert len(service_user.folders) == 2
|
||||
@@ -547,26 +501,24 @@ def test_set_user_folder_permissions(client, sample_user, sample_service):
|
||||
assert tf2 in service_user.folders
|
||||
|
||||
|
||||
def test_set_user_folder_permissions_when_user_does_not_belong_to_service(client, sample_user):
|
||||
def test_set_user_folder_permissions_when_user_does_not_belong_to_service(admin_request, sample_user):
|
||||
service = create_service()
|
||||
tf1 = create_template_folder(service)
|
||||
tf2 = create_template_folder(service)
|
||||
|
||||
data = json.dumps({'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]})
|
||||
data = {'permissions': [], 'folder_permissions': [str(tf1.id), str(tf2.id)]}
|
||||
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(service.id)),
|
||||
headers=[('Content-Type', 'application/json'), create_admin_authorization_header()],
|
||||
data=data)
|
||||
|
||||
assert response.status_code == 404
|
||||
admin_request.post(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(service.id),
|
||||
_data=data,
|
||||
_expected_status=404,
|
||||
)
|
||||
|
||||
|
||||
def test_set_user_folder_permissions_does_not_affect_permissions_for_other_services(
|
||||
client,
|
||||
admin_request,
|
||||
sample_user,
|
||||
sample_service,
|
||||
):
|
||||
@@ -584,23 +536,21 @@ def test_set_user_folder_permissions_does_not_affect_permissions_for_other_servi
|
||||
service_2_user.folders = [tf3]
|
||||
dao_update_service_user(service_2_user)
|
||||
|
||||
data = json.dumps({'permissions': [], 'folder_permissions': [str(tf2.id)]})
|
||||
data = {'permissions': [], 'folder_permissions': [str(tf2.id)]}
|
||||
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=[('Content-Type', 'application/json'), create_admin_authorization_header()],
|
||||
data=data)
|
||||
|
||||
assert response.status_code == 204
|
||||
admin_request.post(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id),
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
assert sample_service_user.folders == [tf2]
|
||||
assert service_2_user.folders == [tf3]
|
||||
|
||||
|
||||
def test_update_user_folder_permissions(client, sample_user, sample_service):
|
||||
def test_update_user_folder_permissions(admin_request, sample_user, sample_service):
|
||||
tf1 = create_template_folder(sample_service)
|
||||
tf2 = create_template_folder(sample_service)
|
||||
tf3 = create_template_folder(sample_service)
|
||||
@@ -609,23 +559,22 @@ def test_update_user_folder_permissions(client, sample_user, sample_service):
|
||||
service_user.folders = [tf1, tf2]
|
||||
dao_update_service_user(service_user)
|
||||
|
||||
data = json.dumps({'permissions': [], 'folder_permissions': [str(tf2.id), str(tf3.id)]})
|
||||
data = {'permissions': [], 'folder_permissions': [str(tf2.id), str(tf3.id)]}
|
||||
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=[('Content-Type', 'application/json'), create_admin_authorization_header()],
|
||||
data=data)
|
||||
admin_request.post(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id),
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
assert response.status_code == 204
|
||||
assert len(service_user.folders) == 2
|
||||
assert tf2 in service_user.folders
|
||||
assert tf3 in service_user.folders
|
||||
|
||||
|
||||
def test_remove_user_folder_permissions(client, sample_user, sample_service):
|
||||
def test_remove_user_folder_permissions(admin_request, sample_user, sample_service):
|
||||
tf1 = create_template_folder(sample_service)
|
||||
tf2 = create_template_folder(sample_service)
|
||||
|
||||
@@ -633,35 +582,34 @@ def test_remove_user_folder_permissions(client, sample_user, sample_service):
|
||||
service_user.folders = [tf1, tf2]
|
||||
dao_update_service_user(service_user)
|
||||
|
||||
data = json.dumps({'permissions': [], 'folder_permissions': []})
|
||||
data = {'permissions': [], 'folder_permissions': []}
|
||||
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=[('Content-Type', 'application/json'), create_admin_authorization_header()],
|
||||
data=data)
|
||||
admin_request.post(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id),
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
assert response.status_code == 204
|
||||
assert service_user.folders == []
|
||||
|
||||
|
||||
@freeze_time("2016-01-01 11:09:00.061258")
|
||||
def test_send_user_reset_password_should_send_reset_password_link(client,
|
||||
def test_send_user_reset_password_should_send_reset_password_link(admin_request,
|
||||
sample_user,
|
||||
mocker,
|
||||
password_reset_email_template):
|
||||
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
data = json.dumps({'email': sample_user.email_address})
|
||||
auth_header = create_admin_authorization_header()
|
||||
data = {'email': sample_user.email_address}
|
||||
notify_service = password_reset_email_template.service
|
||||
resp = client.post(
|
||||
url_for('user.send_user_reset_password'),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
assert resp.status_code == 204
|
||||
admin_request.post(
|
||||
'user.send_user_reset_password',
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
notification = Notification.query.first()
|
||||
mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks")
|
||||
assert notification.reply_to_text == notify_service.get_default_reply_to_email_address()
|
||||
@@ -702,109 +650,112 @@ def test_send_user_reset_password_should_use_provided_base_url(
|
||||
|
||||
@freeze_time("2016-01-01 11:09:00.061258")
|
||||
def test_send_user_reset_password_reset_password_link_contains_redirect_link_if_present_in_request(
|
||||
client, sample_user, mocker, password_reset_email_template
|
||||
admin_request, sample_user, mocker, password_reset_email_template
|
||||
):
|
||||
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
data = json.dumps({'email': sample_user.email_address, "next": "blob"})
|
||||
auth_header = create_admin_authorization_header()
|
||||
response = client.post(
|
||||
url_for('user.send_user_reset_password'),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
data = {'email': sample_user.email_address, "next": "blob"}
|
||||
|
||||
admin_request.post(
|
||||
'user.send_user_reset_password',
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
assert response.status_code == 204
|
||||
notification = Notification.query.first()
|
||||
assert "?next=blob" in notification.content
|
||||
mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks")
|
||||
|
||||
|
||||
def test_send_user_reset_password_should_return_400_when_email_is_missing(client, mocker):
|
||||
def test_send_user_reset_password_should_return_400_when_email_is_missing(admin_request, mocker):
|
||||
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
data = json.dumps({})
|
||||
auth_header = create_admin_authorization_header()
|
||||
data = {}
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.send_user_reset_password'),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
assert resp.status_code == 400
|
||||
assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']}
|
||||
json_resp = admin_request.post(
|
||||
'user.send_user_reset_password',
|
||||
_data=data,
|
||||
_expected_status=400,
|
||||
)
|
||||
assert json_resp['message'] == {'email': ['Missing data for required field.']}
|
||||
assert mocked.call_count == 0
|
||||
|
||||
|
||||
def test_send_user_reset_password_should_return_400_when_user_doesnot_exist(client, mocker):
|
||||
def test_send_user_reset_password_should_return_400_when_user_doesnot_exist(admin_request, mocker):
|
||||
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
bad_email_address = 'bad@email.gov.uk'
|
||||
data = json.dumps({'email': bad_email_address})
|
||||
auth_header = create_admin_authorization_header()
|
||||
data = {'email': bad_email_address}
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.send_user_reset_password'),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
json_resp = admin_request.post(
|
||||
'user.send_user_reset_password',
|
||||
_data=data,
|
||||
_expected_status=404,
|
||||
)
|
||||
|
||||
assert resp.status_code == 404
|
||||
assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found'
|
||||
assert json_resp['message'] == 'No result found'
|
||||
assert mocked.call_count == 0
|
||||
|
||||
|
||||
def test_send_user_reset_password_should_return_400_when_data_is_not_email_address(client, mocker):
|
||||
def test_send_user_reset_password_should_return_400_when_data_is_not_email_address(admin_request, mocker):
|
||||
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
bad_email_address = 'bad.email.gov.uk'
|
||||
data = json.dumps({'email': bad_email_address})
|
||||
auth_header = create_admin_authorization_header()
|
||||
data = {'email': bad_email_address}
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.send_user_reset_password'),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
json_resp = admin_request.post(
|
||||
'user.send_user_reset_password',
|
||||
_data=data,
|
||||
_expected_status=400,
|
||||
)
|
||||
|
||||
assert resp.status_code == 400
|
||||
assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Not a valid email address']}
|
||||
assert json_resp['message'] == {'email': ['Not a valid email address']}
|
||||
assert mocked.call_count == 0
|
||||
|
||||
|
||||
def test_send_already_registered_email(client, sample_user, already_registered_template, mocker):
|
||||
data = json.dumps({'email': sample_user.email_address})
|
||||
auth_header = create_admin_authorization_header()
|
||||
def test_send_already_registered_email(admin_request, sample_user, already_registered_template, mocker):
|
||||
data = {'email': sample_user.email_address}
|
||||
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
notify_service = already_registered_template.service
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.send_already_registered_email', user_id=str(sample_user.id)),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 204
|
||||
admin_request.post(
|
||||
'user.send_already_registered_email',
|
||||
user_id=str(sample_user.id),
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
notification = Notification.query.first()
|
||||
mocked.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks")
|
||||
assert notification.reply_to_text == notify_service.get_default_reply_to_email_address()
|
||||
|
||||
|
||||
def test_send_already_registered_email_returns_400_when_data_is_missing(client, sample_user):
|
||||
data = json.dumps({})
|
||||
auth_header = create_admin_authorization_header()
|
||||
def test_send_already_registered_email_returns_400_when_data_is_missing(admin_request, sample_user):
|
||||
data = {}
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.send_already_registered_email', user_id=str(sample_user.id)),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 400
|
||||
assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']}
|
||||
json_resp = admin_request.post(
|
||||
'user.send_already_registered_email',
|
||||
user_id=str(sample_user.id),
|
||||
_data=data,
|
||||
_expected_status=400,
|
||||
)
|
||||
assert json_resp['message'] == {'email': ['Missing data for required field.']}
|
||||
|
||||
|
||||
def test_send_user_confirm_new_email_returns_204(client, sample_user, change_email_confirmation_template, mocker):
|
||||
def test_send_user_confirm_new_email_returns_204(
|
||||
admin_request,
|
||||
sample_user,
|
||||
change_email_confirmation_template,
|
||||
mocker
|
||||
):
|
||||
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
new_email = 'new_address@dig.gov.uk'
|
||||
data = json.dumps({'email': new_email})
|
||||
auth_header = create_admin_authorization_header()
|
||||
data = {'email': new_email}
|
||||
notify_service = change_email_confirmation_template.service
|
||||
|
||||
resp = client.post(url_for('user.send_user_confirm_new_email', user_id=str(sample_user.id)),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 204
|
||||
admin_request.post(
|
||||
'user.send_user_confirm_new_email',
|
||||
user_id=str(sample_user.id),
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
notification = Notification.query.first()
|
||||
mocked.assert_called_once_with(
|
||||
([str(notification.id)]),
|
||||
@@ -812,41 +763,41 @@ def test_send_user_confirm_new_email_returns_204(client, sample_user, change_ema
|
||||
assert notification.reply_to_text == notify_service.get_default_reply_to_email_address()
|
||||
|
||||
|
||||
def test_send_user_confirm_new_email_returns_400_when_email_missing(client, sample_user, mocker):
|
||||
def test_send_user_confirm_new_email_returns_400_when_email_missing(admin_request, sample_user, mocker):
|
||||
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
data = json.dumps({})
|
||||
auth_header = create_admin_authorization_header()
|
||||
resp = client.post(url_for('user.send_user_confirm_new_email', user_id=str(sample_user.id)),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 400
|
||||
assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']}
|
||||
data = {}
|
||||
|
||||
json_resp = admin_request.post(
|
||||
'user.send_user_confirm_new_email',
|
||||
user_id=str(sample_user.id),
|
||||
_data=data,
|
||||
_expected_status=400,
|
||||
)
|
||||
assert json_resp['message'] == {'email': ['Missing data for required field.']}
|
||||
mocked.assert_not_called()
|
||||
|
||||
|
||||
@freeze_time('2020-02-14T12:00:00')
|
||||
def test_update_user_password_saves_correctly(client, sample_service):
|
||||
def test_update_user_password_saves_correctly(admin_request, sample_service):
|
||||
sample_user = sample_service.users[0]
|
||||
new_password = '1234567890'
|
||||
auth_header = create_admin_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
data = {'_password': '1234567890'}
|
||||
resp = client.post(
|
||||
url_for('user.update_password', user_id=sample_user.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 200
|
||||
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
json_resp = admin_request.post(
|
||||
'user.update_password',
|
||||
user_id=str(sample_user.id),
|
||||
_data=data
|
||||
)
|
||||
|
||||
assert json_resp['data']['password_changed_at'] is not None
|
||||
data = {'password': new_password}
|
||||
auth_header = create_admin_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.verify_user_password', user_id=str(sample_user.id)),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 204
|
||||
|
||||
admin_request.post(
|
||||
'user.verify_user_password',
|
||||
user_id=str(sample_user.id),
|
||||
_data=data,
|
||||
_expected_status=204
|
||||
)
|
||||
|
||||
|
||||
def test_activate_user(admin_request, sample_user):
|
||||
@@ -1088,72 +1039,58 @@ def test_get_orgs_and_services_only_shows_users_orgs_and_services(admin_request,
|
||||
]
|
||||
|
||||
|
||||
def test_find_users_by_email_finds_user_by_partial_email(notify_db, client):
|
||||
def test_find_users_by_email_finds_user_by_partial_email(notify_db_session, admin_request):
|
||||
create_user(email='findel.mestro@foo.com')
|
||||
create_user(email='me.ignorra@foo.com')
|
||||
data = json.dumps({"email": "findel"})
|
||||
auth_header = create_admin_authorization_header()
|
||||
data = {"email": "findel"}
|
||||
|
||||
response = client.post(
|
||||
url_for("user.find_users_by_email"),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
users = admin_request.post(
|
||||
"user.find_users_by_email",
|
||||
_data=data,
|
||||
)
|
||||
users = json.loads(response.get_data(as_text=True))
|
||||
|
||||
assert response.status_code == 200
|
||||
assert len(users['data']) == 1
|
||||
assert users['data'][0]['email_address'] == 'findel.mestro@foo.com'
|
||||
|
||||
|
||||
def test_find_users_by_email_finds_user_by_full_email(notify_db, client):
|
||||
def test_find_users_by_email_finds_user_by_full_email(notify_db_session, admin_request):
|
||||
create_user(email='findel.mestro@foo.com')
|
||||
create_user(email='me.ignorra@foo.com')
|
||||
data = json.dumps({"email": "findel.mestro@foo.com"})
|
||||
auth_header = create_admin_authorization_header()
|
||||
data = {"email": "findel.mestro@foo.com"}
|
||||
|
||||
response = client.post(
|
||||
url_for("user.find_users_by_email"),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
users = admin_request.post(
|
||||
"user.find_users_by_email",
|
||||
_data=data,
|
||||
)
|
||||
users = json.loads(response.get_data(as_text=True))
|
||||
|
||||
assert response.status_code == 200
|
||||
assert len(users['data']) == 1
|
||||
assert users['data'][0]['email_address'] == 'findel.mestro@foo.com'
|
||||
|
||||
|
||||
def test_find_users_by_email_handles_no_results(notify_db, client):
|
||||
def test_find_users_by_email_handles_no_results(notify_db_session, admin_request):
|
||||
create_user(email='findel.mestro@foo.com')
|
||||
create_user(email='me.ignorra@foo.com')
|
||||
data = json.dumps({"email": "rogue"})
|
||||
auth_header = create_admin_authorization_header()
|
||||
data = {"email": "rogue"}
|
||||
|
||||
response = client.post(
|
||||
url_for("user.find_users_by_email"),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
users = admin_request.post(
|
||||
"user.find_users_by_email",
|
||||
_data=data,
|
||||
)
|
||||
users = json.loads(response.get_data(as_text=True))
|
||||
|
||||
assert response.status_code == 200
|
||||
assert users['data'] == []
|
||||
|
||||
|
||||
def test_search_for_users_by_email_handles_incorrect_data_format(notify_db, client):
|
||||
def test_search_for_users_by_email_handles_incorrect_data_format(notify_db_session, admin_request):
|
||||
create_user(email='findel.mestro@foo.com')
|
||||
data = json.dumps({"email": 1})
|
||||
auth_header = create_admin_authorization_header()
|
||||
data = {"email": 1}
|
||||
|
||||
response = client.post(
|
||||
url_for("user.find_users_by_email"),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
json = admin_request.post(
|
||||
"user.find_users_by_email",
|
||||
_data=data,
|
||||
_expected_status=400
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
assert json.loads(response.get_data(as_text=True))['message'] == {'email': ['Not a valid string.']}
|
||||
assert json['message'] == {'email': ['Not a valid string.']}
|
||||
|
||||
|
||||
@pytest.mark.parametrize('number, expected_reply_to',
|
||||
|
||||
Reference in New Issue
Block a user