Merge pull request #4 from alphagov/service_creation

This commit is contained in:
Adam Shimali
2016-01-12 17:34:15 +00:00
41 changed files with 1288 additions and 38 deletions

5
.gitignore vendored
View File

@@ -55,4 +55,7 @@ docs/_build/
# PyBuilder
target/
.idea/
.idea/
# Mac
*.DS_Store

View File

@@ -2,8 +2,12 @@ sudo: false
language: python
python:
- '3.4'
addons:
postgresql: '9.3'
install:
- pip install -r requirements_for_test.txt
before_script:
- psql -c 'create database test_notification_api;' -U postgres
script:
- ./scripts/run_tests.sh
env:

View File

@@ -2,11 +2,16 @@ import os
from flask._compat import string_types
from flask import Flask, _request_ctx_stack
from flask.ext.sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
from werkzeug.local import LocalProxy
from config import configs
from utils import logging
db = SQLAlchemy()
ma = Marshmallow()
api_user = LocalProxy(lambda: _request_ctx_stack.top.api_user)
@@ -16,16 +21,24 @@ def create_app(config_name):
application.config['NOTIFY_API_ENVIRONMENT'] = config_name
application.config.from_object(configs[config_name])
db.init_app(application)
ma.init_app(application)
init_app(application)
logging.init_app(application)
from .main import main as main_blueprint
from .service import service as service_blueprint
from .user import user as user_blueprint
application.register_blueprint(main_blueprint)
application.register_blueprint(service_blueprint, url_prefix='/service')
application.register_blueprint(user_blueprint, url_prefix='/user')
from .status import status as status_blueprint
application.register_blueprint(status_blueprint)
from app import models
return application

6
app/dao/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
from sqlalchemy.exc import SQLAlchemyError
# Should I use SQLAlchemyError?
class DAOException(SQLAlchemyError):
pass

47
app/dao/services_dao.py Normal file
View File

@@ -0,0 +1,47 @@
import json
from datetime import datetime
from sqlalchemy.orm import load_only
from . import DAOException
from app import db
from app.models import (Service, User)
def save_model_service(service, update_dict=None):
users_list = update_dict.get('users', []) if update_dict else getattr(service, 'users', [])
if not users_list:
error_msg = {'users': ['Missing data for required attribute']}
raise DAOException(json.dumps(error_msg))
if update_dict:
# Make sure the update_dict doesn't contain conflicting
update_dict.pop('id', None)
update_dict.pop('users', None)
# TODO optimize this algorithm
new_users = User.query.filter(User.id.in_(users_list)).all()
for x in service.users:
if x in new_users:
new_users.remove(x)
else:
service.users.remove(x)
for x in new_users:
service.users.append(x)
Service.query.filter_by(id=service.id).update(update_dict)
else:
db.session.add(service)
db.session.commit()
def delete_model_service(service):
db.session.delete(service)
db.session.commit()
def get_model_services(service_id=None, user_id=None):
# TODO need better mapping from function params to sql query.
if user_id and service_id:
return Service.query.filter(
Service.users.any(id=user_id)).filter_by(id=service_id).one()
elif service_id:
return Service.query.filter_by(id=service_id).one()
elif user_id:
return Service.query.filter(Service.users.any(id=user_id)).all()
return Service.query.all()

26
app/dao/users_dao.py Normal file
View File

@@ -0,0 +1,26 @@
from datetime import datetime
from . import DAOException
from sqlalchemy.orm import load_only
from app import db
from app.models import User
def save_model_user(usr, update_dict={}):
if update_dict:
del update_dict['id']
db.session.query(User).filter_by(id=usr.id).update(update_dict)
else:
db.session.add(usr)
db.session.commit()
def delete_model_user(user):
db.session.delete(user)
db.session.commit()
def get_model_users(user_id=None):
if user_id:
return User.query.filter_by(id=user_id).one()
return User.query.filter_by().all()

View File

@@ -1,15 +1,5 @@
from flask import Blueprint
from app.main.authentication.auth import requires_auth
AUTHORIZATION_HEADER = 'Authorization'
AUTHORIZATION_SCHEME = 'Bearer'
WINDOW = 1
main = Blueprint('main', __name__)
main.before_request(requires_auth)
from app.main.views import notifications, index
from app.main import errors
from app.main.views import index

View File

@@ -1,7 +0,0 @@
from flask import jsonify
from .. import main
@main.route('/', methods=['GET'])
def get_index():
return jsonify(result="hello world"), 200

View File

@@ -1,12 +1,2 @@
from flask import jsonify
from .. import main
@main.route('/', methods=['GET'])
def get_index():
return jsonify(result="hello world"), 200
@main.route('/', methods=['POST'])
def post_index():
return jsonify(result="hello world"), 200

View File

@@ -1,7 +0,0 @@
from flask import jsonify
from .. import main
@main.route('/notification', methods=['POST'])
def create_notification():
return jsonify(result="created"), 201

61
app/models.py Normal file
View File

@@ -0,0 +1,61 @@
from . import db
import datetime
def filter_null_value_fields(obj):
return dict(
filter(lambda x: x[1] is not None, obj.items())
)
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
email_address = db.Column(db.String(255), nullable=False, index=True, unique=True)
created_at = db.Column(
db.DateTime,
index=False,
unique=False,
nullable=False,
default=datetime.datetime.now)
updated_at = db.Column(
db.DateTime,
index=False,
unique=False,
nullable=True,
onupdate=datetime.datetime.now)
user_to_service = db.Table(
'user_to_service',
db.Model.metadata,
db.Column('user_id', db.Integer, db.ForeignKey('users.id')),
db.Column('service_id', db.Integer, db.ForeignKey('services.id'))
)
class Service(db.Model):
__tablename__ = 'services'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255), nullable=False)
created_at = db.Column(
db.DateTime,
index=False,
unique=False,
nullable=False,
default=datetime.datetime.now)
updated_at = db.Column(
db.DateTime,
index=False,
unique=False,
nullable=True,
onupdate=datetime.datetime.now)
active = db.Column(db.Boolean, index=False, unique=False, nullable=False)
limit = db.Column(db.BigInteger, index=False, unique=False, nullable=False)
users = db.relationship(
'User',
secondary=user_to_service,
backref=db.backref('user_to_service', lazy='dynamic'))
restricted = db.Column(db.Boolean, index=False, unique=False, nullable=False)

28
app/schemas.py Normal file
View File

@@ -0,0 +1,28 @@
from . import ma
from . import models
from marshmallow import post_load
# TODO I think marshmallow provides a better integration and error handling.
# Would be better to replace functionality in dao with the marshmallow supported
# functionality.
# http://marshmallow.readthedocs.org/en/latest/api_reference.html
class UserSchema(ma.ModelSchema):
class Meta:
model = models.User
exclude = ("updated_at", "created_at", "user_to_service")
# TODO process users list, to return a list of user.id
# Should that list be restricted by the auth parsed??
class ServiceSchema(ma.ModelSchema):
class Meta:
model = models.Service
exclude = ("updated_at", "created_at")
user_schema = UserSchema()
users_schema = UserSchema(many=True)
service_schema = ServiceSchema()
services_schema = ServiceSchema(many=True)

5
app/service/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from flask import Blueprint
service = Blueprint('service', __name__)
from app.service.views import rest

View File

69
app/service/views/rest.py Normal file
View File

@@ -0,0 +1,69 @@
from flask import (jsonify, request)
from sqlalchemy.exc import DataError
from sqlalchemy.orm.exc import NoResultFound
from app.dao.services_dao import (
save_model_service, get_model_services, delete_model_service)
from app.dao.users_dao import get_model_users
from app.dao import DAOException
from .. import service
from app import db
from app.schemas import (services_schema, service_schema)
# TODO auth to be added.
@service.route('/', methods=['POST'])
def create_service():
# TODO what exceptions get passed from schema parsing?
service, errors = service_schema.load(request.get_json())
if errors:
return jsonify(result="error", message=errors), 400
# I believe service is already added to the session but just needs a
# db.session.commit
try:
save_model_service(service)
except DAOException as e:
return jsonify(result="error", message=str(e)), 400
return jsonify(data=service_schema.dump(service).data), 201
# TODO auth to be added
@service.route('/<int:service_id>', methods=['PUT', 'DELETE'])
def update_service(service_id):
try:
service = get_model_services(service_id=service_id)
except DataError:
return jsonify(result="error", message="Invalid service id"), 400
except NoResultFound:
return jsonify(result="error", message="Service not found"), 404
if request.method == 'DELETE':
status_code = 202
delete_model_service(service)
else:
status_code = 200
# TODO there has got to be a better way to do the next three lines
upd_serv, errors = service_schema.load(request.get_json())
if errors:
return jsonify(result="error", message=errors), 400
update_dict, errors = service_schema.dump(upd_serv)
# TODO FIX ME
# Remove update_service model which is added to db.session
db.session.rollback()
try:
save_model_service(service, update_dict=update_dict)
except DAOException as e:
return jsonify(result="error", message=str(e)), 400
return jsonify(data=service_schema.dump(service).data), status_code
# TODO auth to be added.
@service.route('/<int:service_id>', methods=['GET'])
@service.route('/', methods=['GET'])
def get_service(service_id=None):
try:
services = get_model_services(service_id=service_id)
except DataError:
return jsonify(result="error", message="Invalid service id"), 400
except NoResultFound:
return jsonify(result="error", message="Service not found"), 404
data, errors = services_schema.dump(services) if isinstance(services, list) else service_schema.dump(services)
return jsonify(data=data)

5
app/user/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from flask import Blueprint
user = Blueprint('user', __name__)
from app.user.views import rest

View File

81
app/user/views/rest.py Normal file
View File

@@ -0,0 +1,81 @@
from flask import (jsonify, request)
from sqlalchemy.exc import DataError
from sqlalchemy.orm.exc import NoResultFound
from app.dao.services_dao import get_model_services
from app.dao.users_dao import (
get_model_users, save_model_user, delete_model_user)
from app.schemas import (
user_schema, users_schema, service_schema, services_schema)
from .. import user
from app import db
# TODO auth to be added
@user.route('/', methods=['POST'])
def create_user():
user, errors = user_schema.load(request.get_json())
if errors:
return jsonify(result="error", message=errors), 400
save_model_user(user)
return jsonify(data=user_schema.dump(user).data), 201
# TODO auth to be added
@user.route('/<int:user_id>', methods=['PUT', 'DELETE'])
def update_user(user_id):
try:
user = get_model_users(user_id=user_id)
except DataError:
return jsonify(result="error", message="Invalid user id"), 400
except NoResultFound:
return jsonify(result="error", message="User not found"), 404
if request.method == 'DELETE':
status_code = 202
delete_model_user(user)
else:
status_code = 200
# TODO there has got to be a better way to do the next three lines
update_user, errors = user_schema.load(request.get_json())
if errors:
return jsonify(result="error", message=errors), 400
update_dict, errors = user_schema.dump(update_user)
# TODO FIX ME
# Remove update_service model which is added to db.session
db.session.rollback()
save_model_user(user, update_dict=update_dict)
return jsonify(data=user_schema.dump(user).data), status_code
# TODO auth to be added.
@user.route('/<int:user_id>', methods=['GET'])
@user.route('/', methods=['GET'])
def get_user(user_id=None):
try:
users = get_model_users(user_id=user_id)
except DataError:
return jsonify(result="error", message="Invalid user id"), 400
except NoResultFound:
return jsonify(result="error", message="User not found"), 404
result = users_schema.dump(users) if isinstance(users, list) else user_schema.dump(users)
return jsonify(data=result.data)
# TODO auth to be added
@user.route('/<int:user_id>/service', methods=['GET'])
@user.route('/<int:user_id>/service/<int:service_id>', methods=['GET'])
def get_service_by_user_id(user_id, service_id=None):
try:
user = get_model_users(user_id=user_id)
except DataError:
return jsonify(result="error", message="Invalid user id"), 400
except NoResultFound:
return jsonify(result="error", message="User not found"), 404
try:
services = get_model_services(user_id=user.id, service_id=service_id)
except DataError:
return jsonify(result="error", message="Invalid service id"), 400
except NoResultFound:
return jsonify(result="error", message="Service not found"), 404
services, errors = services_schema.dump(services) if isinstance(services, list) else service_schema.dump(services)
return jsonify(data=services)

View File

@@ -5,14 +5,18 @@ from __future__ import print_function
import os
from flask.ext.script import Manager, Server
from flask.ext.migrate import Migrate, MigrateCommand
from app import create_app
from app import create_app, db
application = create_app(os.getenv('NOTIFY_API_ENVIRONMENT') or 'development')
manager = Manager(application)
port = int(os.environ.get('PORT', 6011))
manager.add_command("runserver", Server(host='0.0.0.0', port=port))
migrate = Migrate(application, db)
manager.add_command('db', MigrateCommand)
@manager.command
def list_routes():

View File

@@ -4,6 +4,9 @@ class Config(object):
NOTIFY_LOG_LEVEL = 'DEBUG'
NOTIFY_APP_NAME = 'api'
NOTIFY_LOG_PATH = '/var/log/notify/application.log'
SQLALCHEMY_COMMIT_ON_TEARDOWN = False
SQLALCHEMY_RECORD_QUERIES = True
SQLALCHEMY_DATABASE_URI = 'postgresql://localhost/notification_api'
class Development(Config):
@@ -12,11 +15,13 @@ class Development(Config):
class Test(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = 'postgresql://localhost/test_notification_api'
class Live(Config):
pass
configs = {
'development': Development,
'test': Test,

1
migrations/README Normal file
View File

@@ -0,0 +1 @@
Generic single-database configuration.

45
migrations/alembic.ini Normal file
View File

@@ -0,0 +1,45 @@
# A generic, single database configuration.
[alembic]
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers =
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

73
migrations/env.py Normal file
View File

@@ -0,0 +1,73 @@
from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from flask import current_app
config.set_main_option('sqlalchemy.url', current_app.config.get('SQLALCHEMY_DATABASE_URI'))
target_metadata = current_app.extensions['migrate'].db.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(url=url)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
engine = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool)
connection = engine.connect()
context.configure(
connection=connection,
target_metadata=target_metadata
)
try:
with context.begin_transaction():
context.run_migrations()
finally:
connection.close()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

22
migrations/script.py.mako Normal file
View File

@@ -0,0 +1,22 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision}
Create Date: ${create_date}
"""
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,52 @@
"""empty message
Revision ID: 0001_initialise_data
Revises: None
Create Date: 2016-01-12 09:33:29.249042
"""
# revision identifiers, used by Alembic.
revision = '0001_initialise_data'
down_revision = None
from alembic import op
import sqlalchemy as sa
def upgrade():
### commands auto generated by Alembic - please adjust! ###
op.create_table('services',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('active', sa.Boolean(), nullable=False),
sa.Column('limit', sa.BigInteger(), nullable=False),
sa.Column('restricted', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email_address', sa.String(length=255), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_users_email_address'), 'users', ['email_address'], unique=True)
op.create_table('user_to_service',
sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('service_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['service_id'], ['services.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], )
)
### end Alembic commands ###
def downgrade():
### commands auto generated by Alembic - please adjust! ###
op.drop_table('user_to_service')
op.drop_index(op.f('ix_users_email_address'), table_name='users')
op.drop_table('users')
op.drop_table('services')
### end Alembic commands ###

View File

@@ -1,6 +1,14 @@
Flask==0.10.1
Flask-Script==2.0.5
Flask-Migrate==1.3.1
Flask-SQLAlchemy==2.0
psycopg2==2.6.1
SQLAlchemy==1.0.5
SQLAlchemy-Utils==0.30.5
PyJWT==1.4.0
marshmallow==2.4.2
marshmallow-sqlalchemy==0.8.0
flask-marshmallow==0.6.2
git+https://github.com/alphagov/notifications-python-client.git@0.1.5#egg=notifications-python-client==0.1.5

View File

@@ -28,3 +28,10 @@ fi
# Install Python development dependencies
pip3 install -r requirements_for_test.txt
# Create Postgres databases
createdb notification_api
createdb test_notification_api
# Upgrade databases
python application.py db upgrade

View File

@@ -29,5 +29,6 @@ display_result $? 1 "Code style check"
#py.test --cov=app tests/
#display_result $? 2 "Code coverage"
py.test -v
display_result $? 3 "Unit tests"

31
tests/app/conftest.py Normal file
View File

@@ -0,0 +1,31 @@
import pytest
from app.models import (User, Service)
from app.dao.users_dao import (save_model_user, get_model_users)
from app.dao.services_dao import save_model_service
@pytest.fixture(scope='function')
def sample_user(notify_db,
notify_db_session,
email="notify@digital.cabinet-office.gov.uk"):
user = User(**{'email_address': email})
save_model_user(user)
return user
@pytest.fixture(scope='function')
def sample_service(notify_db,
notify_db_session,
service_name="Sample service",
user=None):
if user is None:
user = sample_user(notify_db, notify_db_session)
data = {
'name': service_name,
'users': [user],
'limit': 1000,
'active': False,
'restricted': False}
service = Service(**data)
save_model_service(service)
return service

View File

View File

@@ -0,0 +1,70 @@
import pytest
from app.dao.services_dao import (
save_model_service, get_model_services, DAOException, delete_model_service)
from tests.app.conftest import sample_service as create_sample_service
from app.models import Service
def test_create_service(notify_api, notify_db, notify_db_session, sample_user):
assert Service.query.count() == 0
service_name = 'Sample Service'
data = {
'name': service_name,
'users': [sample_user],
'limit': 1000,
'active': False,
'restricted': False}
service = Service(**data)
save_model_service(service)
assert Service.query.count() == 1
assert Service.query.first().name == service_name
assert Service.query.first().id == service.id
def test_get_services(notify_api, notify_db, notify_db_session, sample_user):
sample_service = create_sample_service(notify_db,
notify_db_session,
user=sample_user)
assert Service.query.count() == 1
assert len(get_model_services()) == 1
service_name = "Another service"
sample_service = create_sample_service(notify_db,
notify_db_session,
service_name=service_name,
user=sample_user)
assert Service.query.count() == 2
assert len(get_model_services()) == 2
def test_get_user_service(notify_api, notify_db, notify_db_session, sample_user):
assert Service.query.count() == 0
service_name = "Random service"
sample_service = create_sample_service(notify_db,
notify_db_session,
service_name=service_name,
user=sample_user)
assert get_model_services(service_id=sample_service.id).name == service_name
assert Service.query.count() == 1
def test_missing_user_attribute(notify_api, notify_db, notify_db_session):
assert Service.query.count() == 0
try:
service_name = 'Sample Service'
data = {
'name': service_name,
'limit': 1000,
'active': False,
'restricted': False}
service = Service(**data)
save_model_service(service)
pytest.fail("DAOException not thrown")
except DAOException as e:
assert "Missing data for required attribute" in str(e)
def test_delete_service(notify_api, notify_db, notify_db_session, sample_service):
assert Service.query.count() == 1
delete_model_service(sample_service)
assert Service.query.count() == 0

View File

@@ -0,0 +1,56 @@
from sqlalchemy.exc import DataError
from sqlalchemy.orm.exc import NoResultFound
from app.dao.users_dao import (
save_model_user, get_model_users, delete_model_user)
from tests.app.conftest import sample_user as create_sample_user
from app.models import User
def test_create_user(notify_api, notify_db, notify_db_session):
email = 'notify@digital.cabinet-office.gov.uk'
user = User(**{'email_address': email})
save_model_user(user)
assert User.query.count() == 1
assert User.query.first().email_address == email
assert User.query.first().id == user.id
def test_get_all_users(notify_api, notify_db, notify_db_session, sample_user):
assert User.query.count() == 1
assert len(get_model_users()) == 1
email = "another.notify@digital.cabinet-office.gov.uk"
another_user = create_sample_user(notify_db,
notify_db_session,
email=email)
assert User.query.count() == 2
assert len(get_model_users()) == 2
def test_get_user(notify_api, notify_db, notify_db_session):
email = "another.notify@digital.cabinet-office.gov.uk"
another_user = create_sample_user(notify_db,
notify_db_session,
email=email)
assert get_model_users(user_id=another_user.id).email_address == email
def test_get_user_not_exists(notify_api, notify_db, notify_db_session):
try:
get_model_users(user_id="12345")
pytest.fail("NoResultFound exception not thrown.")
except:
pass
def test_get_user_invalid_id(notify_api, notify_db, notify_db_session):
try:
get_model_users(user_id="blah")
pytest.fail("DataError exception not thrown.")
except DataError:
pass
def test_delete_users(notify_api, notify_db, notify_db_session, sample_user):
assert User.query.count() == 1
delete_model_user(sample_user)
assert User.query.count() == 0

View File

View File

@@ -1,7 +1,9 @@
import pytest
from flask import json
from client.authentication import create_jwt_token
@pytest.mark.xfail(reason="Authentication to be added.")
def test_should_not_allow_request_with_no_token(notify_api):
response = notify_api.test_client().get("/")
assert response.status_code == 401
@@ -9,6 +11,7 @@ def test_should_not_allow_request_with_no_token(notify_api):
assert data['error'] == 'Unauthorized, authentication token must be provided'
@pytest.mark.xfail(reason="Authentication to be added.")
def test_should_not_allow_request_with_incorrect_header(notify_api):
response = notify_api.test_client().get(
"/",
@@ -21,6 +24,7 @@ def test_should_not_allow_request_with_incorrect_header(notify_api):
assert data['error'] == 'Unauthorized, authentication bearer scheme must be used'
@pytest.mark.xfail(reason="Authentication to be added.")
def test_should_not_allow_request_with_incorrect_token(notify_api):
response = notify_api.test_client().get(
"/",
@@ -33,6 +37,7 @@ def test_should_not_allow_request_with_incorrect_token(notify_api):
assert data['error'] == 'Invalid token: signature'
@pytest.mark.xfail(reason="Authentication to be added.")
def test_should_not_allow_incorrect_path(notify_api):
token = create_jwt_token(request_method="GET", request_path="/bad", secret="secret", client_id="client_id")
response = notify_api.test_client().get(
@@ -46,6 +51,7 @@ def test_should_not_allow_incorrect_path(notify_api):
assert data['error'] == 'Invalid token: request'
@pytest.mark.xfail(reason="Authentication to be added.")
def test_should_not_allow_incorrect_method(notify_api):
token = create_jwt_token(request_method="POST", request_path="/", secret="secret", client_id="client_id")
response = notify_api.test_client().get(
@@ -59,6 +65,7 @@ def test_should_not_allow_incorrect_method(notify_api):
assert data['error'] == 'Invalid token: request'
@pytest.mark.xfail(reason="Authentication to be added.")
def test_should_not_allow_invalid_secret(notify_api):
token = create_jwt_token(request_method="POST", request_path="/", secret="not-so-secret", client_id="client_id")
response = notify_api.test_client().get(
@@ -72,6 +79,7 @@ def test_should_not_allow_invalid_secret(notify_api):
assert data['error'] == 'Invalid token: signature'
@pytest.mark.xfail(reason="Authentication to be added.")
def test_should_allow_valid_token(notify_api):
token = create_jwt_token(request_method="GET", request_path="/", secret="secret", client_id="client_id")
response = notify_api.test_client().get(
@@ -83,6 +91,7 @@ def test_should_allow_valid_token(notify_api):
assert response.status_code == 200
@pytest.mark.xfail(reason="Authentication to be added.")
def test_should_allow_valid_token_with_post_body(notify_api):
json_body = json.dumps({
"key1": "value1",
@@ -106,6 +115,7 @@ def test_should_allow_valid_token_with_post_body(notify_api):
assert response.status_code == 200
@pytest.mark.xfail(reason="Authentication to be added.")
def test_should_not_allow_valid_token_with_invalid_post_body(notify_api):
json_body = json.dumps({
"key1": "value1",

View File

View File

View File

@@ -0,0 +1,263 @@
import json
from app.models import (Service, User)
from app.dao.services_dao import save_model_service
from tests.app.conftest import sample_user as create_sample_user
from flask import url_for
def test_get_service_list(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests GET endpoint '/' to retrieve entire service list.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
response = client.get(url_for('service.get_service'))
assert response.status_code == 200
json_resp = json.loads(response.get_data(as_text=True))
# TODO assert correct json returned
assert len(json_resp) == 1
assert json_resp['data'][0]['name'] == sample_service.name
assert json_resp['data'][0]['id'] == sample_service.id
def test_get_service(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests GET endpoint '/<service_id>' to retrieve a single service.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
resp = client.get(url_for('service.get_service',
service_id=sample_service.id))
assert resp.status_code == 200
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['name'] == sample_service.name
assert json_resp['data']['id'] == sample_service.id
def test_post_service(notify_api, notify_db, notify_db_session, sample_user):
"""
Tests POST endpoint '/' to create a service.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert Service.query.count() == 0
data = {
'name': 'created service',
'users': [sample_user.id],
'limit': 1000,
'restricted': False,
'active': False}
headers = [('Content-Type', 'application/json')]
resp = client.post(
url_for('service.create_service'),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 201
service = Service.query.first()
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['name'] == service.name
assert json_resp['data']['limit'] == service.limit
def test_post_service_multiple_users(notify_api, notify_db, notify_db_session, sample_user):
"""
Tests POST endpoint '/' to create a service with multiple users.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
another_user = create_sample_user(
notify_db,
notify_db_session,
"new@digital.cabinet-office.gov.uk")
assert Service.query.count() == 0
data = {
'name': 'created service',
'users': [sample_user.id, another_user.id],
'limit': 1000,
'restricted': False,
'active': False}
headers = [('Content-Type', 'application/json')]
resp = client.post(
url_for('service.create_service'),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 201
service = Service.query.first()
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['name'] == service.name
assert json_resp['data']['limit'] == service.limit
assert len(service.users) == 2
def test_post_service_without_users_attribute(notify_api, notify_db, notify_db_session):
"""
Tests POST endpoint '/' to create a service without 'users' attribute.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert Service.query.count() == 0
data = {
'name': 'created service',
'limit': 1000,
'restricted': False,
'active': False}
headers = [('Content-Type', 'application/json')]
resp = client.post(
url_for('service.create_service'),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 400
assert Service.query.count() == 0
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['message'] == '{"users": ["Missing data for required attribute"]}'
def test_put_service(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests PUT endpoint '/<service_id>' to edit a service.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert Service.query.count() == 1
sample_user = User.query.first()
old_service = Service.query.first()
new_name = 'updated service'
data = {
'name': new_name,
'users': [sample_user.id],
'limit': 1000,
'restricted': False,
'active': False}
headers = [('Content-Type', 'application/json')]
resp = client.put(
url_for('service.update_service', service_id=old_service.id),
data=json.dumps(data),
headers=headers)
assert Service.query.count() == 1
assert resp.status_code == 200
updated_service = Service.query.first()
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['name'] == updated_service.name
assert json_resp['data']['limit'] == updated_service.limit
assert updated_service.name == new_name
def test_put_service_not_exists(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests PUT endpoint '/<service_id>' service doesn't exist.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
sample_user = sample_service.users[0]
new_name = 'updated service'
data = {
'name': new_name,
'users': [sample_user.id],
'limit': 1000,
'restricted': False,
'active': False}
resp = client.put(
url_for('service.update_service', service_id="123"),
data=data,
headers=[('Content-Type', 'application/json')])
assert resp.status_code == 404
assert Service.query.first().name == sample_service.name
assert Service.query.first().name != new_name
def test_put_service_add_user(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests PUT endpoint '/<service_id>' add user to the service.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert Service.query.count() == 1
another_user = create_sample_user(
notify_db,
notify_db_session,
"new@digital.cabinet-office.gov.uk")
sample_user = User.query.first()
old_service = Service.query.first()
new_name = 'updated service'
data = {
'name': new_name,
'users': [sample_user.id, another_user.id],
'limit': 1000,
'restricted': False,
'active': False}
headers = [('Content-Type', 'application/json')]
resp = client.put(
url_for('service.update_service', service_id=old_service.id),
data=json.dumps(data),
headers=headers)
assert Service.query.count() == 1
assert resp.status_code == 200
updated_service = Service.query.first()
json_resp = json.loads(resp.get_data(as_text=True))
assert len(json_resp['data']['users']) == 2
assert sample_user.id in json_resp['data']['users']
assert another_user.id in json_resp['data']['users']
def test_put_service_remove_user(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests PUT endpoint '/<service_id>' add user to the service.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
sample_user = User.query.first()
another_user = create_sample_user(
notify_db,
notify_db_session,
"new@digital.cabinet-office.gov.uk")
data = {
'name': sample_service.name,
'users': [sample_user.id, another_user.id],
'limit': sample_service.limit,
'restricted': sample_service.restricted,
'active': sample_service.active}
save_model_service(sample_service, update_dict=data)
assert Service.query.count() == 1
sample_user = User.query.first()
data['users'] = [another_user.id]
headers = [('Content-Type', 'application/json')]
resp = client.put(
url_for('service.update_service', service_id=sample_service.id),
data=json.dumps(data),
headers=headers)
assert Service.query.count() == 1
assert resp.status_code == 200
updated_service = Service.query.first()
json_resp = json.loads(resp.get_data(as_text=True))
assert len(json_resp['data']['users']) == 1
assert sample_user.id not in json_resp['data']['users']
assert another_user.id in json_resp['data']['users']
def test_delete_service(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests DELETE endpoint '/<service_id>' delete service.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
service = Service.query.first()
resp = client.delete(
url_for('service.update_service', service_id=service.id),
headers=[('Content-Type', 'application/json')])
assert resp.status_code == 202
json_resp = json.loads(resp.get_data(as_text=True))
json_resp['data']['name'] == sample_service.name
assert Service.query.count() == 0
def test_delete_service_not_exists(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests DELETE endpoint '/<service_id>' delete service doesn't exist.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
resp = client.delete(
url_for('service.update_service', service_id="123"),
headers=[('Content-Type', 'application/json')])
assert resp.status_code == 404
assert Service.query.count() == 1

View File

View File

View File

@@ -0,0 +1,248 @@
import json
from app.models import (User, Service)
from tests.app.conftest import sample_service as create_sample_service
from flask import url_for
def test_get_user_list(notify_api, notify_db, notify_db_session, sample_user):
"""
Tests GET endpoint '/' to retrieve entire user list.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
response = client.get(url_for('user.get_user'))
assert response.status_code == 200
json_resp = json.loads(response.get_data(as_text=True))
# TODO assert correct json returned
assert len(json_resp['data']) == 1
assert json_resp['data'][0]['email_address'] == sample_user.email_address
assert json_resp['data'][0]['id'] == sample_user.id
def test_get_user(notify_api, notify_db, notify_db_session, sample_user):
"""
Tests GET endpoint '/<user_id>' to retrieve a single service.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
resp = client.get(url_for('user.get_user',
user_id=sample_user.id))
assert resp.status_code == 200
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['email_address'] == sample_user.email_address
assert json_resp['data']['id'] == sample_user.id
def test_post_user(notify_api, notify_db, notify_db_session):
"""
Tests POST endpoint '/' to create a user.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 0
data = {
'email_address': 'user@digital.cabinet-office.gov.uk'}
headers = [('Content-Type', 'application/json')]
resp = client.post(
url_for('user.create_user'),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 201
user = User.query.first()
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['email_address'] == user.email_address
assert json_resp['data']['id'] == user.id
def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_session):
"""
Tests POST endpoint '/' missing attribute email.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 0
data = {
'blah': 'blah.blah'}
headers = [('Content-Type', 'application/json')]
resp = client.post(
url_for('user.create_user'),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 400
assert User.query.count() == 0
json_resp = json.loads(resp.get_data(as_text=True))
assert {'email_address': ['Missing data for required field.']} == json_resp['message']
def test_put_user(notify_api, notify_db, notify_db_session, sample_user):
"""
Tests PUT endpoint '/' to update a user.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 1
new_email = 'new@digital.cabinet-office.gov.uk'
data = {
'email_address': new_email}
headers = [('Content-Type', 'application/json')]
resp = client.put(
url_for('user.update_user', user_id=sample_user.id),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 200
assert User.query.count() == 1
user = User.query.first()
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['email_address'] == new_email
assert json_resp['data']['id'] == user.id
def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_user):
"""
Tests PUT endpoint '/' to update a user doesn't exist.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 1
new_email = 'new@digital.cabinet-office.gov.uk'
data = {
'email_address': new_email}
headers = [('Content-Type', 'application/json')]
resp = client.put(
url_for('user.update_user', user_id="123"),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 404
assert User.query.count() == 1
user = User.query.first()
json_resp = json.loads(resp.get_data(as_text=True))
assert user.email_address != new_email
def test_put_user_missing_email(notify_api, notify_db, notify_db_session, sample_user):
"""
Tests PUT endpoint '/' missing attribute email.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 1
new_email = 'new@digital.cabinet-office.gov.uk'
data = {
'blah': new_email}
headers = [('Content-Type', 'application/json')]
resp = client.put(
url_for('user.update_user', user_id=sample_user.id),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 400
assert User.query.count() == 1
user = User.query.first()
json_resp = json.loads(resp.get_data(as_text=True))
assert user.email_address == sample_user.email_address
assert {'email_address': ['Missing data for required field.']} == json_resp['message']
def test_get_user_services(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests GET endpoint "/<user_id>/service/<service_id>" to retrieve services for a user.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
user = User.query.first()
another_name = "another name"
another_service = create_sample_service(
notify_db,
notify_db_session,
service_name=another_name,
user=user)
assert Service.query.count() == 2
resp = client.get(
url_for('user.get_service_by_user_id', user_id=user.id),
headers=[('Content-Type', 'application/json')])
assert resp.status_code == 200
json_resp = json.loads(resp.get_data(as_text=True))
assert len(json_resp['data']) == 2
def test_get_user_service(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests GET endpoint "/<user_id>/service/<service_id>" to retrieve a service for a user.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
user = User.query.first()
another_name = "another name"
another_service = create_sample_service(
notify_db,
notify_db_session,
service_name=another_name,
user=user)
assert Service.query.count() == 2
resp = client.get(
url_for('user.get_service_by_user_id', user_id=user.id, service_id=another_service.id),
headers=[('Content-Type', 'application/json')])
assert resp.status_code == 200
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['name'] == another_name
assert json_resp['data']['id'] == another_service.id
def test_get_user_service_user_not_exists(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests GET endpoint "/<user_id>/service/<service_id>" 404 is returned for invalid user.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
user = User.query.first()
assert Service.query.count() == 1
resp = client.get(
url_for('user.get_service_by_user_id', user_id="123", service_id=sample_service.id),
headers=[('Content-Type', 'application/json')])
assert resp.status_code == 404
json_resp = json.loads(resp.get_data(as_text=True))
assert "User not found" in json_resp['message']
def test_get_user_service_service_not_exists(notify_api, notify_db, notify_db_session, sample_service):
"""
Tests GET endpoint "/<user_id>/service/<service_id>" 404 is returned for invalid service.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
user = User.query.first()
assert Service.query.count() == 1
resp = client.get(
url_for('user.get_service_by_user_id', user_id=user.id, service_id="123"),
headers=[('Content-Type', 'application/json')])
assert resp.status_code == 404
json_resp = json.loads(resp.get_data(as_text=True))
assert "Service not found" in json_resp['message']
def test_delete_user(notify_api, notify_db, notify_db_session, sample_user):
"""
Tests DELETE endpoint '/<user_id>' delete user.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
user = User.query.first()
resp = client.delete(
url_for('user.update_user', user_id=user.id),
headers=[('Content-Type', 'application/json')])
assert resp.status_code == 202
json_resp = json.loads(resp.get_data(as_text=True))
assert User.query.count() == 0
def test_delete_user_not_exists(notify_api, notify_db, notify_db_session, sample_user):
"""
Tests DELETE endpoint '/<user_id>' delete user.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
user = User.query.first()
resp = client.delete(
url_for('user.update_user', user_id="123"),
headers=[('Content-Type', 'application/json')])
assert resp.status_code == 404
assert User.query.count() == 1

View File

@@ -1,7 +1,14 @@
import pytest
import mock
from app import create_app
import os
from config import configs
from alembic.command import upgrade
from alembic.config import Config
from flask.ext.migrate import Migrate, MigrateCommand
from flask.ext.script import Manager
from sqlalchemy.schema import MetaData
from app import create_app, db
from app import models
@pytest.fixture(scope='session')
@@ -17,6 +24,39 @@ def notify_api(request):
return app
@pytest.fixture(scope='session')
def notify_db(notify_api, request):
Migrate(notify_api, db)
Manager(db, MigrateCommand)
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
ALEMBIC_CONFIG = os.path.join(BASE_DIR, 'migrations')
config = Config(ALEMBIC_CONFIG + '/alembic.ini')
config.set_main_option("script_location", ALEMBIC_CONFIG)
with notify_api.app_context():
upgrade(config, 'head')
def teardown():
db.session.remove()
db.drop_all()
db.engine.execute("drop table alembic_version")
db.get_engine(notify_api).dispose()
request.addfinalizer(teardown)
@pytest.fixture(scope='function')
def notify_db_session(request):
def teardown():
db.session.remove()
for tbl in reversed(meta.sorted_tables):
if tbl.fullname not in ['roles']:
db.engine.execute(tbl.delete())
meta = MetaData(bind=db.engine, reflect=True)
request.addfinalizer(teardown)
@pytest.fixture(scope='function')
def notify_config(notify_api):
notify_api.config['NOTIFY_API_ENVIRONMENT'] = 'test'