Fix bug in PermissionDAO

Refactor user/test_rest
Remove conftest/sample_admin_service
This commit is contained in:
Rebecca Law
2016-03-01 10:34:27 +00:00
parent 5aa2243e81
commit ecbfbbc6b0
3 changed files with 82 additions and 123 deletions

View File

@@ -29,7 +29,7 @@ class PermissionDAO(DAOClass):
query.filter_by(service=Service.query.get(service_ids[0]))
# TODO the join method for multiple services
if 'user' in filter_by_dict:
user_ids = filter_by_dict.getlist('service')
user_ids = filter_by_dict.getlist('user')
if len(user_ids) == 1:
query = query.filter_by(user=User.query.get(user_ids[0]))
# TODO the join method for multiple users

View File

@@ -237,16 +237,6 @@ def sample_email_job(notify_db,
return job
@pytest.fixture(scope='function')
def sample_admin_service_id(notify_db, notify_db_session):
admin_user = sample_user(notify_db, notify_db_session, email="notify_admin@digital.cabinet-office.gov.uk")
admin_service = sample_service(notify_db, notify_db_session, service_name="Sample Admin Service", user=admin_user)
data = {'service': admin_service, 'name': 'sample admin key'}
api_key = ApiKey(**data)
save_model_api_key(api_key)
return admin_service.id
@pytest.fixture(scope='function')
def mock_secret_code(mocker):
def _create():

View File

@@ -1,79 +1,69 @@
import json
from flask import url_for
from app.models import (User, Service)
from app.dao.users_dao import save_model_user
from app.models import (User)
from tests import create_authorization_header
from tests.app.conftest import sample_service as create_sample_service
def test_get_user_list(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
def test_get_user_list(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests GET endpoint '/' to retrieve entire user list.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.get_user'),
header = create_authorization_header(path=url_for('user.get_user'),
method='GET')
response = client.get(url_for('user.get_user'),
headers=[header])
assert response.status_code == 200
json_resp = json.loads(response.get_data(as_text=True))
assert len(json_resp['data']) == 2
expected = {
"name": "Test User",
"email_address": sample_user.email_address,
"id": sample_user.id,
"mobile_number": "+447700900986",
"password_changed_at": None,
"logged_in_at": None,
"state": "active",
"failed_login_count": 0,
"permissions": {
str(sample_admin_service_id): [
'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']}
}
print(json_resp['data'])
assert expected in json_resp['data']
assert len(json_resp['data']) == 1
sample_user = sample_service.users[0]
expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']
fetched = json_resp['data'][0]
assert sample_user.id == fetched['id']
assert sample_user.name == fetched['name']
assert sample_user.mobile_number == fetched['mobile_number']
assert sample_user.email_address == fetched['email_address']
assert sample_user.state == fetched['state']
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
def test_get_user(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
def test_get_user(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests GET endpoint '/<user_id>' to retrieve a single service.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.get_user', user_id=sample_user.id),
sample_user = sample_service.users[0]
header = create_authorization_header(path=url_for('user.get_user', user_id=sample_user.id),
method='GET')
resp = client.get(url_for('user.get_user',
user_id=sample_user.id),
headers=[header])
assert resp.status_code == 200
json_resp = json.loads(resp.get_data(as_text=True))
expected = {
"name": "Test User",
"email_address": sample_user.email_address,
"id": sample_user.id,
"mobile_number": "+447700900986",
"password_changed_at": None,
"logged_in_at": None,
"state": "active",
"failed_login_count": 0,
"permissions": {
str(sample_admin_service_id): [
'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']}
}
assert json_resp['data'] == expected
expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']
fetched = json_resp['data']
assert sample_user.id == fetched['id']
assert sample_user.name == fetched['name']
assert sample_user.mobile_number == fetched['mobile_number']
assert sample_user.email_address == fetched['email_address']
assert sample_user.state == fetched['state']
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_service_id):
def test_post_user(notify_api, notify_db, notify_db_session):
"""
Tests POST endpoint '/' to create a user.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 1
assert User.query.count() == 0
data = {
"name": "Test User",
"email_address": "user@digital.cabinet-office.gov.uk",
@@ -85,8 +75,7 @@ def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_servic
"failed_login_count": 0,
"permissions": {}
}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.create_user'),
auth_header = create_authorization_header(path=url_for('user.create_user'),
method='POST',
request_body=json.dumps(data))
headers = [('Content-Type', 'application/json'), auth_header]
@@ -102,13 +91,13 @@ def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_servic
assert json_resp['data']['id'] == user.id
def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_session, sample_admin_service_id):
def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_session):
"""
Tests POST endpoint '/' missing attribute email.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 1
assert User.query.count() == 0
data = {
"name": "Test User",
"password": "password",
@@ -119,8 +108,7 @@ def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_sess
"failed_login_count": 0,
"permissions": {}
}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.create_user'),
auth_header = create_authorization_header(path=url_for('user.create_user'),
method='POST',
request_body=json.dumps(data))
headers = [('Content-Type', 'application/json'), auth_header]
@@ -129,18 +117,18 @@ def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_sess
data=json.dumps(data),
headers=headers)
assert resp.status_code == 400
assert User.query.count() == 1
assert User.query.count() == 0
json_resp = json.loads(resp.get_data(as_text=True))
assert {'email_address': ['Missing data for required field.']} == json_resp['message']
def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_session, sample_admin_service_id):
def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_session):
"""
Tests POST endpoint '/' missing attribute password.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 1
assert User.query.count() == 0
data = {
"name": "Test User",
"email_address": "user@digital.cabinet-office.gov.uk",
@@ -151,8 +139,7 @@ def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_s
"failed_login_count": 0,
"permissions": {}
}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.create_user'),
auth_header = create_authorization_header(path=url_for('user.create_user'),
method='POST',
request_body=json.dumps(data))
headers = [('Content-Type', 'application/json'), auth_header]
@@ -161,26 +148,26 @@ def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_s
data=json.dumps(data),
headers=headers)
assert resp.status_code == 400
assert User.query.count() == 1
assert User.query.count() == 0
json_resp = json.loads(resp.get_data(as_text=True))
assert {'password': ['Missing data for required field.']} == json_resp['message']
def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
def test_put_user(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests PUT endpoint '/' to update a user.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 2
assert User.query.count() == 1
sample_user = sample_service.users[0]
new_email = 'new@digital.cabinet-office.gov.uk'
data = {
'name': sample_user.name,
'email_address': new_email,
'mobile_number': sample_user.mobile_number
}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.update_user', user_id=sample_user.id),
auth_header = create_authorization_header(path=url_for('user.update_user', user_id=sample_user.id),
method='PUT',
request_body=json.dumps(data))
headers = [('Content-Type', 'application/json'), auth_header]
@@ -189,37 +176,31 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_
data=json.dumps(data),
headers=headers)
assert resp.status_code == 200
assert User.query.count() == 2
user = User.query.filter_by(email_address=new_email).first()
assert User.query.count() == 1
json_resp = json.loads(resp.get_data(as_text=True))
expected = {
"name": "Test User",
"email_address": new_email,
"mobile_number": "+447700900986",
"password_changed_at": None,
"id": user.id,
"logged_in_at": None,
"state": "active",
"failed_login_count": 0,
"permissions": {
str(sample_admin_service_id): [
'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']}
}
assert json_resp['data'] == expected
assert json_resp['data']['email_address'] == new_email
expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']
fetched = json_resp['data']
assert sample_user.id == fetched['id']
assert sample_user.name == fetched['name']
assert sample_user.mobile_number == fetched['mobile_number']
assert new_email == fetched['email_address']
assert sample_user.state == fetched['state']
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
def test_put_user_update_password(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_admin_service_id):
sample_service):
"""
Tests PUT endpoint '/' to update a user including their password.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 2
assert User.query.count() == 1
sample_user = sample_service.users[0]
new_password = '1234567890'
data = {
'name': sample_user.name,
@@ -227,8 +208,7 @@ def test_put_user_update_password(notify_api,
'mobile_number': sample_user.mobile_number,
'password': new_password
}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.update_user', user_id=sample_user.id),
auth_header = create_authorization_header(path=url_for('user.update_user', user_id=sample_user.id),
method='PUT',
request_body=json.dumps(data))
headers = [('Content-Type', 'application/json'), auth_header]
@@ -237,12 +217,11 @@ def test_put_user_update_password(notify_api,
data=json.dumps(data),
headers=headers)
assert resp.status_code == 200
assert User.query.count() == 2
assert User.query.count() == 1
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['password_changed_at'] is not None
data = {'password': new_password}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.verify_user_password', user_id=sample_user.id),
auth_header = create_authorization_header(path=url_for('user.verify_user_password', user_id=sample_user.id),
method='POST',
request_body=json.dumps(data))
headers = [('Content-Type', 'application/json'), auth_header]
@@ -253,17 +232,16 @@ def test_put_user_update_password(notify_api,
assert resp.status_code == 204
def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_user):
"""
Tests PUT endpoint '/' to update a user doesn't exist.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 2
assert User.query.count() == 1
new_email = 'new@digital.cabinet-office.gov.uk'
data = {'email_address': new_email}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.update_user', user_id="9999"),
auth_header = create_authorization_header(path=url_for('user.update_user', user_id="9999"),
method='PUT',
request_body=json.dumps(data))
headers = [('Content-Type', 'application/json'), auth_header]
@@ -272,7 +250,7 @@ def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_us
data=json.dumps(data),
headers=headers)
assert resp.status_code == 404
assert User.query.count() == 2
assert User.query.count() == 1
user = User.query.filter_by(id=sample_user.id).first()
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['result'] == "error"
@@ -282,43 +260,36 @@ def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_us
assert user.email_address != new_email
def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_service):
with notify_api.test_request_context():
with notify_api.test_client() as client:
header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.get_by_email'), method='GET')
sample_user = sample_service.users[0]
header = create_authorization_header(path=url_for('user.get_by_email'), method='GET')
url = url_for('user.get_by_email', email=sample_user.email_address)
resp = client.get(url, headers=[header])
assert resp.status_code == 200
json_resp = json.loads(resp.get_data(as_text=True))
expected = {
"name": "Test User",
"email_address": sample_user.email_address,
"id": sample_user.id,
"mobile_number": "+447700900986",
"password_changed_at": None,
"logged_in_at": None,
"state": "active",
"failed_login_count": 0,
"permissions": {
str(sample_admin_service_id): [
'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']}
}
assert json_resp['data'] == expected
json_resp = json.loads(resp.get_data(as_text=True))
expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']
fetched = json_resp['data']
assert sample_user.id == fetched['id']
assert sample_user.name == fetched['name']
assert sample_user.mobile_number == fetched['mobile_number']
assert sample_user.email_address == fetched['email_address']
assert sample_user.state == fetched['state']
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
def test_get_user_by_email_not_found_returns_400(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_admin_service_id):
sample_user):
with notify_api.test_request_context():
with notify_api.test_client() as client:
header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.get_by_email'), method='GET')
header = create_authorization_header(path=url_for('user.get_by_email'), method='GET')
url = url_for('user.get_by_email', email='no_user@digital.gov.uk')
resp = client.get(url, headers=[header])
assert resp.status_code == 404
@@ -330,13 +301,11 @@ def test_get_user_by_email_not_found_returns_400(notify_api,
def test_get_user_by_email_bad_url_returns_404(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_admin_service_id):
sample_user):
with notify_api.test_request_context():
with notify_api.test_client() as client:
header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.get_by_email'), method='GET')
header = create_authorization_header(path=url_for('user.get_by_email'), method='GET')
url = '/user/email'
resp = client.get(url, headers=[header])
assert resp.status_code == 400