mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-23 08:51:30 -05:00
Merge pull request #104 from alphagov/add_user_permissions
Add user permissions
This commit is contained in:
@@ -3,6 +3,7 @@ from flask import current_app
|
|||||||
from flask_marshmallow.fields import fields
|
from flask_marshmallow.fields import fields
|
||||||
from . import ma
|
from . import ma
|
||||||
from . import models
|
from . import models
|
||||||
|
from app.dao.permissions_dao import permission_dao
|
||||||
from marshmallow import (post_load, ValidationError, validates, validates_schema)
|
from marshmallow import (post_load, ValidationError, validates, validates_schema)
|
||||||
|
|
||||||
mobile_regex = re.compile("^\\+44[\\d]{10}$")
|
mobile_regex = re.compile("^\\+44[\\d]{10}$")
|
||||||
@@ -58,6 +59,18 @@ class BaseSchema(ma.ModelSchema):
|
|||||||
|
|
||||||
|
|
||||||
class UserSchema(BaseSchema):
|
class UserSchema(BaseSchema):
|
||||||
|
|
||||||
|
permissions = fields.Method("user_permissions", dump_only=True)
|
||||||
|
|
||||||
|
def user_permissions(self, usr):
|
||||||
|
retval = {}
|
||||||
|
for x in permission_dao.get_query({'user': usr.id}):
|
||||||
|
service_id = str(x.service_id)
|
||||||
|
if service_id not in retval:
|
||||||
|
retval[service_id] = []
|
||||||
|
retval[service_id].append(x.permission)
|
||||||
|
return retval
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
model = models.User
|
model = models.User
|
||||||
exclude = (
|
exclude = (
|
||||||
|
|||||||
@@ -341,3 +341,24 @@ def sample_permission(notify_db,
|
|||||||
db.session.add(p_model)
|
db.session.add(p_model)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return p_model
|
return p_model
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='function')
|
||||||
|
def sample_service_permission(notify_db,
|
||||||
|
notify_db_session,
|
||||||
|
service=None,
|
||||||
|
user=None,
|
||||||
|
permission="sample permission"):
|
||||||
|
if user is None:
|
||||||
|
user = sample_user(notify_db, notify_db_session)
|
||||||
|
if service is None:
|
||||||
|
service = sample_service(notify_db, notify_db_session)
|
||||||
|
data = {
|
||||||
|
'user': user,
|
||||||
|
'service': service,
|
||||||
|
'permission': permission
|
||||||
|
}
|
||||||
|
p_model = Permission(**data)
|
||||||
|
db.session.add(p_model)
|
||||||
|
db.session.commit()
|
||||||
|
return p_model
|
||||||
|
|||||||
@@ -28,8 +28,10 @@ def test_get_user_list(notify_api, notify_db, notify_db_session, sample_user, sa
|
|||||||
"password_changed_at": None,
|
"password_changed_at": None,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": {}
|
||||||
}
|
}
|
||||||
|
print(json_resp['data'])
|
||||||
assert expected in json_resp['data']
|
assert expected in json_resp['data']
|
||||||
|
|
||||||
|
|
||||||
@@ -55,7 +57,8 @@ def test_get_user(notify_api, notify_db, notify_db_session, sample_user, sample_
|
|||||||
"password_changed_at": None,
|
"password_changed_at": None,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": {}
|
||||||
}
|
}
|
||||||
assert json_resp['data'] == expected
|
assert json_resp['data'] == expected
|
||||||
|
|
||||||
@@ -75,7 +78,8 @@ def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_servic
|
|||||||
"password_changed_at": None,
|
"password_changed_at": None,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": {}
|
||||||
}
|
}
|
||||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||||
path=url_for('user.create_user'),
|
path=url_for('user.create_user'),
|
||||||
@@ -108,7 +112,8 @@ def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_sess
|
|||||||
"password_changed_at": None,
|
"password_changed_at": None,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": {}
|
||||||
}
|
}
|
||||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||||
path=url_for('user.create_user'),
|
path=url_for('user.create_user'),
|
||||||
@@ -139,7 +144,8 @@ def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_s
|
|||||||
"password_changed_at": None,
|
"password_changed_at": None,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": {}
|
||||||
}
|
}
|
||||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||||
path=url_for('user.create_user'),
|
path=url_for('user.create_user'),
|
||||||
@@ -190,7 +196,8 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_
|
|||||||
"id": user.id,
|
"id": user.id,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": {}
|
||||||
}
|
}
|
||||||
assert json_resp['data'] == expected
|
assert json_resp['data'] == expected
|
||||||
assert json_resp['data']['email_address'] == new_email
|
assert json_resp['data']['email_address'] == new_email
|
||||||
@@ -287,7 +294,8 @@ def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_user
|
|||||||
"password_changed_at": None,
|
"password_changed_at": None,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": {}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert json_resp['data'] == expected
|
assert json_resp['data'] == expected
|
||||||
@@ -327,3 +335,30 @@ def test_get_user_by_email_bad_url_returns_404(notify_api,
|
|||||||
json_resp = json.loads(resp.get_data(as_text=True))
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
assert json_resp['result'] == 'error'
|
assert json_resp['result'] == 'error'
|
||||||
assert json_resp['message'] == 'invalid request'
|
assert json_resp['message'] == 'invalid request'
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_user_with_permissions(notify_api,
|
||||||
|
notify_db,
|
||||||
|
notify_db_session,
|
||||||
|
sample_service_permission):
|
||||||
|
with notify_api.test_request_context():
|
||||||
|
with notify_api.test_client() as client:
|
||||||
|
header = create_authorization_header(
|
||||||
|
path=url_for('user.get_user', user_id=sample_service_permission.user.id),
|
||||||
|
method='GET')
|
||||||
|
response = client.get(url_for('user.get_user', user_id=sample_service_permission.user.id),
|
||||||
|
headers=[header])
|
||||||
|
assert response.status_code == 200
|
||||||
|
json_resp = json.loads(response.get_data(as_text=True))
|
||||||
|
expected = {
|
||||||
|
"name": "Test User",
|
||||||
|
"email_address": sample_service_permission.user.email_address,
|
||||||
|
"id": sample_service_permission.user.id,
|
||||||
|
"mobile_number": "+447700900986",
|
||||||
|
"password_changed_at": None,
|
||||||
|
"logged_in_at": None,
|
||||||
|
"state": "active",
|
||||||
|
"failed_login_count": 0,
|
||||||
|
"permissions": {str(sample_service_permission.service.id): [sample_service_permission.permission]}
|
||||||
|
}
|
||||||
|
assert expected == json_resp['data']
|
||||||
|
|||||||
Reference in New Issue
Block a user