mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-05 02:41:14 -05:00
All four http methods working now for user and service restful apis.
This commit is contained in:
@@ -0,0 +1,6 @@
|
|||||||
|
from sqlalchemy.exc import SQLAlchemyError
|
||||||
|
|
||||||
|
|
||||||
|
# Should I use SQLAlchemyError?
|
||||||
|
class DAOException(SQLAlchemyError):
|
||||||
|
pass
|
||||||
|
|||||||
@@ -1,24 +1,22 @@
|
|||||||
import json
|
import json
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
from sqlalchemy.orm import load_only
|
from sqlalchemy.orm import load_only
|
||||||
from sqlalchemy.exc import SQLAlchemyError
|
from . import DAOException
|
||||||
|
|
||||||
from app import db
|
from app import db
|
||||||
from app.models import Service
|
from app.models import Service
|
||||||
|
|
||||||
|
|
||||||
# Should I use SQLAlchemyError?
|
def save_model_service(service, update_dict=None):
|
||||||
class DAOException(SQLAlchemyError):
|
users_list = update_dict.get('users', []) if update_dict else getattr(service, 'users', [])
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def create_model_service(service):
|
|
||||||
users_list = getattr(service, 'users', [])
|
|
||||||
if not users_list:
|
if not users_list:
|
||||||
error_msg = {'users': 'Missing data for required attribute'}
|
error_msg = {'users': 'Missing data for required attribute'}
|
||||||
raise DAOException(json.dumps(error_msg))
|
raise DAOException(json.dumps(error_msg))
|
||||||
db.session.add(service)
|
if update_dict:
|
||||||
|
del update_dict['id']
|
||||||
|
del update_dict['users']
|
||||||
|
db.session.query(Service).filter_by(id=service.id).update(update_dict)
|
||||||
|
else:
|
||||||
|
db.session.add(service)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,13 +1,17 @@
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from . import DAOException
|
||||||
from sqlalchemy.orm import load_only
|
from sqlalchemy.orm import load_only
|
||||||
|
|
||||||
from app import db
|
from app import db
|
||||||
from app.models import User
|
from app.models import User
|
||||||
|
|
||||||
|
|
||||||
def create_model_user(usr):
|
def save_model_user(usr, update_dict={}):
|
||||||
db.session.add(usr)
|
if update_dict:
|
||||||
|
del update_dict['id']
|
||||||
|
db.session.query(User).filter_by(id=usr.id).update(update_dict)
|
||||||
|
else:
|
||||||
|
db.session.add(usr)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -13,17 +13,6 @@ class UserSchema(ma.ModelSchema):
|
|||||||
model = models.User
|
model = models.User
|
||||||
exclude = ("updated_at", "created_at", "user_to_service")
|
exclude = ("updated_at", "created_at", "user_to_service")
|
||||||
|
|
||||||
def make_object(self, data):
|
|
||||||
# TODO possibly override to handle instance creation
|
|
||||||
return super(UserSchema, self).make_object(data)
|
|
||||||
|
|
||||||
# def dump(self, obj, many=None, update_fields=True, **kwargs):
|
|
||||||
# retval = super(UserSchema, self).dump(
|
|
||||||
# obj, many=many, update_fields=update_fields, **kwargs)
|
|
||||||
# if not many and 'email_address' not in retval.data:
|
|
||||||
# retval.data['email_address'] = obj.email_address
|
|
||||||
# return retval
|
|
||||||
|
|
||||||
|
|
||||||
# TODO process users list, to return a list of user.id
|
# TODO process users list, to return a list of user.id
|
||||||
# Should that list be restricted by the auth parsed??
|
# Should that list be restricted by the auth parsed??
|
||||||
@@ -32,10 +21,6 @@ class ServiceSchema(ma.ModelSchema):
|
|||||||
model = models.Service
|
model = models.Service
|
||||||
exclude = ("updated_at", "created_at")
|
exclude = ("updated_at", "created_at")
|
||||||
|
|
||||||
def make_object(self, data):
|
|
||||||
# TODO possibly override to handle instance creation
|
|
||||||
return super(ServiceSchema, self).make_object(data)
|
|
||||||
|
|
||||||
|
|
||||||
user_schema = UserSchema()
|
user_schema = UserSchema()
|
||||||
users_schema = UserSchema(many=True)
|
users_schema = UserSchema(many=True)
|
||||||
|
|||||||
@@ -1,9 +1,10 @@
|
|||||||
from flask import (jsonify, request)
|
from flask import (jsonify, request)
|
||||||
from sqlalchemy.exc import DataError
|
from sqlalchemy.exc import DataError
|
||||||
from sqlalchemy.orm.exc import NoResultFound
|
from sqlalchemy.orm.exc import NoResultFound
|
||||||
from app.dao.services_dao import (create_model_service, get_model_services)
|
from app.dao.services_dao import (save_model_service, get_model_services)
|
||||||
from app.dao.users_dao import get_model_users
|
from app.dao.users_dao import get_model_users
|
||||||
from .. import service
|
from .. import service
|
||||||
|
from app import db
|
||||||
from app.schemas import (services_schema, service_schema)
|
from app.schemas import (services_schema, service_schema)
|
||||||
|
|
||||||
|
|
||||||
@@ -11,17 +12,29 @@ from app.schemas import (services_schema, service_schema)
|
|||||||
@service.route('/', methods=['POST'])
|
@service.route('/', methods=['POST'])
|
||||||
def create_service():
|
def create_service():
|
||||||
# TODO what exceptions get passed from schema parsing?
|
# TODO what exceptions get passed from schema parsing?
|
||||||
service = service_schema.load(request.get_json()).data
|
service, errors = service_schema.load(request.get_json())
|
||||||
print(service_schema.dump(service).data)
|
# I believe service is already added to the session but just needs a
|
||||||
# Some magic here, it automatically creates the service object.
|
# db.session.commit
|
||||||
# Cool but need to understand how this works.
|
save_model_service(service)
|
||||||
return jsonify(data=service_schema.dump(service).data), 201
|
return jsonify(data=service_schema.dump(service).data), 201
|
||||||
|
|
||||||
|
|
||||||
# TODO auth to be added
|
# TODO auth to be added
|
||||||
@service.route('/<int:service_id>', methods=['PUT'])
|
@service.route('/<int:service_id>', methods=['PUT'])
|
||||||
def update_service(service_id):
|
def update_service(service_id):
|
||||||
service = get_services(service_id=service_id)
|
try:
|
||||||
|
service = get_model_services(service_id=service_id)
|
||||||
|
except DataError:
|
||||||
|
return jsonify(result="error", message="Invalid service id"), 400
|
||||||
|
except NoResultFound:
|
||||||
|
return jsonify(result="error", message="Service not found"), 404
|
||||||
|
# TODO there has got to be a better way to do the next three lines
|
||||||
|
update_service, errors = service_schema.load(request.get_json())
|
||||||
|
update_dict, errors = service_schema.dump(update_service)
|
||||||
|
# TODO FIX ME
|
||||||
|
# Remove update_service model which is added to db.session
|
||||||
|
db.session.rollback()
|
||||||
|
save_model_service(service, update_dict=update_dict)
|
||||||
return jsonify(data=service_schema.dump(service).data)
|
return jsonify(data=service_schema.dump(service).data)
|
||||||
|
|
||||||
|
|
||||||
@@ -34,6 +47,6 @@ def get_service(service_id=None):
|
|||||||
except DataError:
|
except DataError:
|
||||||
return jsonify(result="error", message="Invalid service id"), 400
|
return jsonify(result="error", message="Invalid service id"), 400
|
||||||
except NoResultFound:
|
except NoResultFound:
|
||||||
return jsonify(result="error", message="Service doesn't exist"), 404
|
return jsonify(result="error", message="Service not found"), 404
|
||||||
result = services_schema.dump(services) if isinstance(services, list) else service_schema.dump(services)
|
data, errors = services_schema.dump(services) if isinstance(services, list) else service_schema.dump(services)
|
||||||
return jsonify(data=result.data)
|
return jsonify(data=data)
|
||||||
|
|||||||
@@ -2,24 +2,37 @@ from flask import (jsonify, request)
|
|||||||
from sqlalchemy.exc import DataError
|
from sqlalchemy.exc import DataError
|
||||||
from sqlalchemy.orm.exc import NoResultFound
|
from sqlalchemy.orm.exc import NoResultFound
|
||||||
from app.dao.services_dao import get_model_services
|
from app.dao.services_dao import get_model_services
|
||||||
from app.dao.users_dao import (get_model_users, create_model_user)
|
from app.dao.users_dao import (get_model_users, save_model_user)
|
||||||
from app.schemas import (
|
from app.schemas import (
|
||||||
user_schema, users_schema, service_schema, services_schema)
|
user_schema, users_schema, service_schema, services_schema)
|
||||||
from .. import user
|
from .. import user
|
||||||
|
from app import db
|
||||||
|
|
||||||
|
|
||||||
# TODO auth to be added
|
# TODO auth to be added
|
||||||
@user.route('/', methods=['POST'])
|
@user.route('/', methods=['POST'])
|
||||||
def create_user():
|
def create_user():
|
||||||
user = user_schema.load(request.get_json()).data
|
user = user_schema.load(request.get_json()).data
|
||||||
create_model_user(user)
|
save_model_user(user)
|
||||||
return jsonify(data=user_schema.dump(user).data), 201
|
return jsonify(data=user_schema.dump(user).data), 201
|
||||||
|
|
||||||
|
|
||||||
# TODO auth to be added
|
# TODO auth to be added
|
||||||
@user.route('/<int:user_id>', methods=['PUT'])
|
@user.route('/<int:user_id>', methods=['PUT'])
|
||||||
def update_user(user_id):
|
def update_user(user_id):
|
||||||
user = get_model_users(user_id=user_id)
|
try:
|
||||||
|
user = get_model_users(user_id=user_id)
|
||||||
|
except DataError:
|
||||||
|
return jsonify(result="error", message="Invalid user id"), 400
|
||||||
|
except NoResultFound:
|
||||||
|
return jsonify(result="error", message="User not found"), 404
|
||||||
|
# TODO there has got to be a better way to do the next three lines
|
||||||
|
update_user, errors = user_schema.load(request.get_json())
|
||||||
|
update_dict, errors = user_schema.dump(update_user)
|
||||||
|
# TODO FIX ME
|
||||||
|
# Remove update_service model which is added to db.session
|
||||||
|
db.session.rollback()
|
||||||
|
save_model_user(user, update_dict=update_dict)
|
||||||
return jsonify(data=user_schema.dump(user).data)
|
return jsonify(data=user_schema.dump(user).data)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import pytest
|
import pytest
|
||||||
from app.models import (User, Service)
|
from app.models import (User, Service)
|
||||||
from app.dao.users_dao import (create_model_user, get_model_users)
|
from app.dao.users_dao import (save_model_user, get_model_users)
|
||||||
from app.dao.services_dao import create_model_service
|
from app.dao.services_dao import save_model_service
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='function')
|
@pytest.fixture(scope='function')
|
||||||
@@ -9,7 +9,7 @@ def sample_user(notify_db,
|
|||||||
notify_db_session,
|
notify_db_session,
|
||||||
email="notify@digital.cabinet-office.gov.uk"):
|
email="notify@digital.cabinet-office.gov.uk"):
|
||||||
user = User(**{'email_address': email})
|
user = User(**{'email_address': email})
|
||||||
create_model_user(user)
|
save_model_user(user)
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
@@ -27,5 +27,5 @@ def sample_service(notify_db,
|
|||||||
'active': False,
|
'active': False,
|
||||||
'restricted': False}
|
'restricted': False}
|
||||||
service = Service(**data)
|
service = Service(**data)
|
||||||
create_model_service(service)
|
save_model_service(service)
|
||||||
return service
|
return service
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
import pytest
|
import pytest
|
||||||
from app.dao.services_dao import (
|
from app.dao.services_dao import (
|
||||||
create_model_service, get_model_services, DAOException)
|
save_model_service, get_model_services, DAOException)
|
||||||
from tests.app.conftest import sample_service as create_sample_service
|
from tests.app.conftest import sample_service as create_sample_service
|
||||||
from app.models import Service
|
from app.models import Service
|
||||||
|
|
||||||
@@ -15,7 +15,7 @@ def test_create_service(notify_api, notify_db, notify_db_session, sample_user):
|
|||||||
'active': False,
|
'active': False,
|
||||||
'restricted': False}
|
'restricted': False}
|
||||||
service = Service(**data)
|
service = Service(**data)
|
||||||
create_model_service(service)
|
save_model_service(service)
|
||||||
assert Service.query.count() == 1
|
assert Service.query.count() == 1
|
||||||
assert Service.query.first().name == service_name
|
assert Service.query.first().name == service_name
|
||||||
assert Service.query.first().id == service.id
|
assert Service.query.first().id == service.id
|
||||||
@@ -58,7 +58,7 @@ def test_missing_user_attribute(notify_api, notify_db, notify_db_session):
|
|||||||
'restricted': False}
|
'restricted': False}
|
||||||
|
|
||||||
service = Service(**data)
|
service = Service(**data)
|
||||||
create_model_service(service)
|
save_model_service(service)
|
||||||
pytest.fail("DAOException not thrown")
|
pytest.fail("DAOException not thrown")
|
||||||
except DAOException as e:
|
except DAOException as e:
|
||||||
assert "Missing data for required attribute" in str(e)
|
assert "Missing data for required attribute" in str(e)
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
from sqlalchemy.exc import DataError
|
from sqlalchemy.exc import DataError
|
||||||
from sqlalchemy.orm.exc import NoResultFound
|
from sqlalchemy.orm.exc import NoResultFound
|
||||||
from app.dao.users_dao import (create_model_user, get_model_users)
|
from app.dao.users_dao import (save_model_user, get_model_users)
|
||||||
from tests.app.conftest import sample_user as create_sample_user
|
from tests.app.conftest import sample_user as create_sample_user
|
||||||
from app.models import User
|
from app.models import User
|
||||||
|
|
||||||
@@ -8,7 +8,7 @@ from app.models import User
|
|||||||
def test_create_user(notify_api, notify_db, notify_db_session):
|
def test_create_user(notify_api, notify_db, notify_db_session):
|
||||||
email = 'notify@digital.cabinet-office.gov.uk'
|
email = 'notify@digital.cabinet-office.gov.uk'
|
||||||
user = User(**{'email_address': email})
|
user = User(**{'email_address': email})
|
||||||
create_model_user(user)
|
save_model_user(user)
|
||||||
assert User.query.count() == 1
|
assert User.query.count() == 1
|
||||||
assert User.query.first().email_address == email
|
assert User.query.first().email_address == email
|
||||||
assert User.query.first().id == user.id
|
assert User.query.first().id == user.id
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import json
|
import json
|
||||||
from app.models import Service
|
from app.models import (Service, User)
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
|
|
||||||
|
|
||||||
@@ -58,4 +58,30 @@ def test_post_service(notify_api, notify_db, notify_db_session, sample_user):
|
|||||||
|
|
||||||
|
|
||||||
def test_put_service(notify_api, notify_db, notify_db_session, sample_service):
|
def test_put_service(notify_api, notify_db, notify_db_session, sample_service):
|
||||||
pass
|
"""
|
||||||
|
Tests Put endpoint '/<service_id' to edit a service.
|
||||||
|
"""
|
||||||
|
with notify_api.test_request_context():
|
||||||
|
with notify_api.test_client() as client:
|
||||||
|
assert Service.query.count() == 1
|
||||||
|
sample_user = User.query.first()
|
||||||
|
old_service = Service.query.first()
|
||||||
|
new_name = 'updated service'
|
||||||
|
data = {
|
||||||
|
'name': new_name,
|
||||||
|
'users': [sample_user.id],
|
||||||
|
'limit': 1000,
|
||||||
|
'restricted': False,
|
||||||
|
'active': False}
|
||||||
|
headers = [('Content-Type', 'application/json')]
|
||||||
|
resp = client.put(
|
||||||
|
url_for('service.update_service', service_id=old_service.id),
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers=headers)
|
||||||
|
assert Service.query.count() == 1
|
||||||
|
assert resp.status_code == 200
|
||||||
|
updated_service = Service.query.first()
|
||||||
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
|
assert json_resp['data']['name'] == updated_service.name
|
||||||
|
assert json_resp['data']['limit'] == updated_service.limit
|
||||||
|
assert updated_service.name == new_name
|
||||||
|
|||||||
@@ -54,4 +54,20 @@ def test_post_user(notify_api, notify_db, notify_db_session):
|
|||||||
|
|
||||||
|
|
||||||
def test_put_user(notify_api, notify_db, notify_db_session, sample_user):
|
def test_put_user(notify_api, notify_db, notify_db_session, sample_user):
|
||||||
pass
|
with notify_api.test_request_context():
|
||||||
|
with notify_api.test_client() as client:
|
||||||
|
assert User.query.count() == 1
|
||||||
|
new_email = 'new@digital.cabinet-office.gov.uk'
|
||||||
|
data = {
|
||||||
|
'email_address': new_email}
|
||||||
|
headers = [('Content-Type', 'application/json')]
|
||||||
|
resp = client.put(
|
||||||
|
url_for('user.update_user', user_id=sample_user.id),
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers=headers)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert User.query.count() == 1
|
||||||
|
user = User.query.first()
|
||||||
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
|
assert json_resp['data']['email_address'] == new_email
|
||||||
|
assert json_resp['data']['id'] == user.id
|
||||||
|
|||||||
Reference in New Issue
Block a user