mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-04 18:31:13 -05:00
Merge pull request #119 from alphagov/update_permission_endpoints
Update permission endpoints
This commit is contained in:
@@ -1,23 +1,26 @@
|
||||
from app.dao import DAOClass
|
||||
from app.models import (Permission, Service, User)
|
||||
from app import db
|
||||
from werkzeug.datastructures import MultiDict
|
||||
from app.dao import DAOClass
|
||||
from app.models import (
|
||||
Permission,
|
||||
Service,
|
||||
User,
|
||||
MANAGE_SERVICE,
|
||||
SEND_MESSAGES,
|
||||
MANAGE_API_KEYS,
|
||||
MANAGE_TEMPLATES,
|
||||
MANAGE_TEAM,
|
||||
VIEW_ACTIVITY)
|
||||
|
||||
|
||||
# Service Permissions
|
||||
manage_service = 'manage_service'
|
||||
send_messages = 'send_messages'
|
||||
manage_api_keys = 'manage_api_keys'
|
||||
manage_templates = 'manage_templates'
|
||||
manage_team = 'manage_team'
|
||||
view_activity = 'view_activity'
|
||||
# Default permissions for a service
|
||||
default_service_permissions = [
|
||||
manage_service,
|
||||
send_messages,
|
||||
manage_api_keys,
|
||||
manage_templates,
|
||||
manage_team,
|
||||
view_activity]
|
||||
MANAGE_SERVICE,
|
||||
SEND_MESSAGES,
|
||||
MANAGE_API_KEYS,
|
||||
MANAGE_TEMPLATES,
|
||||
MANAGE_TEAM,
|
||||
VIEW_ACTIVITY]
|
||||
|
||||
|
||||
class PermissionDAO(DAOClass):
|
||||
@@ -50,5 +53,17 @@ class PermissionDAO(DAOClass):
|
||||
permission = Permission(permission=name, user=user, service=service)
|
||||
self.create_instance(permission, _commit=False)
|
||||
|
||||
def set_user_permission(self, user, permissions):
|
||||
try:
|
||||
query = self.get_query(filter_by_dict={'user': user.id})
|
||||
query.delete()
|
||||
for p in permissions:
|
||||
self.create_instance(p, _commit=False)
|
||||
except Exception as e:
|
||||
db.session.rollback()
|
||||
raise e
|
||||
else:
|
||||
db.session.commit()
|
||||
|
||||
|
||||
permission_dao = PermissionDAO()
|
||||
|
||||
@@ -272,6 +272,24 @@ class InvitedUser(db.Model):
|
||||
return self.permissions.split(',')
|
||||
|
||||
|
||||
# Service Permissions
|
||||
MANAGE_SERVICE = 'manage_service'
|
||||
SEND_MESSAGES = 'send_messages'
|
||||
MANAGE_API_KEYS = 'manage_api_keys'
|
||||
MANAGE_TEMPLATES = 'manage_templates'
|
||||
MANAGE_TEAM = 'manage_team'
|
||||
VIEW_ACTIVITY = 'view_activity'
|
||||
|
||||
# List of permissions
|
||||
PERMISSION_LIST = [
|
||||
MANAGE_SERVICE,
|
||||
SEND_MESSAGES,
|
||||
MANAGE_API_KEYS,
|
||||
MANAGE_TEMPLATES,
|
||||
MANAGE_TEAM,
|
||||
VIEW_ACTIVITY]
|
||||
|
||||
|
||||
class Permission(db.Model):
|
||||
__tablename__ = 'permissions'
|
||||
|
||||
@@ -281,7 +299,11 @@ class Permission(db.Model):
|
||||
service = db.relationship('Service')
|
||||
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), index=True, nullable=False)
|
||||
user = db.relationship('User')
|
||||
permission = db.Column(db.String(255), nullable=False, unique=False)
|
||||
permission = db.Column(
|
||||
db.Enum(*PERMISSION_LIST, name='permission_types'),
|
||||
index=False,
|
||||
unique=False,
|
||||
nullable=False)
|
||||
created_at = db.Column(
|
||||
db.DateTime,
|
||||
index=False,
|
||||
|
||||
@@ -26,29 +26,3 @@ def get_permission(permission_id):
|
||||
if errors:
|
||||
abort(500, errors)
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
@permission.route('', methods=['POST'])
|
||||
def create_permission():
|
||||
inst, errors = permission_schema.load(request.get_json())
|
||||
if errors:
|
||||
abort(400, errors)
|
||||
# Commit instance to the database
|
||||
permission_dao.create_instance(inst)
|
||||
data, errors = permission_schema.dump(inst)
|
||||
if errors:
|
||||
abort(500, errors)
|
||||
return jsonify(data=data), 201
|
||||
|
||||
|
||||
@permission.route('/<permission_id>', methods=['DELETE'])
|
||||
def delete_permission(permission_id):
|
||||
inst = permission_dao.get_query(filter_by_dict={'id': permission_id}).first()
|
||||
if not inst:
|
||||
abort(404, 'Permission not found for id: {permission_id}'.format(permission_id))
|
||||
# Generate response first
|
||||
data, errors = permission_schema.dump(inst)
|
||||
permission_dao.delete_instance(inst)
|
||||
if errors:
|
||||
abort(500, errors)
|
||||
return jsonify(data=data), 200
|
||||
|
||||
@@ -5,6 +5,7 @@ from . import ma
|
||||
from . import models
|
||||
from app.dao.permissions_dao import permission_dao
|
||||
from marshmallow import (post_load, ValidationError, validates, validates_schema)
|
||||
from marshmallow_sqlalchemy import field_for
|
||||
|
||||
mobile_regex = re.compile("^\\+44[\\d]{10}$")
|
||||
|
||||
@@ -179,6 +180,11 @@ class InvitedUserSchema(BaseSchema):
|
||||
|
||||
class PermissionSchema(BaseSchema):
|
||||
|
||||
# Override generated fields
|
||||
user = field_for(models.Permission, 'user', dump_only=True)
|
||||
service = field_for(models.Permission, 'service', dump_only=True)
|
||||
permission = field_for(models.Permission, 'permission')
|
||||
|
||||
__envelope__ = {
|
||||
'single': 'permission',
|
||||
'many': 'permissions',
|
||||
|
||||
@@ -13,11 +13,15 @@ from app.dao.users_dao import (
|
||||
get_user_by_email
|
||||
)
|
||||
|
||||
from app.dao.permissions_dao import permission_dao
|
||||
from app.dao.services_dao import dao_fetch_service_by_id
|
||||
|
||||
from app.schemas import (
|
||||
old_request_verify_code_schema,
|
||||
user_schema,
|
||||
request_verify_code_schema,
|
||||
user_schema_load_json
|
||||
user_schema_load_json,
|
||||
permission_schema
|
||||
)
|
||||
|
||||
from app.celery.tasks import (send_sms_code, send_email_code)
|
||||
@@ -162,6 +166,26 @@ def get_user(user_id=None):
|
||||
return jsonify(data=result.data)
|
||||
|
||||
|
||||
@user.route('/<int:user_id>/<service_id>/permission', methods=['POST'])
|
||||
def set_permissions(user_id, service_id):
|
||||
# TODO fix security hole, how do we verify that the user
|
||||
# who is making this request has permission to make the request.
|
||||
user = get_model_users(user_id=user_id)
|
||||
if not user:
|
||||
abort(404, 'User not found for id: {}'.format(user_id))
|
||||
service = dao_fetch_service_by_id(service_id=service_id)
|
||||
if not service:
|
||||
abort(404, 'Service not found for id: {}'.format(service_id))
|
||||
permissions, errors = permission_schema.load(request.get_json(), many=True)
|
||||
if errors:
|
||||
abort(400, errors)
|
||||
for p in permissions:
|
||||
p.user = user
|
||||
p.service = service
|
||||
permission_dao.set_user_permission(user, permissions)
|
||||
return jsonify({}), 204
|
||||
|
||||
|
||||
@user.route('/email', methods=['GET'])
|
||||
def get_by_email():
|
||||
email = request.args.get('email')
|
||||
|
||||
@@ -31,6 +31,6 @@ def upgrade():
|
||||
def downgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
conn = op.get_bind()
|
||||
conn.execute("DELETE FROM permissions where permission='manage_templates")
|
||||
conn.execute("DELETE FROM permissions where permission='manage_templates'")
|
||||
|
||||
### end Alembic commands ###
|
||||
48
migrations/versions/0032_update_permission_to_enum.py
Normal file
48
migrations/versions/0032_update_permission_to_enum.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0032_update_permission_to_enum
|
||||
Revises: 0031_add_manage_team_permission
|
||||
Create Date: 2016-03-01 17:08:12.184393
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0032_update_permission_to_enum'
|
||||
down_revision = '0031_add_manage_team_permission'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
conn = op.get_bind()
|
||||
permissions = conn.execute("SELECT id, permission FROM permissions").fetchall()
|
||||
op.drop_constraint('uix_service_user_permission', 'permissions', type_='unique')
|
||||
op.drop_column('permissions', 'permission')
|
||||
permission_types = sa.Enum('manage_service', 'send_messages', 'manage_api_keys', 'manage_templates', 'manage_team', 'view_activity', name='permission_types')
|
||||
permission_types.create(op.get_bind())
|
||||
op.add_column('permissions', sa.Column('permission', permission_types, nullable=True))
|
||||
for p in permissions:
|
||||
conn.execute("UPDATE permissions SET permission='{}' WHERE id='{}'".format(str(p[1]), str(p[0])))
|
||||
op.create_unique_constraint('uix_service_user_permission', 'permissions', ['service_id', 'user_id', 'permission'])
|
||||
op.alter_column('permissions', 'permission', nullable=False)
|
||||
### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
conn = op.get_bind()
|
||||
permissions = conn.execute("SELECT id, permission FROM permissions").fetchall()
|
||||
op.drop_constraint('uix_service_user_permission', 'permissions', type_='unique')
|
||||
op.drop_column('permissions', 'permission')
|
||||
try:
|
||||
sa.Enum(name='permission_types').drop(conn, checkfirst=False)
|
||||
except:
|
||||
pass
|
||||
op.add_column('permissions', sa.Column('permission', sa.VARCHAR(length=255), autoincrement=False, nullable=True))
|
||||
for p in permissions:
|
||||
conn.execute("UPDATE permissions SET permission='{}' WHERE ID='{}'".format(str(p[1]), str(p[0])))
|
||||
op.create_unique_constraint('uix_service_user_permission', 'permissions', ['service_id', 'user_id', 'permission'])
|
||||
op.alter_column('permissions', 'permission', nullable=False)
|
||||
### end Alembic commands ###
|
||||
@@ -334,7 +334,7 @@ def sample_permission(notify_db,
|
||||
notify_db_session,
|
||||
service=None,
|
||||
user=None,
|
||||
permission="sample permission"):
|
||||
permission="manage_service"):
|
||||
if user is None:
|
||||
user = sample_user(notify_db, notify_db_session)
|
||||
data = {
|
||||
@@ -343,9 +343,14 @@ def sample_permission(notify_db,
|
||||
}
|
||||
if service:
|
||||
data['service'] = service
|
||||
p_model = Permission(**data)
|
||||
db.session.add(p_model)
|
||||
db.session.commit()
|
||||
p_model = Permission.query.filter_by(
|
||||
user=user,
|
||||
service=service,
|
||||
permission=permission).first()
|
||||
if not p_model:
|
||||
p_model = Permission(**data)
|
||||
db.session.add(p_model)
|
||||
db.session.commit()
|
||||
return p_model
|
||||
|
||||
|
||||
@@ -354,7 +359,7 @@ def sample_service_permission(notify_db,
|
||||
notify_db_session,
|
||||
service=None,
|
||||
user=None,
|
||||
permission="sample permission"):
|
||||
permission="manage_service"):
|
||||
if user is None:
|
||||
user = sample_user(notify_db, notify_db_session)
|
||||
if service is None:
|
||||
@@ -364,7 +369,12 @@ def sample_service_permission(notify_db,
|
||||
'service': service,
|
||||
'permission': permission
|
||||
}
|
||||
p_model = Permission(**data)
|
||||
db.session.add(p_model)
|
||||
db.session.commit()
|
||||
p_model = Permission.query.filter_by(
|
||||
user=user,
|
||||
service=service,
|
||||
permission=permission).first()
|
||||
if not p_model:
|
||||
p_model = Permission(**data)
|
||||
db.session.add(p_model)
|
||||
db.session.commit()
|
||||
return p_model
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import json
|
||||
from flask import url_for
|
||||
from app.models import Permission
|
||||
from tests import create_authorization_header
|
||||
from ..conftest import sample_permission as create_permission
|
||||
|
||||
@@ -39,12 +40,6 @@ def test_get_permission_filter(notify_api,
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
another_permission = create_permission(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
user=sample_user,
|
||||
service=sample_service,
|
||||
permission="another permission")
|
||||
header = create_authorization_header(
|
||||
path=url_for('permission.get_permissions'),
|
||||
method='GET')
|
||||
@@ -53,6 +48,8 @@ def test_get_permission_filter(notify_api,
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
another_permission = Permission.query.filter_by(
|
||||
service_id=str(sample_service.id)).first()
|
||||
expected = {
|
||||
"permission": another_permission.permission,
|
||||
"user": sample_user.id,
|
||||
@@ -100,78 +97,3 @@ def test_get_permission_404(notify_api, notify_db, notify_db_session, sample_per
|
||||
assert response.status_code == 404
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert json_resp['message'] == 'No result found'
|
||||
|
||||
|
||||
def test_create_permission(notify_api, notify_db, notify_db_session, sample_user, sample_service):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a single permission.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
permission_name = "new permission"
|
||||
data = json.dumps({
|
||||
'user': sample_user.id,
|
||||
'service': str(sample_service.id),
|
||||
'permission': permission_name})
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('permission.create_permission'),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
response = client.post(
|
||||
url_for('permission.create_permission'),
|
||||
data=data,
|
||||
headers=headers)
|
||||
assert response.status_code == 201
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert permission_name == json_resp['data']['permission']
|
||||
assert str(sample_service.id) == json_resp['data']['service']
|
||||
assert sample_user.id == json_resp['data']['user']
|
||||
|
||||
|
||||
def test_create_permission_no_service(notify_api, notify_db, notify_db_session, sample_user):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a single permission.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
permission_name = "new permission"
|
||||
data = json.dumps({
|
||||
'user': sample_user.id,
|
||||
'permission': permission_name})
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('permission.create_permission'),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
response = client.post(
|
||||
url_for('permission.create_permission'),
|
||||
data=data,
|
||||
headers=headers)
|
||||
assert response.status_code == 201
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert permission_name == json_resp['data']['permission']
|
||||
assert sample_user.id == json_resp['data']['user']
|
||||
|
||||
|
||||
def test_delete_permission(notify_api, notify_db, notify_db_session, sample_permission):
|
||||
"""
|
||||
Tests DELETE endpoint '/' to delete a permission.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
header = create_authorization_header(
|
||||
path=url_for('permission.delete_permission', permission_id=sample_permission.id),
|
||||
method='DELETE')
|
||||
response = client.delete(
|
||||
url_for('permission.delete_permission', permission_id=sample_permission.id),
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
expected = {
|
||||
"permission": sample_permission.permission,
|
||||
"user": sample_permission.user.id,
|
||||
"id": str(sample_permission.id),
|
||||
"service": None
|
||||
}
|
||||
assert expected == json_resp['data']
|
||||
|
||||
@@ -2,7 +2,8 @@ import json
|
||||
|
||||
from flask import url_for
|
||||
|
||||
from app.models import (User)
|
||||
from app.models import (User, Permission, MANAGE_SERVICE, MANAGE_TEMPLATES)
|
||||
from app import db
|
||||
from tests import create_authorization_header
|
||||
|
||||
|
||||
@@ -336,3 +337,99 @@ def test_get_user_with_permissions(notify_api,
|
||||
assert response.status_code == 200
|
||||
permissions = json.loads(response.get_data(as_text=True))['data']['permissions']
|
||||
assert sample_service_permission.permission in permissions[str(sample_service_permission.service.id)]
|
||||
|
||||
|
||||
def test_set_user_permissions(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_user,
|
||||
sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps([{'permission': MANAGE_SERVICE}])
|
||||
header = create_authorization_header(
|
||||
path=url_for(
|
||||
'user.set_permissions',
|
||||
user_id=sample_user.id,
|
||||
service_id=str(sample_service.id)),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=sample_user.id,
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
|
||||
assert response.status_code == 204
|
||||
permission = Permission.query.filter_by(permission=MANAGE_SERVICE).first()
|
||||
assert permission.user == sample_user
|
||||
assert permission.service == sample_service
|
||||
assert permission.permission == MANAGE_SERVICE
|
||||
|
||||
|
||||
def test_set_user_permissions_multiple(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_user,
|
||||
sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps([{'permission': MANAGE_SERVICE}, {'permission': MANAGE_TEMPLATES}])
|
||||
header = create_authorization_header(
|
||||
path=url_for(
|
||||
'user.set_permissions',
|
||||
user_id=sample_user.id,
|
||||
service_id=str(sample_service.id)),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=sample_user.id,
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
|
||||
assert response.status_code == 204
|
||||
permission = Permission.query.filter_by(permission=MANAGE_SERVICE).first()
|
||||
assert permission.user == sample_user
|
||||
assert permission.service == sample_service
|
||||
assert permission.permission == MANAGE_SERVICE
|
||||
permission = Permission.query.filter_by(permission=MANAGE_TEMPLATES).first()
|
||||
assert permission.user == sample_user
|
||||
assert permission.service == sample_service
|
||||
assert permission.permission == MANAGE_TEMPLATES
|
||||
|
||||
|
||||
def test_set_user_permissions_remove_old(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_user,
|
||||
sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps([{'permission': MANAGE_SERVICE}])
|
||||
header = create_authorization_header(
|
||||
path=url_for(
|
||||
'user.set_permissions',
|
||||
user_id=sample_user.id,
|
||||
service_id=str(sample_service.id)),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=sample_user.id,
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
|
||||
assert response.status_code == 204
|
||||
query = Permission.query.filter_by(user=sample_user)
|
||||
assert query.count() == 1
|
||||
assert query.first().permission == MANAGE_SERVICE
|
||||
|
||||
Reference in New Issue
Block a user