mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-22 16:31:15 -05:00
[WIP] When user is added to a service a list of permissions groups are
used to assign the correct permissions to the user. Last slice will be to update invite status.
This commit is contained in:
@@ -32,7 +32,7 @@ export FIRETEXT_API_KEY=[contact team member for api key]
|
||||
export FIRETEXT_NUMBER="Firetext"
|
||||
export INVITATION_EMAIL_FROM='invites@notifications.service.gov.uk'
|
||||
export INVITATION_EXPIRATION_DAYS=2
|
||||
export NOTIFY_EMAIL_DOMAIN='dev.notify.works'
|
||||
export NOTIFY_EMAIL_DOMAIN='notify.works'
|
||||
export NOTIFY_JOB_QUEUE='[unique-to-environment]-notify-jobs-queue' # NOTE unique prefix
|
||||
export NOTIFICATION_QUEUE_PREFIX='[unique-to-environment]-notification_development' # NOTE unique prefix
|
||||
export SECRET_KEY='dev-notify-secret-key'
|
||||
|
||||
27
app/permissions_utils.py
Normal file
27
app/permissions_utils.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from app.models import (
|
||||
MANAGE_USERS,
|
||||
MANAGE_TEMPLATES,
|
||||
MANAGE_SETTINGS,
|
||||
SEND_TEXTS,
|
||||
SEND_EMAILS,
|
||||
SEND_LETTERS,
|
||||
MANAGE_API_KEYS,
|
||||
ACCESS_DEVELOPER_DOCS
|
||||
)
|
||||
|
||||
from app.schemas import permission_schema
|
||||
|
||||
|
||||
permissions_groups = {'send_messages': [SEND_TEXTS, SEND_EMAILS, SEND_LETTERS],
|
||||
'manage_service': [MANAGE_USERS, MANAGE_SETTINGS, MANAGE_TEMPLATES],
|
||||
'manage_api_keys': [MANAGE_API_KEYS, ACCESS_DEVELOPER_DOCS]}
|
||||
|
||||
|
||||
def get_permissions_by_group(permission_groups):
|
||||
requested_permissions = []
|
||||
for group in permission_groups:
|
||||
permissions = permissions_groups[group]
|
||||
for permission in permissions:
|
||||
requested_permissions.append({'permission': permission})
|
||||
permissions, errors = permission_schema.load(requested_permissions, many=True)
|
||||
return permissions
|
||||
@@ -28,7 +28,11 @@ from app.models import ApiKey
|
||||
from app.schemas import (
|
||||
service_schema,
|
||||
api_key_schema,
|
||||
user_schema)
|
||||
user_schema,
|
||||
permission_schema,
|
||||
invited_user_schema
|
||||
)
|
||||
|
||||
from app.errors import register_errors
|
||||
|
||||
service = Blueprint('service', __name__)
|
||||
@@ -173,6 +177,10 @@ def add_user_to_service(service_id, user_id):
|
||||
message='User id: {} already part of service id: {}'.format(user_id, service_id)), 400
|
||||
|
||||
dao_add_user_to_service(service, user)
|
||||
invited_user, errors = invited_user_schema.load(request.get_json())
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 404
|
||||
_process_permissions(user, service, invited_user.get_permissions())
|
||||
|
||||
data, errors = service_schema.dump(service)
|
||||
return jsonify(data=data), 201
|
||||
@@ -180,3 +188,13 @@ def add_user_to_service(service_id, user_id):
|
||||
|
||||
def _service_not_found(service_id):
|
||||
return jsonify(result='error', message='Service not found for id: {}'.format(service_id)), 404
|
||||
|
||||
|
||||
def _process_permissions(user, service, permission_groups):
|
||||
from app.permissions_utils import get_permissions_by_group
|
||||
from app.dao.permissions_dao import permission_dao
|
||||
permissions = get_permissions_by_group(permission_groups)
|
||||
for permission in permissions:
|
||||
permission.user = user
|
||||
permission.service = service
|
||||
permission_dao.set_user_permission(user, permissions)
|
||||
|
||||
@@ -215,7 +215,6 @@ def test_update_invited_user_set_status_to_cancelled(notify_api, sample_invited_
|
||||
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))['data']
|
||||
print(json_resp)
|
||||
assert json_resp['status'] == 'cancelled'
|
||||
|
||||
|
||||
|
||||
@@ -181,7 +181,6 @@ def test_create_job_returns_404_if_missing_service(notify_api, sample_template,
|
||||
assert response.status_code == 404
|
||||
|
||||
app.celery.tasks.process_job.apply_async.assert_not_called()
|
||||
print(resp_json)
|
||||
assert resp_json['result'] == 'error'
|
||||
assert resp_json['message'] == 'Service {} not found'.format(random_id)
|
||||
|
||||
|
||||
@@ -458,7 +458,11 @@ def test_default_permissions_are_added_for_user_service(notify_api,
|
||||
assert sorted(default_service_permissions) == sorted(service_permissions)
|
||||
|
||||
|
||||
def test_add_existing_user_to_another_service(notify_api, notify_db, notify_db_session, sample_service):
|
||||
def test_add_existing_user_to_another_service_with_all_permissions(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
|
||||
@@ -489,37 +493,221 @@ def test_add_existing_user_to_another_service(notify_api, notify_db, notify_db_s
|
||||
# they must exist in db first
|
||||
save_model_user(user_to_add)
|
||||
|
||||
data = {
|
||||
'service': str(sample_service.id),
|
||||
'email_address': 'invited@digital.cabinet-office.gov.uk',
|
||||
'from_user': sample_user.id,
|
||||
'permissions': 'send_messages,manage_service,manage_api_keys'
|
||||
}
|
||||
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
|
||||
request_body=json.dumps(data),
|
||||
method='POST'
|
||||
)
|
||||
|
||||
resp = client.post(
|
||||
'/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
headers=[('Content-Type', 'application/json'), auth_header],
|
||||
data=json.dumps(data)
|
||||
)
|
||||
|
||||
assert resp.status_code == 201
|
||||
|
||||
# check new user added to service
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}/users'.format(sample_service.id),
|
||||
path='/service/{}'.format(sample_service.id),
|
||||
method='GET'
|
||||
)
|
||||
|
||||
resp = client.get(
|
||||
'/service/{}/users'.format(sample_service.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
'/service/{}'.format(sample_service.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header],
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert user_to_add.id in json_resp['data']['users']
|
||||
|
||||
# check user has all permissions
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('user.get_user', user_id=user_to_add.id),
|
||||
method='GET'
|
||||
)
|
||||
resp = client.get(url_for('user.get_user', user_id=user_to_add.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
assert resp.status_code == 200
|
||||
result = json.loads(resp.get_data(as_text=True))
|
||||
assert len(result['data']) == 2
|
||||
assert _user_email_in_list(result['data'], user_already_in_service.email_address)
|
||||
assert _user_email_in_list(result['data'], user_to_add.email_address)
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
permissions = json_resp['data']['permissions'][str(sample_service.id)]
|
||||
expected_permissions = ['send_texts', 'send_emails', 'send_letters', 'manage_users',
|
||||
'manage_settings', 'manage_templates', 'manage_api_keys',
|
||||
'access_developer_docs']
|
||||
assert sorted(expected_permissions) == sorted(permissions)
|
||||
|
||||
|
||||
def test_add_existing_user_to_non_existing_service_returns404(notify_api, notify_db, notify_db_session):
|
||||
def test_add_existing_user_to_another_service_with_send_permissions(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
|
||||
# they must exist in db first
|
||||
user_to_add = User(
|
||||
name='Invited User',
|
||||
email_address='invited@digital.cabinet-office.gov.uk',
|
||||
password='password',
|
||||
mobile_number='+4477123456'
|
||||
)
|
||||
save_model_user(user_to_add)
|
||||
|
||||
data = {
|
||||
'service': str(sample_service.id),
|
||||
'email_address': 'invited@digital.cabinet-office.gov.uk',
|
||||
'from_user': sample_user.id,
|
||||
'permissions': 'send_messages'
|
||||
}
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
|
||||
request_body=json.dumps(data),
|
||||
method='POST'
|
||||
)
|
||||
|
||||
resp = client.post(
|
||||
'/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header],
|
||||
data=json.dumps(data)
|
||||
)
|
||||
|
||||
assert resp.status_code == 201
|
||||
|
||||
# check user has send permissions
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('user.get_user', user_id=user_to_add.id),
|
||||
method='GET'
|
||||
)
|
||||
resp = client.get(url_for('user.get_user', user_id=user_to_add.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
|
||||
permissions = json_resp['data']['permissions'][str(sample_service.id)]
|
||||
expected_permissions = ['send_texts', 'send_emails', 'send_letters']
|
||||
assert sorted(expected_permissions) == sorted(permissions)
|
||||
|
||||
|
||||
def test_add_existing_user_to_another_service_with_manage_permissions(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
|
||||
# they must exist in db first
|
||||
user_to_add = User(
|
||||
name='Invited User',
|
||||
email_address='invited@digital.cabinet-office.gov.uk',
|
||||
password='password',
|
||||
mobile_number='+4477123456'
|
||||
)
|
||||
save_model_user(user_to_add)
|
||||
|
||||
data = {
|
||||
'service': str(sample_service.id),
|
||||
'email_address': 'invited@digital.cabinet-office.gov.uk',
|
||||
'from_user': sample_user.id,
|
||||
'permissions': 'manage_service'
|
||||
}
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
|
||||
request_body=json.dumps(data),
|
||||
method='POST'
|
||||
)
|
||||
|
||||
resp = client.post(
|
||||
'/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header],
|
||||
data=json.dumps(data)
|
||||
)
|
||||
|
||||
assert resp.status_code == 201
|
||||
|
||||
# check user has send permissions
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('user.get_user', user_id=user_to_add.id),
|
||||
method='GET'
|
||||
)
|
||||
resp = client.get(url_for('user.get_user', user_id=user_to_add.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
|
||||
permissions = json_resp['data']['permissions'][str(sample_service.id)]
|
||||
expected_permissions = ['manage_users', 'manage_settings', 'manage_templates']
|
||||
assert sorted(expected_permissions) == sorted(permissions)
|
||||
|
||||
|
||||
def test_add_existing_user_to_another_service_with_manage_api_keys(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
|
||||
# they must exist in db first
|
||||
user_to_add = User(
|
||||
name='Invited User',
|
||||
email_address='invited@digital.cabinet-office.gov.uk',
|
||||
password='password',
|
||||
mobile_number='+4477123456'
|
||||
)
|
||||
save_model_user(user_to_add)
|
||||
|
||||
data = {
|
||||
'service': str(sample_service.id),
|
||||
'email_address': 'invited@digital.cabinet-office.gov.uk',
|
||||
'from_user': sample_user.id,
|
||||
'permissions': 'manage_api_keys'
|
||||
}
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
|
||||
request_body=json.dumps(data),
|
||||
method='POST'
|
||||
)
|
||||
|
||||
resp = client.post(
|
||||
'/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header],
|
||||
data=json.dumps(data)
|
||||
)
|
||||
|
||||
assert resp.status_code == 201
|
||||
|
||||
# check user has send permissions
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('user.get_user', user_id=user_to_add.id),
|
||||
method='GET'
|
||||
)
|
||||
resp = client.get(url_for('user.get_user', user_id=user_to_add.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
|
||||
permissions = json_resp['data']['permissions'][str(sample_service.id)]
|
||||
expected_permissions = ['manage_api_keys', 'access_developer_docs']
|
||||
assert sorted(expected_permissions) == sorted(permissions)
|
||||
|
||||
|
||||
def test_add_existing_user_to_non_existing_service_returns404(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
|
||||
@@ -533,14 +721,22 @@ def test_add_existing_user_to_non_existing_service_returns404(notify_api, notify
|
||||
|
||||
incorrect_id = uuid.uuid4()
|
||||
|
||||
data = {
|
||||
'service': str(incorrect_id),
|
||||
'email_address': 'invited@digital.cabinet-office.gov.uk',
|
||||
'from_user': sample_user.id,
|
||||
'permissions': 'send_messages'
|
||||
}
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}/users/{}'.format(incorrect_id, user_to_add.id),
|
||||
request_body=json.dumps(data),
|
||||
method='POST'
|
||||
)
|
||||
|
||||
resp = client.post(
|
||||
'/service/{}/users/{}'.format(incorrect_id, user_to_add.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
headers=[('Content-Type', 'application/json'), auth_header],
|
||||
data=json.dumps(data)
|
||||
)
|
||||
|
||||
result = json.loads(resp.get_data(as_text=True))
|
||||
@@ -557,14 +753,22 @@ def test_add_existing_user_of_service_to_service_returns400(notify_api, notify_d
|
||||
|
||||
existing_user_id = sample_service.users[0].id
|
||||
|
||||
data = {
|
||||
'service': str(sample_service.id),
|
||||
'email_address': 'invited@digital.cabinet-office.gov.uk',
|
||||
'from_user': 'doesnotmatter',
|
||||
'permissions': 'send_messages'
|
||||
}
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}/users/{}'.format(sample_service.id, existing_user_id),
|
||||
request_body=json.dumps(data),
|
||||
method='POST'
|
||||
)
|
||||
|
||||
resp = client.post(
|
||||
'/service/{}/users/{}'.format(sample_service.id, existing_user_id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
headers=[('Content-Type', 'application/json'), auth_header],
|
||||
data=json.dumps(data)
|
||||
)
|
||||
|
||||
result = json.loads(resp.get_data(as_text=True))
|
||||
@@ -581,14 +785,22 @@ def test_add_unknown_user_to_service_returns404(notify_api, notify_db, notify_db
|
||||
|
||||
incorrect_id = 9876
|
||||
|
||||
data = {
|
||||
'service': str(sample_service.id),
|
||||
'email_address': 'invited@digital.cabinet-office.gov.uk',
|
||||
'from_user': incorrect_id,
|
||||
'permissions': 'send_messages'
|
||||
}
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}/users/{}'.format(sample_service.id, incorrect_id),
|
||||
request_body=json.dumps(data),
|
||||
method='POST'
|
||||
)
|
||||
|
||||
resp = client.post(
|
||||
'/service/{}/users/{}'.format(sample_service.id, incorrect_id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
headers=[('Content-Type', 'application/json'), auth_header],
|
||||
data=json.dumps(data)
|
||||
)
|
||||
|
||||
result = json.loads(resp.get_data(as_text=True))
|
||||
@@ -597,10 +809,3 @@ def test_add_unknown_user_to_service_returns404(notify_api, notify_db, notify_db
|
||||
assert resp.status_code == 404
|
||||
assert result['result'] == 'error'
|
||||
assert result['message'] == expected_message
|
||||
|
||||
|
||||
def _user_email_in_list(user_list, email_address):
|
||||
for user in user_list:
|
||||
if user['email_address'] == email_address:
|
||||
return True
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user