Functionality added and all tests working.

This commit is contained in:
Nicholas Staples
2016-03-01 14:21:28 +00:00
parent 4e678ac391
commit 918d40cc9d
7 changed files with 174 additions and 112 deletions

View File

@@ -1,15 +1,22 @@
from app.dao import DAOClass
from app.models import (Permission, Service, User)
from app import db
from werkzeug.datastructures import MultiDict
from app.dao import DAOClass
from app.models import (
Permission,
Service,
User,
MANAGE_SERVICE,
SEND_MESSAGES,
MANAGE_API_KEYS,
MANAGE_TEMPLATES)
# Service Permissions
manage_service = 'manage_service'
send_messages = 'send_messages'
manage_api_keys = 'manage_api_keys'
manage_templates = 'manage_templates'
# Default permissions for a service
default_service_permissions = [manage_service, send_messages, manage_api_keys, manage_templates]
default_service_permissions = [
MANAGE_SERVICE,
SEND_MESSAGES,
MANAGE_API_KEYS,
MANAGE_TEMPLATES]
class PermissionDAO(DAOClass):
@@ -42,5 +49,17 @@ class PermissionDAO(DAOClass):
permission = Permission(permission=name, user=user, service=service)
self.create_instance(permission, _commit=False)
def set_user_permission(self, user, permissions):
try:
query = self.get_query(filter_by_dict={'user': user.id})
query.delete()
for p in permissions:
self.create_instance(p, _commit=False)
except Exception as e:
db.session.rollback()
raise e
else:
db.session.commit()
permission_dao = PermissionDAO()

View File

@@ -272,6 +272,20 @@ class InvitedUser(db.Model):
return self.permissions.split(',')
# Service Permissions
MANAGE_SERVICE = 'manage_service'
SEND_MESSAGES = 'send_messages'
MANAGE_API_KEYS = 'manage_api_keys'
MANAGE_TEMPLATES = 'manage_templates'
# List of permissions
PERMISSION_LIST = [
MANAGE_SERVICE,
SEND_MESSAGES,
MANAGE_API_KEYS,
MANAGE_TEMPLATES]
class Permission(db.Model):
__tablename__ = 'permissions'
@@ -281,7 +295,10 @@ class Permission(db.Model):
service = db.relationship('Service')
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), index=True, nullable=False)
user = db.relationship('User')
permission = db.Column(db.String(255), nullable=False, unique=False)
permission = db.Column(db.Enum(*PERMISSION_LIST, name='permission_types'),
index=False,
unique=False,
nullable=False)
created_at = db.Column(
db.DateTime,
index=False,

View File

@@ -26,29 +26,3 @@ def get_permission(permission_id):
if errors:
abort(500, errors)
return jsonify(data=data)
@permission.route('', methods=['POST'])
def create_permission():
inst, errors = permission_schema.load(request.get_json())
if errors:
abort(400, errors)
# Commit instance to the database
permission_dao.create_instance(inst)
data, errors = permission_schema.dump(inst)
if errors:
abort(500, errors)
return jsonify(data=data), 201
@permission.route('/<permission_id>', methods=['DELETE'])
def delete_permission(permission_id):
inst = permission_dao.get_query(filter_by_dict={'id': permission_id}).first()
if not inst:
abort(404, 'Permission not found for id: {permission_id}'.format(permission_id))
# Generate response first
data, errors = permission_schema.dump(inst)
permission_dao.delete_instance(inst)
if errors:
abort(500, errors)
return jsonify(data=data), 200

View File

@@ -5,6 +5,7 @@ from . import ma
from . import models
from app.dao.permissions_dao import permission_dao
from marshmallow import (post_load, ValidationError, validates, validates_schema)
from marshmallow_sqlalchemy import field_for
mobile_regex = re.compile("^\\+44[\\d]{10}$")
@@ -178,6 +179,11 @@ class InvitedUserSchema(BaseSchema):
class PermissionSchema(BaseSchema):
# Override generated fields
user = field_for(models.Permission, 'user', dump_only=True)
service = field_for(models.Permission, 'service', dump_only=True)
permission = field_for(models.Permission, 'permission')
__envelope__ = {
'single': 'permission',
'many': 'permissions',

View File

@@ -13,11 +13,15 @@ from app.dao.users_dao import (
get_user_by_email
)
from app.dao.permissions_dao import permission_dao
from app.dao.services_dao import dao_fetch_service_by_id
from app.schemas import (
old_request_verify_code_schema,
user_schema,
request_verify_code_schema,
user_schema_load_json
user_schema_load_json,
permission_schema
)
from app.celery.tasks import (send_sms_code, send_email_code)
@@ -194,6 +198,26 @@ def get_user(user_id=None):
return jsonify(data=result.data)
@user.route('/<int:user_id>/<service_id>/permission', methods=['POST'])
def set_permissions(user_id, service_id):
# TODO fix security hole, how do we verify that the user
# who is making this request has permission to make the request.
user = get_model_users(user_id=user_id)
if not user:
abort(404, 'User not found for id: {}'.format(user_id))
service = dao_fetch_service_by_id(service_id=service_id)
if not service:
abort(404, 'Service not found for id: {}'.format(service_id))
permissions, errors = permission_schema.load(request.get_json(), many=True)
if errors:
abort(400, errors)
for p in permissions:
p.user = user
p.service = service
permission_dao.set_user_permission(user, permissions)
return jsonify({}), 204
@user.route('/email', methods=['GET'])
def get_by_email():
email = request.args.get('email')

View File

@@ -100,78 +100,3 @@ def test_get_permission_404(notify_api, notify_db, notify_db_session, sample_per
assert response.status_code == 404
json_resp = json.loads(response.get_data(as_text=True))
assert json_resp['message'] == 'No result found'
def test_create_permission(notify_api, notify_db, notify_db_session, sample_user, sample_service):
"""
Tests POST endpoint '/' to create a single permission.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
permission_name = "new permission"
data = json.dumps({
'user': sample_user.id,
'service': str(sample_service.id),
'permission': permission_name})
auth_header = create_authorization_header(
path=url_for('permission.create_permission'),
method='POST',
request_body=data)
headers = [('Content-Type', 'application/json'), auth_header]
response = client.post(
url_for('permission.create_permission'),
data=data,
headers=headers)
assert response.status_code == 201
json_resp = json.loads(response.get_data(as_text=True))
assert permission_name == json_resp['data']['permission']
assert str(sample_service.id) == json_resp['data']['service']
assert sample_user.id == json_resp['data']['user']
def test_create_permission_no_service(notify_api, notify_db, notify_db_session, sample_user):
"""
Tests POST endpoint '/' to create a single permission.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
permission_name = "new permission"
data = json.dumps({
'user': sample_user.id,
'permission': permission_name})
auth_header = create_authorization_header(
path=url_for('permission.create_permission'),
method='POST',
request_body=data)
headers = [('Content-Type', 'application/json'), auth_header]
response = client.post(
url_for('permission.create_permission'),
data=data,
headers=headers)
assert response.status_code == 201
json_resp = json.loads(response.get_data(as_text=True))
assert permission_name == json_resp['data']['permission']
assert sample_user.id == json_resp['data']['user']
def test_delete_permission(notify_api, notify_db, notify_db_session, sample_permission):
"""
Tests DELETE endpoint '/' to delete a permission.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
header = create_authorization_header(
path=url_for('permission.delete_permission', permission_id=sample_permission.id),
method='DELETE')
response = client.delete(
url_for('permission.delete_permission', permission_id=sample_permission.id),
headers=[header])
assert response.status_code == 200
json_resp = json.loads(response.get_data(as_text=True))
expected = {
"permission": sample_permission.permission,
"user": sample_permission.user.id,
"id": str(sample_permission.id),
"service": None
}
assert expected == json_resp['data']

View File

@@ -2,7 +2,8 @@ import json
from flask import url_for
from app.models import (User)
from app.models import (User, Permission, MANAGE_SERVICE, MANAGE_TEMPLATES)
from app import db
from tests import create_authorization_header
@@ -328,3 +329,99 @@ def test_get_user_with_permissions(notify_api,
assert response.status_code == 200
permissions = json.loads(response.get_data(as_text=True))['data']['permissions']
assert sample_service_permission.permission in permissions[str(sample_service_permission.service.id)]
def test_set_user_permissions(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_service):
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps([{'permission': MANAGE_SERVICE}])
header = create_authorization_header(
path=url_for(
'user.set_permissions',
user_id=sample_user.id,
service_id=str(sample_service.id)),
method='POST',
request_body=data)
headers = [('Content-Type', 'application/json'), header]
response = client.post(
url_for(
'user.set_permissions',
user_id=sample_user.id,
service_id=str(sample_service.id)),
headers=headers,
data=data)
assert response.status_code == 204
permission = Permission.query.filter_by(permission=MANAGE_SERVICE).first()
assert permission.user == sample_user
assert permission.service == sample_service
assert permission.permission == MANAGE_SERVICE
def test_set_user_permissions_multiple(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_service):
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps([{'permission': MANAGE_SERVICE}, {'permission': MANAGE_TEMPLATES}])
header = create_authorization_header(
path=url_for(
'user.set_permissions',
user_id=sample_user.id,
service_id=str(sample_service.id)),
method='POST',
request_body=data)
headers = [('Content-Type', 'application/json'), header]
response = client.post(
url_for(
'user.set_permissions',
user_id=sample_user.id,
service_id=str(sample_service.id)),
headers=headers,
data=data)
assert response.status_code == 204
permission = Permission.query.filter_by(permission=MANAGE_SERVICE).first()
assert permission.user == sample_user
assert permission.service == sample_service
assert permission.permission == MANAGE_SERVICE
permission = Permission.query.filter_by(permission=MANAGE_TEMPLATES).first()
assert permission.user == sample_user
assert permission.service == sample_service
assert permission.permission == MANAGE_TEMPLATES
def test_set_user_permissions_remove_old(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_service):
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps([{'permission': MANAGE_SERVICE}])
header = create_authorization_header(
path=url_for(
'user.set_permissions',
user_id=sample_user.id,
service_id=str(sample_service.id)),
method='POST',
request_body=data)
headers = [('Content-Type', 'application/json'), header]
response = client.post(
url_for(
'user.set_permissions',
user_id=sample_user.id,
service_id=str(sample_service.id)),
headers=headers,
data=data)
assert response.status_code == 204
query = Permission.query.filter_by(user=sample_user)
assert query.count() == 1
assert query.first().permission == MANAGE_SERVICE