mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-03 01:41:05 -05:00
@@ -1,48 +1,44 @@
|
||||
import json
|
||||
from datetime import datetime
|
||||
from sqlalchemy.orm import load_only
|
||||
from . import DAOException
|
||||
from app import db
|
||||
from app.models import (Service, User)
|
||||
from app.models import Service
|
||||
from sqlalchemy import asc
|
||||
|
||||
|
||||
def save_model_service(service, update_dict=None):
|
||||
users_list = update_dict.get('users', []) if update_dict else getattr(service, 'users', [])
|
||||
if not users_list:
|
||||
error_msg = {'users': ['Missing data for required attribute']}
|
||||
raise DAOException(json.dumps(error_msg))
|
||||
if update_dict:
|
||||
# Make sure the update_dict doesn't contain conflicting
|
||||
update_dict.pop('id', None)
|
||||
update_dict.pop('users', None)
|
||||
# TODO optimize this algorithm
|
||||
for i, x in enumerate(service.users):
|
||||
if x not in users_list:
|
||||
service.users.remove(x)
|
||||
else:
|
||||
users_list.remove(x)
|
||||
for x in users_list:
|
||||
service.users.append(x)
|
||||
Service.query.filter_by(id=service.id).update(update_dict)
|
||||
else:
|
||||
db.session.add(service)
|
||||
def dao_fetch_all_services():
|
||||
return Service.query.order_by(asc(Service.created_at)).all()
|
||||
|
||||
|
||||
def dao_fetch_service_by_id(service_id):
|
||||
return Service.query.filter_by(id=service_id).first()
|
||||
|
||||
|
||||
def dao_fetch_all_services_by_user(user_id):
|
||||
return Service.query.filter(Service.users.any(id=user_id)).order_by(asc(Service.created_at)).all()
|
||||
|
||||
|
||||
def dao_fetch_service_by_id_and_user(service_id, user_id):
|
||||
return Service.query.filter(Service.users.any(id=user_id)).filter_by(id=service_id).first()
|
||||
|
||||
|
||||
def dao_create_service(service, user):
|
||||
service.users.append(user)
|
||||
db.session.add(service)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def delete_model_service(service):
|
||||
db.session.delete(service)
|
||||
def dao_update_service(service):
|
||||
db.session.add(service)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def get_model_services(service_id=None, user_id=None, _raise=True):
|
||||
# TODO need better mapping from function params to sql query.
|
||||
if user_id and service_id:
|
||||
return Service.query.filter(
|
||||
Service.users.any(id=user_id)).filter_by(id=service_id).one()
|
||||
elif service_id:
|
||||
result = Service.query.filter_by(id=service_id).one() if _raise else Service.query.filter_by(
|
||||
id=service_id).first()
|
||||
return result
|
||||
elif user_id:
|
||||
return Service.query.filter(Service.users.any(id=user_id)).all()
|
||||
return Service.query.all()
|
||||
def dao_add_user_to_service(service, user):
|
||||
service.users.append(user)
|
||||
db.session.add(service)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def dao_remove_user_from_service(service, user):
|
||||
service.users.remove(user)
|
||||
db.session.add(service)
|
||||
db.session.commit()
|
||||
|
||||
@@ -60,7 +60,7 @@ def delete_model_user(user):
|
||||
|
||||
def get_model_users(user_id=None):
|
||||
if user_id:
|
||||
return User.query.filter_by(id=user_id).one()
|
||||
return User.query.filter_by(id=user_id).first()
|
||||
return User.query.filter_by().all()
|
||||
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ class Service(db.Model):
|
||||
|
||||
id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||||
old_id = db.Column(db.Integer, Sequence('services_id_seq'), nullable=False)
|
||||
name = db.Column(db.String(255), nullable=False)
|
||||
name = db.Column(db.String(255), nullable=False, unique=True)
|
||||
created_at = db.Column(
|
||||
db.DateTime,
|
||||
index=False,
|
||||
@@ -87,6 +87,7 @@ class Service(db.Model):
|
||||
secondary=user_to_service,
|
||||
backref=db.backref('user_to_service', lazy='dynamic'))
|
||||
restricted = db.Column(db.Boolean, index=False, unique=False, nullable=False)
|
||||
email_from = db.Column(db.Text, index=False, unique=True, nullable=False)
|
||||
|
||||
|
||||
class ApiKey(db.Model):
|
||||
@@ -128,6 +129,7 @@ class Template(db.Model):
|
||||
content = db.Column(db.Text, index=False, unique=False, nullable=False)
|
||||
service_id = db.Column(UUID(as_uuid=True), db.ForeignKey('services.id'), index=True, unique=False, nullable=False)
|
||||
service = db.relationship('Service', backref=db.backref('templates', lazy='dynamic'))
|
||||
subject = db.Column(db.Text, index=False, unique=True, nullable=True)
|
||||
|
||||
|
||||
JOB_STATUS_TYPES = ['pending', 'in progress', 'finished']
|
||||
|
||||
@@ -1,93 +1,130 @@
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
from flask import (jsonify, request, abort)
|
||||
from flask import (jsonify, request)
|
||||
from sqlalchemy.exc import DataError
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from app.dao import DAOException
|
||||
|
||||
from app.dao.users_dao import get_model_users
|
||||
from app.dao.services_dao import (
|
||||
save_model_service, get_model_services, delete_model_service)
|
||||
dao_fetch_service_by_id_and_user,
|
||||
dao_fetch_service_by_id,
|
||||
dao_fetch_all_services,
|
||||
dao_create_service,
|
||||
dao_update_service,
|
||||
dao_fetch_all_services_by_user
|
||||
)
|
||||
from app.dao.templates_dao import (
|
||||
save_model_template, get_model_templates, delete_model_template)
|
||||
from app.dao.api_key_dao import (save_model_api_key, get_model_api_keys, get_unsigned_secret)
|
||||
save_model_template,
|
||||
get_model_templates,
|
||||
delete_model_template
|
||||
)
|
||||
from app.dao.api_key_dao import (
|
||||
save_model_api_key,
|
||||
get_model_api_keys,
|
||||
get_unsigned_secret
|
||||
)
|
||||
from app.models import ApiKey
|
||||
from app.schemas import (
|
||||
services_schema, service_schema, template_schema, templates_schema,
|
||||
api_keys_schema, service_schema_load_json, template_schema_load_json)
|
||||
services_schema,
|
||||
service_schema,
|
||||
templates_schema,
|
||||
api_keys_schema,
|
||||
template_schema_load_json,
|
||||
template_schema
|
||||
)
|
||||
|
||||
from flask import Blueprint
|
||||
|
||||
service = Blueprint('service', __name__)
|
||||
|
||||
from app.errors import register_errors
|
||||
|
||||
register_errors(service)
|
||||
|
||||
|
||||
@service.route('', methods=['GET'])
|
||||
def get_services():
|
||||
user_id = request.args.get('user_id', None)
|
||||
if user_id:
|
||||
services = dao_fetch_all_services_by_user(user_id)
|
||||
else:
|
||||
services = dao_fetch_all_services()
|
||||
data, errors = services_schema.dump(services)
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
@service.route('/<service_id>', methods=['GET'])
|
||||
def get_service_by_id(service_id):
|
||||
user_id = request.args.get('user_id', None)
|
||||
if user_id:
|
||||
fetched = dao_fetch_service_by_id_and_user(service_id, user_id)
|
||||
else:
|
||||
fetched = dao_fetch_service_by_id(service_id)
|
||||
if not fetched:
|
||||
return jsonify(result="error", message="not found"), 404
|
||||
data, errors = service_schema.dump(fetched)
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
@service.route('', methods=['POST'])
|
||||
def create_service():
|
||||
# TODO what exceptions get passed from schema parsing?
|
||||
service, errors = service_schema.load(request.get_json())
|
||||
data = request.get_json()
|
||||
if not data.get('user_id', None):
|
||||
return jsonify(result="error", message={'user_id': ['Missing data for required field.']}), 400
|
||||
|
||||
user = get_model_users(data['user_id'])
|
||||
if not user:
|
||||
return jsonify(result="error", message={'user_id': ['not found']}), 400
|
||||
|
||||
data.pop('user_id', None)
|
||||
if 'name' in data:
|
||||
data['email_from'] = _email_safe(data.get('name', None))
|
||||
|
||||
valid_service, errors = service_schema.load(request.get_json())
|
||||
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
# I believe service is already added to the session but just needs a
|
||||
# db.session.commit
|
||||
try:
|
||||
save_model_service(service)
|
||||
except DAOException as e:
|
||||
return jsonify(result="error", message=str(e)), 500
|
||||
return jsonify(data=service_schema.dump(service).data), 201
|
||||
|
||||
dao_create_service(valid_service, user)
|
||||
return jsonify(data=service_schema.dump(valid_service).data), 201
|
||||
|
||||
|
||||
@service.route('/<service_id>', methods=['PUT', 'DELETE'])
|
||||
def _email_safe(string):
|
||||
return "".join([
|
||||
character.lower() if character.isalnum() or character == "." else ""
|
||||
for character in re.sub("\s+", ".", string.strip())
|
||||
])
|
||||
|
||||
|
||||
@service.route('/<service_id>', methods=['POST'])
|
||||
def update_service(service_id):
|
||||
try:
|
||||
service = get_model_services(service_id=service_id)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid service id"), 400
|
||||
except NoResultFound:
|
||||
return jsonify(result="error", message="Service not found"), 404
|
||||
if request.method == 'DELETE':
|
||||
status_code = 202
|
||||
delete_model_service(service)
|
||||
else:
|
||||
status_code = 200
|
||||
update_dict, errors = service_schema_load_json.load(request.get_json())
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
try:
|
||||
save_model_service(service, update_dict=update_dict)
|
||||
except DAOException as e:
|
||||
return jsonify(result="error", message=str(e)), 500
|
||||
return jsonify(data=service_schema.dump(service).data), status_code
|
||||
fetched_service = dao_fetch_service_by_id(service_id)
|
||||
if not fetched_service:
|
||||
return jsonify(result="error", message="not found"), 404
|
||||
|
||||
current_data = dict(service_schema.dump(fetched_service).data.items())
|
||||
current_data.update(request.get_json())
|
||||
|
||||
@service.route('/<service_id>', methods=['GET'])
|
||||
@service.route('', methods=['GET'])
|
||||
def get_service(service_id=None):
|
||||
user_id = request.args.get('user_id', None)
|
||||
try:
|
||||
services = get_model_services(service_id=service_id, user_id=user_id)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid service id"), 400
|
||||
except NoResultFound:
|
||||
return jsonify(result="error", message="Service not found"), 404
|
||||
data, errors = services_schema.dump(services) if isinstance(services, list) else service_schema.dump(services)
|
||||
return jsonify(data=data)
|
||||
update_dict, errors = service_schema.load(current_data)
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
dao_update_service(update_dict)
|
||||
return jsonify(data=service_schema.dump(fetched_service).data), 200
|
||||
|
||||
|
||||
@service.route('/<service_id>/api-key', methods=['POST'])
|
||||
def renew_api_key(service_id=None):
|
||||
try:
|
||||
service = get_model_services(service_id=service_id)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid service id"), 400
|
||||
except NoResultFound:
|
||||
fetched_service = dao_fetch_service_by_id(service_id=service_id)
|
||||
if not fetched_service:
|
||||
return jsonify(result="error", message="Service not found"), 404
|
||||
|
||||
try:
|
||||
# create a new one
|
||||
# TODO: what validation should be done here?
|
||||
secret_name = request.get_json()['name']
|
||||
key = ApiKey(service=service, name=secret_name)
|
||||
key = ApiKey(service=fetched_service, name=secret_name)
|
||||
save_model_api_key(key)
|
||||
except DAOException as e:
|
||||
return jsonify(result='error', message=str(e)), 500
|
||||
@@ -112,7 +149,7 @@ def revoke_api_key(service_id, api_key_id):
|
||||
@service.route('/<service_id>/api-keys/<int:key_id>', methods=['GET'])
|
||||
def get_api_keys(service_id, key_id=None):
|
||||
try:
|
||||
service = get_model_services(service_id=service_id)
|
||||
service = dao_fetch_service_by_id(service_id=service_id)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid service id"), 400
|
||||
except NoResultFound:
|
||||
@@ -133,16 +170,13 @@ def get_api_keys(service_id, key_id=None):
|
||||
|
||||
@service.route('/<service_id>/template', methods=['POST'])
|
||||
def create_template(service_id):
|
||||
try:
|
||||
service = get_model_services(service_id=service_id)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid service id"), 400
|
||||
except NoResultFound:
|
||||
fetched_service = dao_fetch_service_by_id(service_id=service_id)
|
||||
if not fetched_service:
|
||||
return jsonify(result="error", message="Service not found"), 404
|
||||
template, errors = template_schema.load(request.get_json())
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
template.service = service
|
||||
template.service = fetched_service
|
||||
# I believe service is already added to the session but just needs a
|
||||
# db.session.commit
|
||||
save_model_template(template)
|
||||
@@ -151,11 +185,8 @@ def create_template(service_id):
|
||||
|
||||
@service.route('/<service_id>/template/<int:template_id>', methods=['PUT', 'DELETE'])
|
||||
def update_template(service_id, template_id):
|
||||
try:
|
||||
service = get_model_services(service_id=service_id)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid service id"), 400
|
||||
except NoResultFound:
|
||||
fetched_service = dao_fetch_service_by_id(service_id=service_id)
|
||||
if not fetched_service:
|
||||
return jsonify(result="error", message="Service not found"), 404
|
||||
try:
|
||||
template = get_model_templates(template_id=template_id)
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
from datetime import datetime
|
||||
from flask import (jsonify, request, abort, Blueprint, current_app)
|
||||
|
||||
from app import encryption
|
||||
from app.dao.services_dao import get_model_services
|
||||
|
||||
from app.dao.users_dao import (
|
||||
get_model_users,
|
||||
save_model_user,
|
||||
delete_model_user,
|
||||
create_user_code,
|
||||
get_user_code,
|
||||
use_user_code,
|
||||
@@ -14,14 +12,17 @@ from app.dao.users_dao import (
|
||||
reset_failed_login_count
|
||||
)
|
||||
from app.schemas import (
|
||||
user_schema, users_schema, service_schema, services_schema,
|
||||
old_request_verify_code_schema, user_schema_load_json,
|
||||
request_verify_code_schema)
|
||||
old_request_verify_code_schema,
|
||||
user_schema,
|
||||
users_schema,
|
||||
request_verify_code_schema,
|
||||
user_schema_load_json
|
||||
)
|
||||
|
||||
from app.celery.tasks import (send_sms_code, send_email_code)
|
||||
from app.errors import register_errors
|
||||
|
||||
user = Blueprint('user', __name__)
|
||||
|
||||
from app.errors import register_errors
|
||||
register_errors(user)
|
||||
|
||||
|
||||
@@ -39,25 +40,23 @@ def create_user():
|
||||
return jsonify(data=user_schema.dump(user).data), 201
|
||||
|
||||
|
||||
@user.route('/<int:user_id>', methods=['PUT', 'DELETE'])
|
||||
@user.route('/<int:user_id>', methods=['PUT'])
|
||||
def update_user(user_id):
|
||||
user_to_update = get_model_users(user_id=user_id)
|
||||
if not user_to_update:
|
||||
return jsonify(result="error", message="User not found"), 404
|
||||
|
||||
if request.method == 'DELETE':
|
||||
status_code = 202
|
||||
delete_model_user(user_to_update)
|
||||
else:
|
||||
req_json = request.get_json()
|
||||
update_dct, errors = user_schema_load_json.load(req_json)
|
||||
pwd = req_json.get('password', None)
|
||||
# TODO password validation, it is already done on the admin app
|
||||
# but would be good to have the same validation here.
|
||||
if pwd is not None and not pwd:
|
||||
errors.update({'password': ['Invalid data for field']})
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
status_code = 200
|
||||
save_model_user(user_to_update, update_dict=update_dct, pwd=pwd)
|
||||
req_json = request.get_json()
|
||||
update_dct, errors = user_schema_load_json.load(req_json)
|
||||
pwd = req_json.get('password', None)
|
||||
# TODO password validation, it is already done on the admin app
|
||||
# but would be good to have the same validation here.
|
||||
if pwd is not None and not pwd:
|
||||
errors.update({'password': ['Invalid data for field']})
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
status_code = 200
|
||||
save_model_user(user_to_update, update_dict=update_dct, pwd=pwd)
|
||||
return jsonify(data=user_schema.dump(user_to_update).data), status_code
|
||||
|
||||
|
||||
@@ -111,6 +110,9 @@ def verify_user_code(user_id):
|
||||
def send_user_sms_code(user_id):
|
||||
user_to_send_to = get_model_users(user_id=user_id)
|
||||
|
||||
if not user_to_send_to:
|
||||
return jsonify(result="error", message="No user found"), 404
|
||||
|
||||
verify_code, errors = request_verify_code_schema.load(request.get_json())
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
@@ -130,6 +132,8 @@ def send_user_sms_code(user_id):
|
||||
@user.route('/<int:user_id>/email-code', methods=['POST'])
|
||||
def send_user_email_code(user_id):
|
||||
user_to_send_to = get_model_users(user_id=user_id)
|
||||
if not user_to_send_to:
|
||||
return jsonify(result="error", message="No user found"), 404
|
||||
|
||||
verify_code, errors = request_verify_code_schema.load(request.get_json())
|
||||
if errors:
|
||||
@@ -152,6 +156,9 @@ def send_user_email_code(user_id):
|
||||
def send_user_code(user_id):
|
||||
user_to_send_to = get_model_users(user_id=user_id)
|
||||
|
||||
if not user_to_send_to:
|
||||
return jsonify(result="error", message="not found"), 404
|
||||
|
||||
verify_code, errors = old_request_verify_code_schema.load(request.get_json())
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
@@ -180,17 +187,7 @@ def send_user_code(user_id):
|
||||
@user.route('', methods=['GET'])
|
||||
def get_user(user_id=None):
|
||||
users = get_model_users(user_id=user_id)
|
||||
|
||||
if not users:
|
||||
return jsonify(result="error", message="not found"), 404
|
||||
result = users_schema.dump(users) if isinstance(users, list) else user_schema.dump(users)
|
||||
return jsonify(data=result.data)
|
||||
|
||||
|
||||
@user.route('/<int:user_id>/service', methods=['GET'])
|
||||
@user.route('/<int:user_id>/service/<service_id>', methods=['GET'])
|
||||
def get_service_by_user_id(user_id, service_id=None):
|
||||
ret_user = get_model_users(user_id=user_id)
|
||||
|
||||
services = get_model_services(user_id=ret_user.id, service_id=service_id)
|
||||
|
||||
services, errors = services_schema.dump(services) if isinstance(services, list) else service_schema.dump(services)
|
||||
return jsonify(data=services)
|
||||
|
||||
26
migrations/versions/0015_add_subject_line.py
Normal file
26
migrations/versions/0015_add_subject_line.py
Normal file
@@ -0,0 +1,26 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0015_add_subject_line
|
||||
Revises: 0015_add_permissions
|
||||
Create Date: 2016-02-18 09:43:29.282804
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0015_add_subject_line'
|
||||
down_revision = '0015_add_permissions'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
pass
|
||||
op.add_column('templates', sa.Column('subject', sa.Text(), nullable=True))
|
||||
op.create_unique_constraint(None, 'templates', ['subject'])
|
||||
|
||||
|
||||
def downgrade():
|
||||
pass
|
||||
op.drop_constraint(None, 'templates', type_='unique')
|
||||
op.drop_column('templates', 'subject')
|
||||
23
migrations/versions/0016_add_email_from.py
Normal file
23
migrations/versions/0016_add_email_from.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0016_add_email_from
|
||||
Revises: 0015_add_subject_line
|
||||
Create Date: 2016-02-18 11:25:37.915137
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0016_add_email_from'
|
||||
down_revision = '0015_add_subject_line'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.add_column('services', sa.Column('email_from', sa.Text(), nullable=True))
|
||||
op.execute("UPDATE services SET email_from=name")
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_column('services', 'email_from')
|
||||
26
migrations/versions/0017_emailfrom_notnull.py
Normal file
26
migrations/versions/0017_emailfrom_notnull.py
Normal file
@@ -0,0 +1,26 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0017_emailfrom_notnull
|
||||
Revises: 0016_add_email_from
|
||||
Create Date: 2016-02-18 11:41:25.753694
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0017_emailfrom_notnull'
|
||||
down_revision = '0016_add_email_from'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.alter_column('services', 'email_from',
|
||||
existing_type=sa.TEXT(),
|
||||
nullable=False)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.alter_column('services', 'email_from',
|
||||
existing_type=sa.TEXT(),
|
||||
nullable=True)
|
||||
22
migrations/versions/0018_unique_emailfrom.py
Normal file
22
migrations/versions/0018_unique_emailfrom.py
Normal file
@@ -0,0 +1,22 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0018_unique_emailfrom
|
||||
Revises: 0017_emailfrom_notnull
|
||||
Create Date: 2016-02-18 11:42:18.246280
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0018_unique_emailfrom'
|
||||
down_revision = '0017_emailfrom_notnull'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_unique_constraint(None, 'services', ['email_from'])
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_constraint(None, 'services', type_='unique')
|
||||
22
migrations/versions/0019_unique_servicename.py
Normal file
22
migrations/versions/0019_unique_servicename.py
Normal file
@@ -0,0 +1,22 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0019_unique_servicename
|
||||
Revises: 0018_unique_emailfrom
|
||||
Create Date: 2016-02-18 11:45:29.102891
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0019_unique_servicename'
|
||||
down_revision = '0018_unique_emailfrom'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_unique_constraint(None, 'services', ['name'])
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_constraint(None, 'services', type_='unique')
|
||||
@@ -9,7 +9,7 @@ from app.models import ApiKey, Service
|
||||
def test_should_not_allow_request_with_no_token(notify_api):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
response = client.get(url_for('service.get_service'))
|
||||
response = client.get('/service')
|
||||
assert response.status_code == 401
|
||||
data = json.loads(response.get_data())
|
||||
assert data['error'] == 'Unauthorized, authentication token must be provided'
|
||||
@@ -18,89 +18,103 @@ def test_should_not_allow_request_with_no_token(notify_api):
|
||||
def test_should_not_allow_request_with_incorrect_header(notify_api):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
response = client.get(url_for('service.get_service'),
|
||||
headers={'Authorization': 'Basic 1234'})
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers={'Authorization': 'Basic 1234'})
|
||||
assert response.status_code == 401
|
||||
data = json.loads(response.get_data())
|
||||
assert data['error'] == 'Unauthorized, authentication bearer scheme must be used'
|
||||
|
||||
|
||||
def test_should_not_allow_request_with_incorrect_token(notify_api, notify_db, notify_db_session, sample_api_key):
|
||||
def test_should_not_allow_request_with_incorrect_token(notify_api, sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
response = client.get(url_for('service.get_service'),
|
||||
headers={'Authorization': 'Bearer 1234'})
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers={'Authorization': 'Bearer 1234'})
|
||||
assert response.status_code == 403
|
||||
data = json.loads(response.get_data())
|
||||
assert data['error'] == 'Invalid token: signature'
|
||||
|
||||
|
||||
def test_should_not_allow_incorrect_path(notify_api, notify_db, notify_db_session, sample_api_key):
|
||||
def test_should_not_allow_incorrect_path(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
token = create_jwt_token(request_method="GET",
|
||||
request_path="/bad",
|
||||
secret=get_unsigned_secrets(sample_api_key.service_id)[0],
|
||||
client_id=str(sample_api_key.service_id))
|
||||
response = client.get(url_for('service.get_service'),
|
||||
headers={'Authorization': "Bearer {}".format(token)})
|
||||
token = create_jwt_token(
|
||||
request_method="GET",
|
||||
request_path="/bad",
|
||||
secret=get_unsigned_secrets(sample_api_key.service_id)[0],
|
||||
client_id=str(sample_api_key.service_id)
|
||||
)
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers={'Authorization': "Bearer {}".format(token)})
|
||||
assert response.status_code == 403
|
||||
data = json.loads(response.get_data())
|
||||
assert data['error'] == 'Invalid token: request'
|
||||
|
||||
|
||||
def test_should_not_allow_incorrect_method(notify_api, notify_db, notify_db_session, sample_api_key):
|
||||
def test_should_not_allow_incorrect_method(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
token = __create_post_token(sample_api_key.service_id, {})
|
||||
response = client.get(url_for('service.get_service'),
|
||||
headers={'Authorization': "Bearer {}".format(token)})
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers={'Authorization': "Bearer {}".format(token)}
|
||||
)
|
||||
assert response.status_code == 403
|
||||
data = json.loads(response.get_data())
|
||||
assert data['error'] == 'Invalid token: request'
|
||||
|
||||
|
||||
def test_should_not_allow_invalid_secret(notify_api, notify_db, notify_db_session, sample_api_key):
|
||||
def test_should_not_allow_invalid_secret(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
token = create_jwt_token(request_method="POST", request_path="/", secret="not-so-secret",
|
||||
client_id=str(sample_api_key.service_id))
|
||||
response = client.get(url_for('service.get_service'),
|
||||
headers={'Authorization': "Bearer {}".format(token)})
|
||||
token = create_jwt_token(
|
||||
request_method="POST",
|
||||
request_path="/service",
|
||||
secret="not-so-secret",
|
||||
client_id=str(sample_api_key.service_id))
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers={'Authorization': "Bearer {}".format(token)}
|
||||
)
|
||||
assert response.status_code == 403
|
||||
data = json.loads(response.get_data())
|
||||
assert data['error'] == 'Invalid token: signature'
|
||||
|
||||
|
||||
def test_should_allow_valid_token(notify_api, notify_db, notify_db_session, sample_api_key):
|
||||
def test_should_allow_valid_token(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
token = __create_get_token(sample_api_key.service_id)
|
||||
response = client.get(url_for('service.get_service', service_id=str(sample_api_key.service_id)),
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
response = client.get(
|
||||
'/service/{}'.format(str(sample_api_key.service_id)),
|
||||
headers={'Authorization': 'Bearer {}'.format(token)}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_should_allow_valid_token_for_request_with_path_params(notify_api, notify_db, notify_db_session,
|
||||
sample_api_key):
|
||||
def test_should_allow_valid_token_for_request_with_path_params(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
token = __create_get_token(sample_api_key.service_id)
|
||||
response = client.get(url_for('service.get_service', service_id=str(sample_api_key.service_id)),
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
response = client.get(
|
||||
'/service/{}'.format(str(sample_api_key.service_id)),
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_should_allow_valid_token_when_service_has_multiple_keys(notify_api, notify_db, notify_db_session,
|
||||
sample_api_key):
|
||||
def test_should_allow_valid_token_when_service_has_multiple_keys(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = {'service_id': sample_api_key.service_id, 'name': 'some key name'}
|
||||
api_key = ApiKey(**data)
|
||||
save_model_api_key(api_key)
|
||||
token = __create_get_token(sample_api_key.service_id)
|
||||
response = client.get(url_for('service.get_service', service_id=str(sample_api_key.service_id)),
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
response = client.get(
|
||||
'/service/{}'.format(str(sample_api_key.service_id)),
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
@@ -111,26 +125,28 @@ JSON_BODY = json.dumps({
|
||||
})
|
||||
|
||||
|
||||
def test_should_allow_valid_token_with_post_body(notify_api, notify_db, notify_db_session, sample_api_key):
|
||||
def test_should_allow_valid_token_with_post_body(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
service = Service.query.get(sample_api_key.service_id)
|
||||
data = {'name': 'new name',
|
||||
'users': [service.users[0].id],
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False}
|
||||
data = {
|
||||
'email_from': 'new name',
|
||||
'name': 'new name',
|
||||
'users': [service.users[0].id],
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False}
|
||||
|
||||
token = create_jwt_token(
|
||||
request_method="PUT",
|
||||
request_path=url_for('service.update_service', service_id=str(sample_api_key.service_id)),
|
||||
request_method="POST",
|
||||
request_path='/service/{}'.format(str(sample_api_key.service_id)),
|
||||
secret=get_unsigned_secret(sample_api_key.id),
|
||||
client_id=str(sample_api_key.service_id),
|
||||
request_body=json.dumps(data)
|
||||
)
|
||||
headers = [('Content-Type', 'application/json'), ('Authorization', 'Bearer {}'.format(token))]
|
||||
response = client.put(
|
||||
url_for('service.update_service', service_id=service.id),
|
||||
response = client.post(
|
||||
'/service/{}'.format(service.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert response.status_code == 200
|
||||
@@ -140,9 +156,10 @@ def test_should_not_allow_valid_token_with_invalid_post_body(notify_api, notify_
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
token = __create_post_token(str(sample_api_key.service_id), JSON_BODY)
|
||||
response = client.post(url_for('service.create_service'),
|
||||
data="spurious",
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
response = client.post(
|
||||
'/service',
|
||||
data="spurious",
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 403
|
||||
data = json.loads(response.get_data())
|
||||
assert data['error'] == 'Invalid token: payload'
|
||||
@@ -154,19 +171,22 @@ def test_authentication_passes_admin_client_token(notify_api,
|
||||
sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
token = create_jwt_token(request_method="GET",
|
||||
request_path=url_for('service.get_service'),
|
||||
secret=current_app.config.get('ADMIN_CLIENT_SECRET'),
|
||||
client_id=current_app.config.get('ADMIN_CLIENT_USER_NAME'))
|
||||
response = client.get(url_for('service.get_service'),
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
token = create_jwt_token(
|
||||
request_method="GET",
|
||||
request_path='/service',
|
||||
secret=current_app.config.get('ADMIN_CLIENT_SECRET'),
|
||||
client_id=current_app.config.get('ADMIN_CLIENT_USER_NAME'))
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_authentication_passes_when_service_has_multiple_keys_some_expired(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_api_key):
|
||||
def test_authentication_passes_when_service_has_multiple_keys_some_expired(
|
||||
notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
exprired_key = {'service_id': sample_api_key.service_id, 'name': 'expired_key',
|
||||
@@ -176,12 +196,14 @@ def test_authentication_passes_when_service_has_multiple_keys_some_expired(notif
|
||||
another_key = {'service_id': sample_api_key.service_id, 'name': 'another_key'}
|
||||
api_key = ApiKey(**another_key)
|
||||
save_model_api_key(api_key)
|
||||
token = create_jwt_token(request_method="GET",
|
||||
request_path=url_for('service.get_service'),
|
||||
secret=get_unsigned_secret(api_key.id),
|
||||
client_id=str(sample_api_key.service_id))
|
||||
response = client.get(url_for('service.get_service'),
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
token = create_jwt_token(
|
||||
request_method="GET",
|
||||
request_path='/service',
|
||||
secret=get_unsigned_secret(api_key.id),
|
||||
client_id=str(sample_api_key.service_id))
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
@@ -197,18 +219,20 @@ def test_authentication_returns_token_expired_when_service_uses_expired_key_and_
|
||||
another_key = {'service_id': sample_api_key.service_id, 'name': 'another_key'}
|
||||
api_key = ApiKey(**another_key)
|
||||
save_model_api_key(api_key)
|
||||
token = create_jwt_token(request_method="GET",
|
||||
request_path=url_for('service.get_service'),
|
||||
secret=get_unsigned_secret(expired_api_key.id),
|
||||
client_id=str(sample_api_key.service_id))
|
||||
token = create_jwt_token(
|
||||
request_method="GET",
|
||||
request_path='/service',
|
||||
secret=get_unsigned_secret(expired_api_key.id),
|
||||
client_id=str(sample_api_key.service_id))
|
||||
# expire the key
|
||||
expire_the_key = {'id': expired_api_key.id,
|
||||
'service_id': str(sample_api_key.service_id),
|
||||
'name': 'expired_key',
|
||||
'expiry_date': datetime.now() + timedelta(hours=-2)}
|
||||
save_model_api_key(expired_api_key, expire_the_key)
|
||||
response = client.get(url_for('service.get_service'),
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 403
|
||||
data = json.loads(response.get_data())
|
||||
assert data['error'] == 'Invalid token: signature'
|
||||
@@ -220,14 +244,16 @@ def test_authentication_returns_error_when_api_client_has_no_secrets(notify_api,
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
api_secret = notify_api.config.get('ADMIN_CLIENT_SECRET')
|
||||
token = create_jwt_token(request_method="GET",
|
||||
request_path=url_for('service.get_service'),
|
||||
secret=api_secret,
|
||||
client_id=notify_api.config.get('ADMIN_CLIENT_USER_NAME')
|
||||
)
|
||||
token = create_jwt_token(
|
||||
request_method="GET",
|
||||
request_path='/service',
|
||||
secret=api_secret,
|
||||
client_id=notify_api.config.get('ADMIN_CLIENT_USER_NAME')
|
||||
)
|
||||
notify_api.config['ADMIN_CLIENT_SECRET'] = ''
|
||||
response = client.get(url_for('service.get_service'),
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 403
|
||||
error_message = json.loads(response.get_data())
|
||||
assert error_message['error'] == 'Invalid token: signature'
|
||||
@@ -237,12 +263,12 @@ def test_authentication_returns_error_when_api_client_has_no_secrets(notify_api,
|
||||
def __create_get_token(service_id):
|
||||
if service_id:
|
||||
return create_jwt_token(request_method="GET",
|
||||
request_path=url_for('service.get_service', service_id=service_id),
|
||||
request_path='/service/{}'.format(service_id),
|
||||
secret=get_unsigned_secrets(service_id)[0],
|
||||
client_id=str(service_id))
|
||||
else:
|
||||
return create_jwt_token(request_method="GET",
|
||||
request_path=url_for('service.get_service'),
|
||||
request_path='/service',
|
||||
secret=get_unsigned_secrets(service_id)[0],
|
||||
client_id=service_id)
|
||||
|
||||
@@ -250,7 +276,7 @@ def __create_get_token(service_id):
|
||||
def __create_post_token(service_id, request_body):
|
||||
return create_jwt_token(
|
||||
request_method="POST",
|
||||
request_path=url_for('service.create_service'),
|
||||
request_path='/service',
|
||||
secret=get_unsigned_secrets(service_id)[0],
|
||||
client_id=str(service_id),
|
||||
request_body=request_body
|
||||
|
||||
@@ -2,7 +2,7 @@ import pytest
|
||||
|
||||
from app.models import (User, Service, Template, ApiKey, Job, Notification)
|
||||
from app.dao.users_dao import (save_model_user, create_user_code, create_secret_code)
|
||||
from app.dao.services_dao import save_model_service
|
||||
from app.dao.services_dao import dao_create_service
|
||||
from app.dao.templates_dao import save_model_template
|
||||
from app.dao.api_key_dao import save_model_api_key
|
||||
from app.dao.jobs_dao import save_job
|
||||
@@ -13,8 +13,9 @@ import uuid
|
||||
@pytest.fixture(scope='function')
|
||||
def service_factory(notify_db, notify_db_session):
|
||||
class ServiceFactory(object):
|
||||
def get(self, service_name):
|
||||
user = sample_user(notify_db, notify_db_session)
|
||||
def get(self, service_name, user=None):
|
||||
if not user:
|
||||
user = sample_user(notify_db, notify_db_session)
|
||||
service = sample_service(notify_db, notify_db_session, service_name, user)
|
||||
sample_template(notify_db, notify_db_session, service=service)
|
||||
return service
|
||||
@@ -91,11 +92,13 @@ def sample_service(notify_db,
|
||||
'users': [user],
|
||||
'limit': 1000,
|
||||
'active': False,
|
||||
'restricted': False}
|
||||
'restricted': False,
|
||||
'email_from': service_name
|
||||
}
|
||||
service = Service.query.filter_by(name=service_name).first()
|
||||
if not service:
|
||||
service = Service(**data)
|
||||
save_model_service(service)
|
||||
dao_create_service(service, user)
|
||||
return service
|
||||
|
||||
|
||||
@@ -105,6 +108,7 @@ def sample_template(notify_db,
|
||||
template_name="Template Name",
|
||||
template_type="sms",
|
||||
content="This is a template",
|
||||
subject_line=None,
|
||||
service=None):
|
||||
if service is None:
|
||||
service = sample_service(notify_db, notify_db_session)
|
||||
@@ -115,6 +119,10 @@ def sample_template(notify_db,
|
||||
'content': content,
|
||||
'service': service
|
||||
}
|
||||
if subject_line:
|
||||
data.update({
|
||||
'subject': subject_line
|
||||
})
|
||||
template = Template(**data)
|
||||
save_model_template(template)
|
||||
return template
|
||||
|
||||
@@ -1,88 +1,172 @@
|
||||
import uuid
|
||||
import pytest
|
||||
from app.dao.services_dao import (
|
||||
save_model_service, get_model_services, DAOException, delete_model_service)
|
||||
from tests.app.conftest import sample_service as create_sample_service
|
||||
from tests.app.conftest import sample_user as create_sample_user
|
||||
from app.models import Service
|
||||
dao_create_service,
|
||||
dao_add_user_to_service,
|
||||
dao_remove_user_from_service,
|
||||
dao_fetch_all_services,
|
||||
dao_fetch_service_by_id,
|
||||
dao_fetch_all_services_by_user,
|
||||
dao_fetch_service_by_id_and_user
|
||||
)
|
||||
from app.dao.users_dao import save_model_user
|
||||
from app.models import Service, User
|
||||
from sqlalchemy.orm.exc import FlushError
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
|
||||
def test_create_service(notify_api, notify_db, notify_db_session, sample_user):
|
||||
def test_create_service(sample_user):
|
||||
assert Service.query.count() == 0
|
||||
service_name = 'Sample Service'
|
||||
data = {
|
||||
'name': service_name,
|
||||
'users': [sample_user],
|
||||
'limit': 1000,
|
||||
'active': False,
|
||||
'restricted': False}
|
||||
service = Service(**data)
|
||||
save_model_service(service)
|
||||
service = Service(name="service_name", email_from="email_from", limit=1000, active=True, restricted=False)
|
||||
dao_create_service(service, sample_user)
|
||||
assert Service.query.count() == 1
|
||||
assert Service.query.first().name == service_name
|
||||
assert Service.query.first().name == "service_name"
|
||||
assert Service.query.first().id == service.id
|
||||
assert sample_user in Service.query.first().users
|
||||
|
||||
|
||||
def test_get_services(notify_api, notify_db, notify_db_session, sample_user):
|
||||
sample_service = create_sample_service(notify_db,
|
||||
notify_db_session,
|
||||
user=sample_user)
|
||||
assert Service.query.count() == 1
|
||||
assert len(get_model_services()) == 1
|
||||
service_name = "Another service"
|
||||
sample_service = create_sample_service(notify_db,
|
||||
notify_db_session,
|
||||
service_name=service_name,
|
||||
user=sample_user)
|
||||
assert Service.query.count() == 2
|
||||
assert len(get_model_services()) == 2
|
||||
|
||||
|
||||
def test_get_user_service(notify_api, notify_db, notify_db_session, sample_user):
|
||||
def test_cannot_create_two_services_with_same_name(sample_user):
|
||||
assert Service.query.count() == 0
|
||||
service_name = "Random service"
|
||||
sample_service = create_sample_service(notify_db,
|
||||
notify_db_session,
|
||||
service_name=service_name,
|
||||
user=sample_user)
|
||||
assert get_model_services(service_id=sample_service.id).name == service_name
|
||||
assert Service.query.count() == 1
|
||||
service1 = Service(name="service_name", email_from="email_from1", limit=1000, active=True, restricted=False)
|
||||
service2 = Service(name="service_name", email_from="email_from2", limit=1000, active=True, restricted=False)
|
||||
with pytest.raises(IntegrityError) as excinfo:
|
||||
dao_create_service(service1, sample_user)
|
||||
dao_create_service(service2, sample_user)
|
||||
assert 'duplicate key value violates unique constraint "services_name_key"' in str(excinfo.value)
|
||||
|
||||
|
||||
def test_get_services_for_user(notify_api, notify_db, notify_db_session, sample_service):
|
||||
assert Service.query.count() == 1
|
||||
service_name = "Random service"
|
||||
second_user = create_sample_user(notify_db, notify_db_session, 'an@other.gov.uk')
|
||||
create_sample_service(notify_db, notify_db_session, service_name='another service', user=second_user)
|
||||
|
||||
sample_service = create_sample_service(notify_db,
|
||||
notify_db_session,
|
||||
service_name=service_name,
|
||||
user=sample_service.users[0])
|
||||
assert Service.query.count() == 3
|
||||
services = get_model_services(user_id=sample_service.users[0].id)
|
||||
assert len(services) == 2
|
||||
assert service_name in [x.name for x in services]
|
||||
assert 'Sample service' in [x.name for x in services]
|
||||
|
||||
|
||||
def test_missing_user_attribute(notify_api, notify_db, notify_db_session):
|
||||
def test_cannot_create_two_services_with_same_email_from(sample_user):
|
||||
assert Service.query.count() == 0
|
||||
try:
|
||||
service_name = 'Sample Service'
|
||||
data = {
|
||||
'name': service_name,
|
||||
'limit': 1000,
|
||||
'active': False,
|
||||
'restricted': False}
|
||||
|
||||
service = Service(**data)
|
||||
save_model_service(service)
|
||||
pytest.fail("DAOException not thrown")
|
||||
except DAOException as e:
|
||||
assert "Missing data for required attribute" in str(e)
|
||||
service1 = Service(name="service_name1", email_from="email_from", limit=1000, active=True, restricted=False)
|
||||
service2 = Service(name="service_name2", email_from="email_from", limit=1000, active=True, restricted=False)
|
||||
with pytest.raises(IntegrityError) as excinfo:
|
||||
dao_create_service(service1, sample_user)
|
||||
dao_create_service(service2, sample_user)
|
||||
assert 'duplicate key value violates unique constraint "services_email_from_key"' in str(excinfo.value)
|
||||
|
||||
|
||||
def test_delete_service(notify_api, notify_db, notify_db_session, sample_service):
|
||||
assert Service.query.count() == 1
|
||||
delete_model_service(sample_service)
|
||||
def test_cannot_create_service_with_no_user(notify_db_session):
|
||||
assert Service.query.count() == 0
|
||||
service = Service(name="service_name", email_from="email_from", limit=1000, active=True, restricted=False)
|
||||
with pytest.raises(FlushError) as excinfo:
|
||||
dao_create_service(service, None)
|
||||
assert "Can't flush None value found in collection Service.users" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_should_add_user_to_service(sample_user):
|
||||
service = Service(name="service_name", email_from="email_from", limit=1000, active=True, restricted=False)
|
||||
dao_create_service(service, sample_user)
|
||||
assert sample_user in Service.query.first().users
|
||||
new_user = User(
|
||||
name='Test User',
|
||||
email_address='new_user@digital.cabinet-office.gov.uk',
|
||||
password='password',
|
||||
mobile_number='+447700900986'
|
||||
)
|
||||
save_model_user(new_user)
|
||||
dao_add_user_to_service(service, new_user)
|
||||
assert new_user in Service.query.first().users
|
||||
|
||||
|
||||
def test_should_remove_user_from_service(sample_user):
|
||||
service = Service(name="service_name", email_from="email_from", limit=1000, active=True, restricted=False)
|
||||
dao_create_service(service, sample_user)
|
||||
new_user = User(
|
||||
name='Test User',
|
||||
email_address='new_user@digital.cabinet-office.gov.uk',
|
||||
password='password',
|
||||
mobile_number='+447700900986'
|
||||
)
|
||||
save_model_user(new_user)
|
||||
dao_add_user_to_service(service, new_user)
|
||||
assert new_user in Service.query.first().users
|
||||
dao_remove_user_from_service(service, new_user)
|
||||
assert new_user not in Service.query.first().users
|
||||
|
||||
|
||||
def test_get_all_services(service_factory):
|
||||
service_factory.get('service 1')
|
||||
assert len(dao_fetch_all_services()) == 1
|
||||
assert dao_fetch_all_services()[0].name == 'service 1'
|
||||
|
||||
service_factory.get('service 2')
|
||||
assert len(dao_fetch_all_services()) == 2
|
||||
assert dao_fetch_all_services()[1].name == 'service 2'
|
||||
|
||||
|
||||
def test_get_all_services_should_return_in_created_order(service_factory):
|
||||
service_factory.get('service 1')
|
||||
service_factory.get('service 2')
|
||||
service_factory.get('service 3')
|
||||
service_factory.get('service 4')
|
||||
assert len(dao_fetch_all_services()) == 4
|
||||
assert dao_fetch_all_services()[0].name == 'service 1'
|
||||
assert dao_fetch_all_services()[1].name == 'service 2'
|
||||
assert dao_fetch_all_services()[2].name == 'service 3'
|
||||
assert dao_fetch_all_services()[3].name == 'service 4'
|
||||
|
||||
|
||||
def test_get_all_services_should_return_empty_list_if_no_services():
|
||||
assert len(dao_fetch_all_services()) == 0
|
||||
|
||||
|
||||
def test_get_all_services_for_user(service_factory, sample_user):
|
||||
service_factory.get('service 1', sample_user)
|
||||
service_factory.get('service 2', sample_user)
|
||||
service_factory.get('service 3', sample_user)
|
||||
assert len(dao_fetch_all_services_by_user(sample_user.id)) == 3
|
||||
assert dao_fetch_all_services_by_user(sample_user.id)[0].name == 'service 1'
|
||||
assert dao_fetch_all_services_by_user(sample_user.id)[1].name == 'service 2'
|
||||
assert dao_fetch_all_services_by_user(sample_user.id)[2].name == 'service 3'
|
||||
|
||||
|
||||
def test_get_all_only_services_user_has_access_to(service_factory, sample_user):
|
||||
service_factory.get('service 1', sample_user)
|
||||
service_factory.get('service 2', sample_user)
|
||||
service_3 = service_factory.get('service 3', sample_user)
|
||||
new_user = User(
|
||||
name='Test User',
|
||||
email_address='new_user@digital.cabinet-office.gov.uk',
|
||||
password='password',
|
||||
mobile_number='+447700900986'
|
||||
)
|
||||
save_model_user(new_user)
|
||||
dao_add_user_to_service(service_3, new_user)
|
||||
assert len(dao_fetch_all_services_by_user(sample_user.id)) == 3
|
||||
assert dao_fetch_all_services_by_user(sample_user.id)[0].name == 'service 1'
|
||||
assert dao_fetch_all_services_by_user(sample_user.id)[1].name == 'service 2'
|
||||
assert dao_fetch_all_services_by_user(sample_user.id)[2].name == 'service 3'
|
||||
assert len(dao_fetch_all_services_by_user(new_user.id)) == 1
|
||||
assert dao_fetch_all_services_by_user(new_user.id)[0].name == 'service 3'
|
||||
|
||||
|
||||
def test_get_all_user_services_should_return_empty_list_if_no_services_for_user(sample_user):
|
||||
assert len(dao_fetch_all_services_by_user(sample_user.id)) == 0
|
||||
|
||||
|
||||
def test_get_service_by_id_returns_none_if_no_service(notify_db):
|
||||
assert not dao_fetch_service_by_id(str(uuid.uuid4()))
|
||||
|
||||
|
||||
def test_get_service_by_id_returns_service(service_factory):
|
||||
service = service_factory.get('testing')
|
||||
assert dao_fetch_service_by_id(service.id).name == 'testing'
|
||||
|
||||
|
||||
def test_can_get_service_by_id_and_user(service_factory, sample_user):
|
||||
service = service_factory.get('service 1', sample_user)
|
||||
assert dao_fetch_service_by_id_and_user(service.id, sample_user.id).name == 'service 1'
|
||||
|
||||
|
||||
def test_cannot_get_service_by_id_and_owned_by_different_user(service_factory, sample_user):
|
||||
service1 = service_factory.get('service 1', sample_user)
|
||||
new_user = User(
|
||||
name='Test User',
|
||||
email_address='new_user@digital.cabinet-office.gov.uk',
|
||||
password='password',
|
||||
mobile_number='+447700900986'
|
||||
)
|
||||
save_model_user(new_user)
|
||||
service2 = service_factory.get('service 2', new_user)
|
||||
assert dao_fetch_service_by_id_and_user(service1.id, sample_user.id).name == 'service 1'
|
||||
assert not dao_fetch_service_by_id_and_user(service2.id, sample_user.id)
|
||||
|
||||
@@ -1,352 +1,347 @@
|
||||
import json
|
||||
import uuid
|
||||
from collections import Set
|
||||
|
||||
from flask import url_for
|
||||
from app.dao.services_dao import save_model_service
|
||||
from app.models import (Service, ApiKey, Template)
|
||||
|
||||
from app.dao.users_dao import save_model_user
|
||||
from app.models import User, Template, Service
|
||||
from tests import create_authorization_header
|
||||
from tests.app.conftest import sample_user as create_sample_user
|
||||
from tests.app.conftest import sample_service as create_sample_service
|
||||
|
||||
|
||||
def test_get_service_list(notify_api, notify_db, notify_db_session, sample_service):
|
||||
"""
|
||||
Tests GET endpoint '/' to retrieve entire service list.
|
||||
"""
|
||||
def test_get_service_list(notify_api, service_factory):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(path=url_for('service.get_service'),
|
||||
method='GET')
|
||||
response = client.get(url_for('service.get_service'),
|
||||
headers=[auth_header])
|
||||
service_factory.get('one')
|
||||
service_factory.get('two')
|
||||
service_factory.get('three')
|
||||
|
||||
auth_header = create_authorization_header(
|
||||
path='/service',
|
||||
method='GET'
|
||||
)
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers=[auth_header]
|
||||
)
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
# TODO assert correct json returned
|
||||
assert len(json_resp['data']) == 1
|
||||
assert json_resp['data'][0]['name'] == sample_service.name
|
||||
assert json_resp['data'][0]['id'] == str(sample_service.id)
|
||||
assert len(json_resp['data']) == 3
|
||||
assert json_resp['data'][0]['name'] == 'one'
|
||||
assert json_resp['data'][1]['name'] == 'two'
|
||||
assert json_resp['data'][2]['name'] == 'three'
|
||||
|
||||
|
||||
def test_get_service(notify_api, notify_db, notify_db_session, sample_service):
|
||||
"""
|
||||
Tests GET endpoint '/<service_id>' to retrieve a single service.
|
||||
"""
|
||||
def test_get_service_list_by_user(notify_api, service_factory, sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(path=url_for('service.get_service', service_id=sample_service.id),
|
||||
method='GET')
|
||||
resp = client.get(url_for('service.get_service',
|
||||
service_id=sample_service.id),
|
||||
headers=[auth_header])
|
||||
service_factory.get('one', sample_user)
|
||||
service_factory.get('two', sample_user)
|
||||
service_factory.get('three', sample_user)
|
||||
|
||||
auth_header = create_authorization_header(
|
||||
path='/service',
|
||||
method='GET'
|
||||
)
|
||||
response = client.get(
|
||||
'/service?user_id='.format(sample_user.id),
|
||||
headers=[auth_header]
|
||||
)
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert response.status_code == 200
|
||||
assert len(json_resp['data']) == 3
|
||||
assert json_resp['data'][0]['name'] == 'one'
|
||||
assert json_resp['data'][1]['name'] == 'two'
|
||||
assert json_resp['data'][2]['name'] == 'three'
|
||||
|
||||
|
||||
def test_get_service_list_by_user_should_return_empty_list_if_no_services(notify_api, service_factory, sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
new_user = User(
|
||||
name='Test User',
|
||||
email_address='new_user@digital.cabinet-office.gov.uk',
|
||||
password='password',
|
||||
mobile_number='+447700900986'
|
||||
)
|
||||
save_model_user(new_user)
|
||||
|
||||
service_factory.get('one', sample_user)
|
||||
service_factory.get('two', sample_user)
|
||||
service_factory.get('three', sample_user)
|
||||
|
||||
auth_header = create_authorization_header(
|
||||
path='/service',
|
||||
method='GET'
|
||||
)
|
||||
response = client.get(
|
||||
'/service?user_id={}'.format(new_user.id),
|
||||
headers=[auth_header]
|
||||
)
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert response.status_code == 200
|
||||
assert len(json_resp['data']) == 0
|
||||
|
||||
|
||||
def test_get_service_list_should_return_empty_list_if_no_services(notify_api, notify_db):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(
|
||||
path='/service',
|
||||
method='GET'
|
||||
)
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers=[auth_header]
|
||||
)
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(json_resp['data']) == 0
|
||||
|
||||
|
||||
def test_get_service_by_id(notify_api, sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}'.format(sample_service.id),
|
||||
method='GET'
|
||||
)
|
||||
resp = client.get(
|
||||
'/service/{}'.format(sample_service.id),
|
||||
headers=[auth_header]
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['name'] == sample_service.name
|
||||
assert json_resp['data']['id'] == str(sample_service.id)
|
||||
|
||||
|
||||
def test_get_service_for_user(notify_api, notify_db, notify_db_session, sample_service):
|
||||
second_user = create_sample_user(notify_db, notify_db_session, 'an@other.gov.uk')
|
||||
create_sample_service(notify_db, notify_db_session, service_name='Second Service', user=second_user)
|
||||
create_sample_service(notify_db, notify_db_session, service_name='Another Service', user=sample_service.users[0])
|
||||
def test_get_service_by_id_should_404_if_no_service(notify_api, notify_db):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
service_id = str(uuid.uuid4())
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}'.format(service_id),
|
||||
method='GET'
|
||||
)
|
||||
resp = client.get(
|
||||
'/service/{}'.format(service_id),
|
||||
headers=[auth_header]
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['result'] == 'error'
|
||||
assert json_resp['message'] == 'not found'
|
||||
|
||||
|
||||
def test_get_service_by_id_and_user(notify_api, service_factory, sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
service = service_factory.get('new service', sample_user)
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}'.format(service.id),
|
||||
method='GET'
|
||||
)
|
||||
resp = client.get(
|
||||
'/service/{}?user_id={}'.format(service.id, sample_user.id),
|
||||
headers=[auth_header]
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['name'] == service.name
|
||||
assert json_resp['data']['id'] == str(service.id)
|
||||
|
||||
|
||||
def test_get_service_by_id_should_404_if_no_service_for_user(notify_api, sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
service_id = str(uuid.uuid4())
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}'.format(service_id),
|
||||
method='GET'
|
||||
)
|
||||
resp = client.get(
|
||||
'/service/{}?user_id={}'.format(service_id, sample_user.id),
|
||||
headers=[auth_header]
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['result'] == 'error'
|
||||
assert json_resp['message'] == 'not found'
|
||||
|
||||
|
||||
def test_create_service(notify_api, sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = {
|
||||
'name': 'created service',
|
||||
'user_id': sample_user.id,
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False}
|
||||
auth_header = create_authorization_header(
|
||||
path='/service',
|
||||
method='POST',
|
||||
request_body=json.dumps(data)
|
||||
)
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
'/service',
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert resp.status_code == 201
|
||||
assert json_resp['data']['id']
|
||||
assert json_resp['data']['name'] == 'created service'
|
||||
assert json_resp['data']['email_from'] == 'created.service'
|
||||
|
||||
auth_header_fetch = create_authorization_header(
|
||||
path='/service/{}'.format(json_resp['data']['id']),
|
||||
method='GET'
|
||||
)
|
||||
|
||||
resp = client.get(
|
||||
'/service/{}?user_id={}'.format(json_resp['data']['id'], sample_user.id),
|
||||
headers=[auth_header_fetch]
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['name'] == 'created service'
|
||||
|
||||
|
||||
def test_should_not_create_service_with_missing_user_id_field(notify_api):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = {
|
||||
'email_from': 'service',
|
||||
'name': 'created service',
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False
|
||||
}
|
||||
auth_header = create_authorization_header(
|
||||
path='/service',
|
||||
method='POST',
|
||||
request_body=json.dumps(data)
|
||||
)
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
'/service',
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert resp.status_code == 400
|
||||
assert json_resp['result'] == 'error'
|
||||
assert 'Missing data for required field.' in json_resp['message']['user_id']
|
||||
|
||||
|
||||
def test_should_not_create_service_with_missing_if_user_id_is_not_in_database(notify_api, notify_db):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = {
|
||||
'email_from': 'service',
|
||||
'user_id': 1234,
|
||||
'name': 'created service',
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False
|
||||
}
|
||||
auth_header = create_authorization_header(
|
||||
path='/service',
|
||||
method='POST',
|
||||
request_body=json.dumps(data)
|
||||
)
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
'/service',
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert resp.status_code == 400
|
||||
assert json_resp['result'] == 'error'
|
||||
assert 'not found' in json_resp['message']['user_id']
|
||||
|
||||
|
||||
def test_should_not_create_service_if_missing_data(notify_api, sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = {
|
||||
'user_id': sample_user.id
|
||||
}
|
||||
auth_header = create_authorization_header(
|
||||
path='/service',
|
||||
method='POST',
|
||||
request_body=json.dumps(data)
|
||||
)
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
'/service',
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert resp.status_code == 400
|
||||
assert json_resp['result'] == 'error'
|
||||
assert 'Missing data for required field.' in json_resp['message']['name']
|
||||
assert 'Missing data for required field.' in json_resp['message']['active']
|
||||
assert 'Missing data for required field.' in json_resp['message']['limit']
|
||||
assert 'Missing data for required field.' in json_resp['message']['restricted']
|
||||
|
||||
|
||||
def test_update_service(notify_api, sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(
|
||||
path='/service',
|
||||
method='GET')
|
||||
resp = client.get('/service?user_id={}'.format(sample_service.users[0].id),
|
||||
headers=[auth_header])
|
||||
path='/service/{}'.format(sample_service.id),
|
||||
method='GET'
|
||||
)
|
||||
resp = client.get(
|
||||
'/service/{}'.format(sample_service.id),
|
||||
headers=[auth_header]
|
||||
)
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert len(json_resp['data']) == 2
|
||||
print(x for x in json_resp['data'])
|
||||
assert 'Another Service' in [x.get('name') for x in json_resp['data']]
|
||||
assert 'Sample service' in [x.get('name') for x in json_resp['data']]
|
||||
assert 'Second Service' not in [x.get('name') for x in json_resp['data']]
|
||||
assert json_resp['data']['name'] == sample_service.name
|
||||
|
||||
|
||||
def test_post_service(notify_api, notify_db, notify_db_session, sample_user):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a service.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert Service.query.count() == 0
|
||||
data = {
|
||||
'name': 'created service',
|
||||
'users': [sample_user.id],
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False}
|
||||
auth_header = create_authorization_header(path=url_for('service.create_service'),
|
||||
method='POST',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
'name': 'updated service name'
|
||||
}
|
||||
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}'.format(sample_service.id),
|
||||
method='POST',
|
||||
request_body=json.dumps(data)
|
||||
)
|
||||
|
||||
resp = client.post(
|
||||
url_for('service.create_service'),
|
||||
'/service/{}'.format(sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 201
|
||||
service = Service.query.filter_by(name='created service').first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['name'] == service.name
|
||||
assert json_resp['data']['limit'] == service.limit
|
||||
|
||||
|
||||
def test_post_service_multiple_users(notify_api, notify_db, notify_db_session, sample_user):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a service with multiple users.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
another_user = create_sample_user(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
"new@digital.cabinet-office.gov.uk")
|
||||
assert Service.query.count() == 0
|
||||
data = {
|
||||
'name': 'created service',
|
||||
'users': [sample_user.id, another_user.id],
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False}
|
||||
auth_header = create_authorization_header(path=url_for('service.create_service'),
|
||||
method='POST',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('service.create_service'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 201
|
||||
service = Service.query.filter_by(name='created service').first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['name'] == service.name
|
||||
assert json_resp['data']['limit'] == service.limit
|
||||
assert len(service.users) == 2
|
||||
|
||||
|
||||
def test_post_service_without_users_attribute(notify_api, notify_db, notify_db_session):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a service without 'users' attribute.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert Service.query.count() == 0
|
||||
data = {
|
||||
'name': 'created service',
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False}
|
||||
auth_header = create_authorization_header(path=url_for('service.create_service'),
|
||||
method='POST',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('service.create_service'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 500
|
||||
assert Service.query.count() == 0
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['message'] == '{"users": ["Missing data for required attribute"]}'
|
||||
|
||||
|
||||
def test_put_service(notify_api, notify_db, notify_db_session, sample_service):
|
||||
"""
|
||||
Tests PUT endpoint '/<service_id>' to edit a service.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert Service.query.count() == 1
|
||||
new_name = 'updated service'
|
||||
data = {
|
||||
'name': new_name,
|
||||
'users': [sample_service.users[0].id],
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False}
|
||||
auth_header = create_authorization_header(path=url_for('service.update_service',
|
||||
service_id=sample_service.id),
|
||||
method='PUT',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('service.update_service', service_id=sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert Service.query.count() == 1
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
)
|
||||
result = json.loads(resp.get_data(as_text=True))
|
||||
assert resp.status_code == 200
|
||||
updated_service = Service.query.get(sample_service.id)
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['name'] == updated_service.name
|
||||
assert json_resp['data']['limit'] == updated_service.limit
|
||||
assert updated_service.name == new_name
|
||||
assert result['data']['name'] == 'updated service name'
|
||||
|
||||
|
||||
def test_put_service_not_exists(notify_api, notify_db, notify_db_session, sample_service):
|
||||
"""
|
||||
Tests PUT endpoint '/<service_id>' service doesn't exist.
|
||||
"""
|
||||
def test_update_service_should_404_if_id_is_invalid(notify_api, notify_db):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
sample_user = sample_service.users[0]
|
||||
new_name = 'updated service'
|
||||
|
||||
data = {
|
||||
'name': new_name,
|
||||
'users': [sample_user.id],
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False}
|
||||
'name': 'updated service name'
|
||||
}
|
||||
|
||||
missing_service_id = uuid.uuid4()
|
||||
auth_header = create_authorization_header(path=url_for('service.update_service',
|
||||
service_id=missing_service_id),
|
||||
method='PUT',
|
||||
request_body=json.dumps(data))
|
||||
resp = client.put(
|
||||
url_for('service.update_service', service_id=missing_service_id),
|
||||
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}'.format(missing_service_id),
|
||||
method='POST',
|
||||
request_body=json.dumps(data)
|
||||
)
|
||||
|
||||
resp = client.post(
|
||||
'/service/{}'.format(missing_service_id),
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
assert Service.query.first().name == sample_service.name
|
||||
assert Service.query.first().name != new_name
|
||||
|
||||
|
||||
def test_put_service_add_user(notify_api, notify_db, notify_db_session, sample_service):
|
||||
"""
|
||||
Tests PUT endpoint '/<service_id>' add user to the service.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert Service.query.count() == 1
|
||||
another_user = create_sample_user(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
"new@digital.cabinet-office.gov.uk")
|
||||
new_name = 'updated service'
|
||||
sample_user = sample_service.users[0]
|
||||
data = {
|
||||
'name': new_name,
|
||||
'users': [sample_user.id, another_user.id],
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False}
|
||||
auth_header = create_authorization_header(path=url_for('service.update_service',
|
||||
service_id=sample_service.id),
|
||||
method='PUT',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('service.update_service', service_id=sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert Service.query.count() == 1
|
||||
assert resp.status_code == 200
|
||||
updated_service = Service.query.get(sample_service.id)
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert len(json_resp['data']['users']) == 2
|
||||
assert sample_user.id in json_resp['data']['users']
|
||||
assert another_user.id in json_resp['data']['users']
|
||||
assert len(updated_service.users) == 2
|
||||
assert set(updated_service.users) == set([sample_user, another_user])
|
||||
|
||||
|
||||
def test_put_service_remove_user(notify_api, notify_db, notify_db_session, sample_service):
|
||||
"""
|
||||
Tests PUT endpoint '/<service_id>' add user to the service.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
sample_user = sample_service.users[0]
|
||||
another_user = create_sample_user(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
"new@digital.cabinet-office.gov.uk")
|
||||
data = {
|
||||
'name': sample_service.name,
|
||||
'users': [sample_user, another_user],
|
||||
'limit': sample_service.limit,
|
||||
'restricted': sample_service.restricted,
|
||||
'active': sample_service.active}
|
||||
save_model_service(sample_service, update_dict=data)
|
||||
assert Service.query.count() == 1
|
||||
data['users'] = [another_user.id]
|
||||
|
||||
auth_header = create_authorization_header(path=url_for('service.update_service',
|
||||
service_id=sample_service.id),
|
||||
method='PUT',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('service.update_service', service_id=sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert Service.query.count() == 1
|
||||
assert resp.status_code == 200
|
||||
updated_service = Service.query.get(sample_service.id)
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert len(json_resp['data']['users']) == 1
|
||||
assert sample_user.id not in json_resp['data']['users']
|
||||
assert another_user.id in json_resp['data']['users']
|
||||
assert sample_user not in updated_service.users
|
||||
assert another_user in updated_service.users
|
||||
|
||||
|
||||
def test_delete_service(notify_api, notify_db, notify_db_session, sample_service):
|
||||
"""
|
||||
Tests DELETE endpoint '/<service_id>' delete service.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(path=url_for('service.update_service',
|
||||
service_id=sample_service.id),
|
||||
method='DELETE')
|
||||
resp = client.delete(
|
||||
url_for('service.update_service', service_id=sample_service.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 202
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
json_resp['data']['name'] == sample_service.name
|
||||
assert Service.query.count() == 0
|
||||
|
||||
|
||||
def test_delete_service_not_exists(notify_api, notify_db, notify_db_session, sample_service):
|
||||
"""
|
||||
Tests DELETE endpoint '/<service_id>' delete service doesn't exist.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert Service.query.count() == 1
|
||||
missing_service_id = uuid.uuid4()
|
||||
auth_header = create_authorization_header(path=url_for('service.update_service',
|
||||
service_id=missing_service_id),
|
||||
method='DELETE')
|
||||
resp = client.delete(
|
||||
url_for('service.update_service', service_id=missing_service_id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
assert Service.query.count() == 1
|
||||
|
||||
|
||||
def test_create_service_should_create_new_service_for_user(notify_api, notify_db, notify_db_session, sample_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert Service.query.count() == 0
|
||||
data = {
|
||||
'name': 'created service',
|
||||
'users': [sample_user.id],
|
||||
'limit': 1000,
|
||||
'restricted': False,
|
||||
'active': False}
|
||||
auth_header = create_authorization_header(path=url_for('service.create_service'),
|
||||
method='POST',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(url_for('service.create_service'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 201
|
||||
assert Service.query.count() == 1
|
||||
|
||||
|
||||
def test_create_template(notify_api, notify_db, notify_db_session, sample_service):
|
||||
|
||||
@@ -270,153 +270,13 @@ def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_us
|
||||
assert User.query.count() == 2
|
||||
user = User.query.filter_by(id=sample_user.id).first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp == {'error': 'No result found'}
|
||||
assert json_resp['result'] == "error"
|
||||
assert json_resp['message'] == "User not found"
|
||||
|
||||
assert user == sample_user
|
||||
assert user.email_address != new_email
|
||||
|
||||
|
||||
def test_get_user_services(notify_api, notify_db, notify_db_session, sample_service, sample_admin_service_id):
|
||||
"""
|
||||
Tests GET endpoint "/<user_id>/service/<service_id>" to retrieve services for a user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
user = User.query.get(sample_service.users[0].id)
|
||||
another_name = "another name"
|
||||
create_sample_service(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
service_name=another_name,
|
||||
user=user)
|
||||
assert Service.query.count() == 3
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.get_service_by_user_id', user_id=user.id),
|
||||
method='GET')
|
||||
resp = client.get(
|
||||
url_for('user.get_service_by_user_id', user_id=user.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert len(json_resp['data']) == 2
|
||||
|
||||
|
||||
def test_get_user_service(notify_api, notify_db, notify_db_session, sample_service, sample_admin_service_id):
|
||||
"""
|
||||
Tests GET endpoint "/<user_id>/service/<service_id>" to retrieve a service for a user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
user = User.query.first()
|
||||
another_name = "another name"
|
||||
another_service = create_sample_service(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
service_name=another_name,
|
||||
user=user)
|
||||
assert Service.query.count() == 3
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.get_service_by_user_id', user_id=user.id,
|
||||
service_id=another_service.id),
|
||||
method='GET')
|
||||
resp = client.get(
|
||||
url_for('user.get_service_by_user_id', user_id=user.id, service_id=another_service.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['name'] == another_name
|
||||
assert json_resp['data']['id'] == str(another_service.id)
|
||||
|
||||
|
||||
def test_get_user_service_user_not_exists(notify_api, sample_service, sample_admin_service_id):
|
||||
"""
|
||||
Tests GET endpoint "/<user_id>/service/<service_id>" 404 is returned for invalid user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert Service.query.count() == 2
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.get_service_by_user_id', user_id="123423",
|
||||
service_id=sample_service.id),
|
||||
method='GET')
|
||||
resp = client.get(
|
||||
url_for('user.get_service_by_user_id', user_id="123423", service_id=sample_service.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp == {'error': 'No result found'}
|
||||
|
||||
|
||||
def test_get_user_service_service_not_exists(notify_api, sample_service):
|
||||
"""
|
||||
Tests GET endpoint "/<user_id>/service/<service_id>" 404 is returned for invalid service.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
user = User.query.first()
|
||||
assert Service.query.count() == 1
|
||||
import uuid
|
||||
missing_service_id = uuid.uuid4()
|
||||
auth_header = create_authorization_header(path=url_for('user.get_service_by_user_id', user_id=user.id,
|
||||
service_id=missing_service_id),
|
||||
method='GET')
|
||||
resp = client.get(
|
||||
url_for('user.get_service_by_user_id', user_id=user.id, service_id=missing_service_id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp == {'error': 'No result found'}
|
||||
|
||||
|
||||
def test_delete_user(notify_api, sample_user, sample_admin_service_id):
|
||||
"""
|
||||
Tests DELETE endpoint '/<user_id>' delete user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 2
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id=sample_user.id),
|
||||
method='DELETE')
|
||||
resp = client.delete(
|
||||
url_for('user.update_user', user_id=sample_user.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 202
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert User.query.count() == 1
|
||||
expected = {
|
||||
"name": "Test User",
|
||||
"email_address": sample_user.email_address,
|
||||
"mobile_number": "+447700900986",
|
||||
"password_changed_at": None,
|
||||
"id": sample_user.id,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
|
||||
}
|
||||
assert json_resp['data'] == expected
|
||||
|
||||
|
||||
def test_delete_user_not_exists(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
|
||||
"""
|
||||
Tests DELETE endpoint '/<user_id>' delete user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 2
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id='99999'),
|
||||
method='DELETE')
|
||||
resp = client.delete(
|
||||
url_for('user.update_user', user_id="99999"),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
assert User.query.count() == 2
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp == {'error': 'No result found'}
|
||||
|
||||
|
||||
def test_post_with_permissions(notify_api, notify_db, notify_db_session, sample_admin_service_id):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a user with permissions.
|
||||
|
||||
@@ -406,7 +406,7 @@ def test_send_sms_code_returns_404_for_bad_input_data(notify_api, notify_db, not
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
assert json.loads(resp.get_data(as_text=True))['error'] == 'No result found'
|
||||
assert json.loads(resp.get_data(as_text=True))['message'] == 'No user found'
|
||||
|
||||
|
||||
def test_send_user_email_code(notify_api,
|
||||
@@ -449,4 +449,4 @@ def test_send_user_email_code_returns_404_for_when_user_does_not_exist(notify_ap
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
assert json.loads(resp.get_data(as_text=True))['error'] == 'No result found'
|
||||
assert json.loads(resp.get_data(as_text=True))['message'] == 'No user found'
|
||||
|
||||
Reference in New Issue
Block a user