mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-22 16:31:15 -05:00
Working permissions and all tests passing.
Remove print statements. Fix for review comments.
This commit is contained in:
@@ -47,6 +47,7 @@ def create_app():
|
||||
from app.job.rest import job as job_blueprint
|
||||
from app.notifications.rest import notifications as notifications_blueprint
|
||||
from app.invite.rest import invite as invite_blueprint
|
||||
from app.permission.rest import permission as permission_blueprint
|
||||
|
||||
application.register_blueprint(service_blueprint, url_prefix='/service')
|
||||
application.register_blueprint(user_blueprint, url_prefix='/user')
|
||||
@@ -55,6 +56,7 @@ def create_app():
|
||||
application.register_blueprint(notifications_blueprint)
|
||||
application.register_blueprint(job_blueprint)
|
||||
application.register_blueprint(invite_blueprint)
|
||||
application.register_blueprint(permission_blueprint, url_prefix='/permission')
|
||||
|
||||
return application
|
||||
|
||||
|
||||
@@ -1,6 +1,54 @@
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from werkzeug.datastructures import MultiDict
|
||||
from sqlalchemy.orm.relationships import RelationshipProperty
|
||||
from app import db
|
||||
|
||||
|
||||
# Should I use SQLAlchemyError?
|
||||
class DAOException(SQLAlchemyError):
|
||||
pass
|
||||
|
||||
|
||||
class DAOClass(object):
|
||||
|
||||
class Meta:
|
||||
model = None
|
||||
|
||||
def create_instance(self, inst):
|
||||
db.session.add(inst)
|
||||
db.session.commit()
|
||||
|
||||
def update_instance(self, inst, update_dict):
|
||||
# Make sure the id is not included in the update_dict
|
||||
update_dict.pop('id')
|
||||
self.Meta.model.query.filter_by(id=inst.id).update(update_dict)
|
||||
db.session.commit()
|
||||
|
||||
def get_query(self, filter_by_dict={}):
|
||||
if isinstance(filter_by_dict, dict):
|
||||
filter_by_dict = MultiDict(filter_by_dict)
|
||||
query = self.Meta.model.query
|
||||
for k in filter_by_dict.keys():
|
||||
query = self._build_query(query, k, filter_by_dict.getlist(k))
|
||||
return query
|
||||
|
||||
def delete_instance(self, inst):
|
||||
db.session.delete(inst)
|
||||
db.session.commit()
|
||||
|
||||
def _build_query(self, query, key, values):
|
||||
# TODO Lots to do here to work with all types of filters.
|
||||
field = getattr(self.Meta.model, key, None)
|
||||
filters = getattr(self.Meta, 'filter', [key])
|
||||
if field and key in filters:
|
||||
if isinstance(field.property, RelationshipProperty):
|
||||
if len(values) == 1:
|
||||
query = query.filter_by(**{key: field.property.mapper.class_.query.get(values[0])})
|
||||
elif len(values) > 1:
|
||||
query = query.filter(field.in_(field.property.mapper.class_.query.any(values[0])))
|
||||
else:
|
||||
if len(values) == 1:
|
||||
query = query.filter_by(**{key: values[0]})
|
||||
elif len(values) > 1:
|
||||
query = query.filter(field.in_(values))
|
||||
return query
|
||||
|
||||
11
app/dao/permissions_dao.py
Normal file
11
app/dao/permissions_dao.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from app.dao import DAOClass
|
||||
from app.models import Permission
|
||||
|
||||
|
||||
class PermissionDAO(DAOClass):
|
||||
|
||||
class Meta:
|
||||
model = Permission
|
||||
|
||||
|
||||
permission_dao = PermissionDAO()
|
||||
@@ -10,7 +10,11 @@ def register_errors(blueprint):
|
||||
|
||||
@blueprint.app_errorhandler(400)
|
||||
def bad_request(e):
|
||||
return jsonify(result='error', message=str(e.description)), 400
|
||||
if isinstance(e, str):
|
||||
msg = e
|
||||
else:
|
||||
msg = e.description or "Invalid request parameters"
|
||||
return jsonify(result='error', message=str(msg)), 400
|
||||
|
||||
@blueprint.app_errorhandler(401)
|
||||
def unauthorized(e):
|
||||
@@ -24,7 +28,11 @@ def register_errors(blueprint):
|
||||
|
||||
@blueprint.app_errorhandler(404)
|
||||
def page_not_found(e):
|
||||
return jsonify(result='error', message=e.description or "Not found"), 404
|
||||
if isinstance(e, str):
|
||||
msg = e
|
||||
else:
|
||||
msg = e.description or "Not found"
|
||||
return jsonify(result='error', message=msg), 404
|
||||
|
||||
@blueprint.app_errorhandler(429)
|
||||
def limit_exceeded(e):
|
||||
@@ -32,6 +40,9 @@ def register_errors(blueprint):
|
||||
|
||||
@blueprint.app_errorhandler(500)
|
||||
def internal_server_error(e):
|
||||
if isinstance(e, str):
|
||||
current_app.logger.error(e)
|
||||
elif isinstance(e, Exception):
|
||||
current_app.logger.exception(e)
|
||||
return jsonify(result='error', message="Internal server error"), 500
|
||||
|
||||
|
||||
@@ -10,10 +10,7 @@ from app.dao.invited_user_dao import (
|
||||
get_invited_users_for_service
|
||||
)
|
||||
|
||||
from app.schemas import (
|
||||
invited_user_schema,
|
||||
invited_users_schema
|
||||
)
|
||||
from app.schemas import invited_user_schema
|
||||
|
||||
invite = Blueprint('invite', __name__, url_prefix='/service/<service_id>/invite')
|
||||
|
||||
@@ -33,7 +30,7 @@ def create_invited_user(service_id):
|
||||
@invite.route('', methods=['GET'])
|
||||
def get_invited_users_by_service(service_id):
|
||||
invited_users = get_invited_users_for_service(service_id)
|
||||
return jsonify(data=invited_users_schema.dump(invited_users).data), 200
|
||||
return jsonify(data=invited_user_schema.dump(invited_users, many=True).data), 200
|
||||
|
||||
|
||||
@invite.route('/<invited_user_id>', methods=['GET'])
|
||||
|
||||
@@ -15,10 +15,7 @@ from app.dao.services_dao import (
|
||||
dao_fetch_service_by_id
|
||||
)
|
||||
|
||||
from app.schemas import (
|
||||
job_schema,
|
||||
jobs_schema
|
||||
)
|
||||
from app.schemas import job_schema
|
||||
|
||||
from app.celery.tasks import process_job
|
||||
|
||||
@@ -41,7 +38,7 @@ def get_job_by_service_and_job_id(service_id, job_id):
|
||||
@job.route('', methods=['GET'])
|
||||
def get_jobs_by_service(service_id):
|
||||
jobs = dao_get_jobs_by_service_id(service_id)
|
||||
data, errors = jobs_schema.dump(jobs)
|
||||
data, errors = job_schema.dump(jobs, many=True)
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
|
||||
@@ -40,7 +40,6 @@ class User(db.Model):
|
||||
logged_in_at = db.Column(db.DateTime, nullable=True)
|
||||
failed_login_count = db.Column(db.Integer, nullable=False, default=0)
|
||||
state = db.Column(db.String, nullable=False, default='pending')
|
||||
permissions = db.Column("permissions", ARRAY(db.String))
|
||||
|
||||
@property
|
||||
def password(self):
|
||||
@@ -265,3 +264,25 @@ class InvitedUser(db.Model):
|
||||
default=datetime.datetime.now)
|
||||
status = db.Column(
|
||||
db.Enum(*INVITED_USER_STATUS_TYPES, name='invited_users_status_types'), nullable=False, default='pending')
|
||||
|
||||
|
||||
class Permission(db.Model):
|
||||
__tablename__ = 'permissions'
|
||||
|
||||
id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||||
# Service id is optional, if the service is omitted we will assume the permission is not service specific.
|
||||
service_id = db.Column(UUID(as_uuid=True), db.ForeignKey('services.id'), index=True, unique=False, nullable=True)
|
||||
service = db.relationship('Service')
|
||||
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), index=True, nullable=False)
|
||||
user = db.relationship('User')
|
||||
permission = db.Column(db.String(255), nullable=False, unique=False)
|
||||
created_at = db.Column(
|
||||
db.DateTime,
|
||||
index=False,
|
||||
unique=False,
|
||||
nullable=False,
|
||||
default=datetime.datetime.now)
|
||||
|
||||
__table_args__ = (
|
||||
UniqueConstraint('service_id', 'user_id', 'permission', name='uix_service_user_permission'),
|
||||
)
|
||||
|
||||
0
app/permission/__init__.py
Normal file
0
app/permission/__init__.py
Normal file
54
app/permission/rest.py
Normal file
54
app/permission/rest.py
Normal file
@@ -0,0 +1,54 @@
|
||||
|
||||
from flask import (jsonify, request, abort, Blueprint, current_app)
|
||||
from app.schemas import permission_schema
|
||||
from app.errors import register_errors
|
||||
from app.dao.permissions_dao import permission_dao
|
||||
|
||||
permission = Blueprint('permission', __name__)
|
||||
register_errors(permission)
|
||||
|
||||
|
||||
@permission.route('', methods=['GET'])
|
||||
def get_permissions():
|
||||
data, errors = permission_schema.dump(
|
||||
permission_dao.get_query(filter_by_dict=request.args), many=True)
|
||||
if errors:
|
||||
abort(500, errors)
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
@permission.route('/<permission_id>', methods=['GET'])
|
||||
def get_permission(permission_id):
|
||||
inst = permission_dao.get_query(filter_by_dict={'id': permission_id}).first()
|
||||
if not inst:
|
||||
abort(404, 'Permission not found for id: {permission_id}'.format(permission_id))
|
||||
data, errors = permission_schema.dump(inst)
|
||||
if errors:
|
||||
abort(500, errors)
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
@permission.route('', methods=['POST'])
|
||||
def create_permission():
|
||||
inst, errors = permission_schema.load(request.get_json())
|
||||
if errors:
|
||||
abort(400, errors)
|
||||
# Commit instance to the database
|
||||
permission_dao.create_instance(inst)
|
||||
data, errors = permission_schema.dump(inst)
|
||||
if errors:
|
||||
abort(500, errors)
|
||||
return jsonify(data=data), 201
|
||||
|
||||
|
||||
@permission.route('/<permission_id>', methods=['DELETE'])
|
||||
def delete_permission(permission_id):
|
||||
inst = permission_dao.get_query(filter_by_dict={'id': permission_id}).first()
|
||||
if not inst:
|
||||
abort(404, 'Permission not found for id: {permission_id}'.format(permission_id))
|
||||
# Generate response first
|
||||
data, errors = permission_schema.dump(inst)
|
||||
permission_dao.delete_instance(inst)
|
||||
if errors:
|
||||
abort(500, errors)
|
||||
return jsonify(data=data), 200
|
||||
@@ -21,6 +21,30 @@ class BaseSchema(ma.ModelSchema):
|
||||
self.load_json = load_json
|
||||
super(BaseSchema, self).__init__(*args, **kwargs)
|
||||
|
||||
__envelope__ = {
|
||||
'single': None,
|
||||
'many': None
|
||||
}
|
||||
|
||||
def get_envelope_key(self, many):
|
||||
"""Helper to get the envelope key."""
|
||||
key = self.__envelope__['many'] if many else self.__envelope__['single']
|
||||
assert key is not None, "Envelope key undefined"
|
||||
return key
|
||||
|
||||
# Code to envelope the input and response.
|
||||
# TOBE added soon.
|
||||
|
||||
# @pre_load(pass_many=True)
|
||||
# def unwrap_envelope(self, data, many):
|
||||
# key = self.get_envelope_key(many)
|
||||
# return data[key]
|
||||
|
||||
# @post_dump(pass_many=True)
|
||||
# def wrap_with_envelope(self, data, many):
|
||||
# key = self.get_envelope_key(many)
|
||||
# return {key: data}
|
||||
|
||||
@post_load
|
||||
def make_instance(self, data):
|
||||
"""Deserialize data to an instance of the model. Update an existing row
|
||||
@@ -138,21 +162,28 @@ class InvitedUserSchema(BaseSchema):
|
||||
raise ValidationError('Invalid email')
|
||||
|
||||
|
||||
class PermissionSchema(BaseSchema):
|
||||
|
||||
__envelope__ = {
|
||||
'single': 'permission',
|
||||
'many': 'permissions',
|
||||
}
|
||||
|
||||
class Meta:
|
||||
model = models.Permission
|
||||
exclude = ("created_at",)
|
||||
|
||||
|
||||
user_schema = UserSchema()
|
||||
user_schema_load_json = UserSchema(load_json=True)
|
||||
users_schema = UserSchema(many=True)
|
||||
service_schema = ServiceSchema()
|
||||
service_schema_load_json = ServiceSchema(load_json=True)
|
||||
services_schema = ServiceSchema(many=True)
|
||||
template_schema = TemplateSchema()
|
||||
template_schema_load_json = TemplateSchema(load_json=True)
|
||||
templates_schema = TemplateSchema(many=True)
|
||||
api_key_schema = ApiKeySchema()
|
||||
api_key_schema_load_json = ApiKeySchema(load_json=True)
|
||||
api_keys_schema = ApiKeySchema(many=True)
|
||||
job_schema = JobSchema()
|
||||
job_schema_load_json = JobSchema(load_json=True)
|
||||
jobs_schema = JobSchema(many=True)
|
||||
old_request_verify_code_schema = OldRequestVerifyCodeSchema()
|
||||
request_verify_code_schema = RequestVerifyCodeSchema()
|
||||
sms_admin_notification_schema = SmsAdminNotificationSchema()
|
||||
@@ -161,7 +192,6 @@ job_sms_template_notification_schema = JobSmsTemplateNotificationSchema()
|
||||
email_notification_schema = EmailNotificationSchema()
|
||||
job_email_template_notification_schema = JobEmailTemplateNotificationSchema()
|
||||
notification_status_schema = NotificationStatusSchema()
|
||||
notifications_status_schema = NotificationStatusSchema(many=True)
|
||||
notification_status_schema_load_json = NotificationStatusSchema(load_json=True)
|
||||
invited_user_schema = InvitedUserSchema()
|
||||
invited_users_schema = InvitedUserSchema(many=True)
|
||||
permission_schema = PermissionSchema()
|
||||
|
||||
@@ -24,10 +24,9 @@ from app.dao.services_dao import (
|
||||
from app.dao.users_dao import get_model_users
|
||||
from app.models import ApiKey
|
||||
from app.schemas import (
|
||||
services_schema,
|
||||
service_schema,
|
||||
api_keys_schema,
|
||||
users_schema)
|
||||
api_key_schema,
|
||||
user_schema)
|
||||
from app.errors import register_errors
|
||||
|
||||
service = Blueprint('service', __name__)
|
||||
@@ -43,7 +42,7 @@ def get_services():
|
||||
services = dao_fetch_all_services_by_user(user_id)
|
||||
else:
|
||||
services = dao_fetch_all_services()
|
||||
data, errors = services_schema.dump(services)
|
||||
data, errors = service_schema.dump(services, many=True)
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
@@ -140,7 +139,7 @@ def get_api_keys(service_id, key_id=None):
|
||||
except NoResultFound:
|
||||
return jsonify(result="error", message="API key not found for id: {}".format(service_id)), 404
|
||||
|
||||
return jsonify(apiKeys=api_keys_schema.dump(api_keys).data), 200
|
||||
return jsonify(apiKeys=api_key_schema.dump(api_keys, many=True).data), 200
|
||||
|
||||
|
||||
@service.route('/<service_id>/users', methods=['GET'])
|
||||
@@ -148,10 +147,11 @@ def get_users_for_service(service_id):
|
||||
fetched = dao_fetch_service_by_id(service_id)
|
||||
if not fetched:
|
||||
return _service_not_found(service_id)
|
||||
# TODO why is this code here, the same functionality exists without it?
|
||||
if not fetched.users:
|
||||
return jsonify(data=[])
|
||||
|
||||
result = users_schema.dump(fetched.users)
|
||||
result = user_schema.dump(fetched.users, many=True)
|
||||
return jsonify(data=result.data)
|
||||
|
||||
|
||||
|
||||
@@ -15,10 +15,7 @@ from app.dao.templates_dao import (
|
||||
from app.dao.services_dao import (
|
||||
dao_fetch_service_by_id
|
||||
)
|
||||
from app.schemas import (
|
||||
template_schema,
|
||||
templates_schema,
|
||||
)
|
||||
from app.schemas import template_schema
|
||||
|
||||
template = Blueprint('template', __name__, url_prefix='/service/<service_id>/template')
|
||||
|
||||
@@ -70,7 +67,7 @@ def update_template(service_id, template_id):
|
||||
@template.route('', methods=['GET'])
|
||||
def get_all_templates_for_service(service_id):
|
||||
templates = dao_get_all_templates_for_service(service_id=service_id)
|
||||
data, errors = templates_schema.dump(templates)
|
||||
data, errors = template_schema.dump(templates, many=True)
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
|
||||
@@ -16,7 +16,6 @@ from app.dao.users_dao import (
|
||||
from app.schemas import (
|
||||
old_request_verify_code_schema,
|
||||
user_schema,
|
||||
users_schema,
|
||||
request_verify_code_schema,
|
||||
user_schema_load_json
|
||||
)
|
||||
@@ -191,7 +190,7 @@ def get_user(user_id=None):
|
||||
users = get_model_users(user_id=user_id)
|
||||
if not users:
|
||||
return jsonify(result="error", message="not found"), 404
|
||||
result = users_schema.dump(users) if isinstance(users, list) else user_schema.dump(users)
|
||||
result = user_schema.dump(users, many=True) if isinstance(users, list) else user_schema.dump(users)
|
||||
return jsonify(data=result.data)
|
||||
|
||||
|
||||
|
||||
42
migrations/versions/0027_add_service_permission.py
Normal file
42
migrations/versions/0027_add_service_permission.py
Normal file
@@ -0,0 +1,42 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0027_add_service_permission
|
||||
Revises: 0026_add_sender
|
||||
Create Date: 2016-02-26 10:33:20.536362
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0027_add_service_permission'
|
||||
down_revision = '0026_add_sender'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
def upgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('permissions',
|
||||
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
|
||||
sa.Column('service_id', postgresql.UUID(as_uuid=True), nullable=True),
|
||||
sa.Column('user_id', sa.Integer(), nullable=False),
|
||||
sa.Column('permission', sa.String(length=255), nullable=False),
|
||||
sa.Column('created_at', sa.DateTime(), nullable=False),
|
||||
sa.ForeignKeyConstraint(['service_id'], ['services.id'], ),
|
||||
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
|
||||
sa.PrimaryKeyConstraint('id'),
|
||||
sa.UniqueConstraint('service_id', 'user_id', 'permission', name='uix_service_user_permission')
|
||||
)
|
||||
op.create_index(op.f('ix_permissions_service_id'), 'permissions', ['service_id'], unique=False)
|
||||
op.create_index(op.f('ix_permissions_user_id'), 'permissions', ['user_id'], unique=False)
|
||||
op.drop_column('users', 'permissions')
|
||||
### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.add_column('users', sa.Column('permissions', postgresql.ARRAY(VARCHAR()), autoincrement=False, nullable=True))
|
||||
op.drop_index(op.f('ix_permissions_user_id'), table_name='permissions')
|
||||
op.drop_index(op.f('ix_permissions_service_id'), table_name='permissions')
|
||||
op.drop_table('permissions')
|
||||
### end Alembic commands ###
|
||||
@@ -1,7 +1,8 @@
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
from app import email_safe
|
||||
from app.models import (User, Service, Template, ApiKey, Job, Notification, InvitedUser)
|
||||
from app import (email_safe, db)
|
||||
from app.models import (
|
||||
User, Service, Template, ApiKey, Job, Notification, InvitedUser, Permission)
|
||||
from app.dao.users_dao import (save_model_user, create_user_code, create_secret_code)
|
||||
from app.dao.services_dao import dao_create_service
|
||||
from app.dao.templates_dao import dao_create_template
|
||||
@@ -47,8 +48,7 @@ def sample_user(notify_db,
|
||||
'email_address': email,
|
||||
'password': 'password',
|
||||
'mobile_number': '+447700900986',
|
||||
'state': 'active',
|
||||
'permissions': []
|
||||
'state': 'active'
|
||||
}
|
||||
usr = User.query.filter_by(email_address=email).first()
|
||||
if not usr:
|
||||
@@ -321,3 +321,23 @@ def sample_invited_user(notify_db,
|
||||
invited_user = InvitedUser(**data)
|
||||
save_invited_user(invited_user)
|
||||
return invited_user
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def sample_permission(notify_db,
|
||||
notify_db_session,
|
||||
service=None,
|
||||
user=None,
|
||||
permission="sample permission"):
|
||||
if user is None:
|
||||
user = sample_user(notify_db, notify_db_session)
|
||||
data = {
|
||||
'user': user,
|
||||
'permission': permission
|
||||
}
|
||||
if service:
|
||||
data['service'] = service
|
||||
p_model = Permission(**data)
|
||||
db.session.add(p_model)
|
||||
db.session.commit()
|
||||
return p_model
|
||||
|
||||
@@ -439,8 +439,8 @@ def test_should_allow_valid_email_notification(notify_api, sample_email_template
|
||||
path='/notifications/email',
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
notification_id = json.loads(response.data)['notification_id']
|
||||
assert response.status_code == 201
|
||||
notification_id = json.loads(response.get_data(as_text=True))['notification_id']
|
||||
app.celery.tasks.send_email.apply_async.assert_called_once_with(
|
||||
(str(sample_email_template.service_id),
|
||||
notification_id,
|
||||
|
||||
0
tests/app/permissions/__init__.py
Normal file
0
tests/app/permissions/__init__.py
Normal file
178
tests/app/permissions/test_rest.py
Normal file
178
tests/app/permissions/test_rest.py
Normal file
@@ -0,0 +1,178 @@
|
||||
import json
|
||||
from flask import url_for
|
||||
from tests import create_authorization_header
|
||||
from ..conftest import sample_permission as create_permission
|
||||
|
||||
|
||||
def test_get_permission_list(notify_api, notify_db, notify_db_session, sample_permission):
|
||||
"""
|
||||
Tests GET endpoint '/' to retrieve entire permission list.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
header = create_authorization_header(
|
||||
path=url_for('permission.get_permissions'),
|
||||
method='GET')
|
||||
response = client.get(
|
||||
url_for('permission.get_permissions'),
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(json_resp['data']) == 1
|
||||
expected = {
|
||||
"permission": sample_permission.permission,
|
||||
"user": sample_permission.user.id,
|
||||
"id": str(sample_permission.id),
|
||||
"service": None
|
||||
}
|
||||
assert expected in json_resp['data']
|
||||
|
||||
|
||||
def test_get_permission_filter(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_permission,
|
||||
sample_user,
|
||||
sample_service):
|
||||
"""
|
||||
Tests GET endpoint '/' to retrieve filtered permission list.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
another_permission = create_permission(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
user=sample_user,
|
||||
service=sample_service,
|
||||
permission="another permission")
|
||||
header = create_authorization_header(
|
||||
path=url_for('permission.get_permissions'),
|
||||
method='GET')
|
||||
response = client.get(
|
||||
url_for('permission.get_permissions', service=str(sample_service.id)),
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(json_resp['data']) == 1
|
||||
expected = {
|
||||
"permission": another_permission.permission,
|
||||
"user": sample_user.id,
|
||||
"id": str(another_permission.id),
|
||||
"service": str(sample_service.id)
|
||||
}
|
||||
assert expected in json_resp['data']
|
||||
|
||||
|
||||
def test_get_permission(notify_api, notify_db, notify_db_session, sample_permission):
|
||||
"""
|
||||
Tests GET endpoint '/<permission_id>' to retrieve a single permission.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
header = create_authorization_header(
|
||||
path=url_for('permission.get_permission', permission_id=str(sample_permission.id)),
|
||||
method='GET')
|
||||
response = client.get(
|
||||
url_for('permission.get_permission', permission_id=str(sample_permission.id)),
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
expected = {
|
||||
"permission": sample_permission.permission,
|
||||
"user": sample_permission.user.id,
|
||||
"id": str(sample_permission.id),
|
||||
"service": None
|
||||
}
|
||||
assert expected == json_resp['data']
|
||||
|
||||
|
||||
def test_get_permission_404(notify_api, notify_db, notify_db_session, sample_permission):
|
||||
"""
|
||||
Tests GET endpoint '/<invalid_id>' returns a correct 404
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
header = create_authorization_header(
|
||||
path=url_for('permission.get_permission', permission_id="123"),
|
||||
method='GET')
|
||||
response = client.get(
|
||||
url_for('permission.get_permission', permission_id="123"),
|
||||
headers=[header])
|
||||
assert response.status_code == 404
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert json_resp['message'] == 'No result found'
|
||||
|
||||
|
||||
def test_create_permission(notify_api, notify_db, notify_db_session, sample_user, sample_service):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a single permission.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
permission_name = "new permission"
|
||||
data = json.dumps({
|
||||
'user': sample_user.id,
|
||||
'service': str(sample_service.id),
|
||||
'permission': permission_name})
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('permission.create_permission'),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
response = client.post(
|
||||
url_for('permission.create_permission'),
|
||||
data=data,
|
||||
headers=headers)
|
||||
assert response.status_code == 201
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert permission_name == json_resp['data']['permission']
|
||||
assert str(sample_service.id) == json_resp['data']['service']
|
||||
assert sample_user.id == json_resp['data']['user']
|
||||
|
||||
|
||||
def test_create_permission_no_service(notify_api, notify_db, notify_db_session, sample_user):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a single permission.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
permission_name = "new permission"
|
||||
data = json.dumps({
|
||||
'user': sample_user.id,
|
||||
'permission': permission_name})
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('permission.create_permission'),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
response = client.post(
|
||||
url_for('permission.create_permission'),
|
||||
data=data,
|
||||
headers=headers)
|
||||
assert response.status_code == 201
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert permission_name == json_resp['data']['permission']
|
||||
assert sample_user.id == json_resp['data']['user']
|
||||
|
||||
|
||||
def test_delete_permission(notify_api, notify_db, notify_db_session, sample_permission):
|
||||
"""
|
||||
Tests DELETE endpoint '/' to delete a permission.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
header = create_authorization_header(
|
||||
path=url_for('permission.delete_permission', permission_id=sample_permission.id),
|
||||
method='DELETE')
|
||||
response = client.delete(
|
||||
url_for('permission.delete_permission', permission_id=sample_permission.id),
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
expected = {
|
||||
"permission": sample_permission.permission,
|
||||
"user": sample_permission.user.id,
|
||||
"id": str(sample_permission.id),
|
||||
"service": None
|
||||
}
|
||||
assert expected == json_resp['data']
|
||||
@@ -28,8 +28,7 @@ def test_get_user_list(notify_api, notify_db, notify_db_session, sample_user, sa
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
"failed_login_count": 0
|
||||
}
|
||||
assert expected in json_resp['data']
|
||||
|
||||
@@ -56,8 +55,7 @@ def test_get_user(notify_api, notify_db, notify_db_session, sample_user, sample_
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
"failed_login_count": 0
|
||||
}
|
||||
assert json_resp['data'] == expected
|
||||
|
||||
@@ -77,8 +75,7 @@ def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_servic
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
"failed_login_count": 0
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.create_user'),
|
||||
@@ -111,8 +108,7 @@ def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_sess
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
"failed_login_count": 0
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.create_user'),
|
||||
@@ -143,8 +139,7 @@ def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_s
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
"failed_login_count": 0
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.create_user'),
|
||||
@@ -172,8 +167,7 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_
|
||||
data = {
|
||||
'name': sample_user.name,
|
||||
'email_address': new_email,
|
||||
'mobile_number': sample_user.mobile_number,
|
||||
'permissions': []
|
||||
'mobile_number': sample_user.mobile_number
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id=sample_user.id),
|
||||
@@ -196,8 +190,7 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_
|
||||
"id": user.id,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
"failed_login_count": 0
|
||||
}
|
||||
assert json_resp['data'] == expected
|
||||
assert json_resp['data']['email_address'] == new_email
|
||||
@@ -219,8 +212,7 @@ def test_put_user_update_password(notify_api,
|
||||
'name': sample_user.name,
|
||||
'email_address': sample_user.email_address,
|
||||
'mobile_number': sample_user.mobile_number,
|
||||
'password': new_password,
|
||||
'permissions': []
|
||||
'password': new_password
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id=sample_user.id),
|
||||
@@ -277,125 +269,6 @@ def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_us
|
||||
assert user.email_address != new_email
|
||||
|
||||
|
||||
def test_post_with_permissions(notify_api, notify_db, notify_db_session, sample_admin_service_id):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a user with permissions.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 1
|
||||
permissions = ['new permission']
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"email_address": "user@digital.cabinet-office.gov.uk",
|
||||
"password": "password",
|
||||
"mobile_number": "+447700900986",
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": permissions
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.create_user'),
|
||||
method='POST',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.create_user'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 201
|
||||
user = User.query.filter_by(email_address='user@digital.cabinet-office.gov.uk').first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
json_resp['data'] == {"email_address": user.email_address, "id": user.id}
|
||||
assert json_resp['data']['email_address'] == user.email_address
|
||||
assert json_resp['data']['id'] == user.id
|
||||
assert json_resp['data']['permissions'] == permissions
|
||||
|
||||
|
||||
def test_put_add_permissions(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
|
||||
"""
|
||||
Tests PUT endpoint '/' to update user permissions.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
permissions = ['one permission', 'another permission']
|
||||
data = {
|
||||
'name': sample_user.name,
|
||||
'email_address': sample_user.email_address,
|
||||
'mobile_number': sample_user.mobile_number,
|
||||
'permissions': permissions
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id=sample_user.id),
|
||||
method='PUT',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id=sample_user.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 200
|
||||
assert User.query.count() == 2
|
||||
user = User.query.filter_by(email_address=sample_user.email_address).first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
expected = {
|
||||
"name": user.name,
|
||||
"email_address": user.email_address,
|
||||
"mobile_number": user.mobile_number,
|
||||
"password_changed_at": None,
|
||||
"id": user.id,
|
||||
"logged_in_at": None,
|
||||
"state": user.state,
|
||||
"failed_login_count": 0,
|
||||
"permissions": permissions
|
||||
}
|
||||
assert json_resp['data'] == expected
|
||||
|
||||
|
||||
def test_put_remove_permissions(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
|
||||
"""
|
||||
Tests PUT endpoint '/' to update user permissions.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
old_permissions = ['one permission', 'another permission']
|
||||
save_model_user(sample_user, {'permissions': old_permissions})
|
||||
permissions = ['new permissions']
|
||||
data = {
|
||||
'name': sample_user.name,
|
||||
'email_address': sample_user.email_address,
|
||||
'mobile_number': sample_user.mobile_number,
|
||||
'permissions': permissions
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id=sample_user.id),
|
||||
method='PUT',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id=sample_user.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 200
|
||||
assert User.query.count() == 2
|
||||
user = User.query.filter_by(email_address=sample_user.email_address).first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
expected = {
|
||||
"name": user.name,
|
||||
"email_address": user.email_address,
|
||||
"mobile_number": user.mobile_number,
|
||||
"password_changed_at": None,
|
||||
"id": user.id,
|
||||
"logged_in_at": None,
|
||||
"state": user.state,
|
||||
"failed_login_count": 0,
|
||||
"permissions": permissions
|
||||
}
|
||||
assert json_resp['data'] == expected
|
||||
|
||||
|
||||
def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
|
||||
|
||||
with notify_api.test_request_context():
|
||||
@@ -414,8 +287,7 @@ def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_user
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
"failed_login_count": 0
|
||||
}
|
||||
|
||||
assert json_resp['data'] == expected
|
||||
|
||||
Reference in New Issue
Block a user