mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-04 10:21:14 -05:00
More tests.
This commit is contained in:
@@ -3,18 +3,28 @@ from datetime import datetime
|
|||||||
from sqlalchemy.orm import load_only
|
from sqlalchemy.orm import load_only
|
||||||
from . import DAOException
|
from . import DAOException
|
||||||
from app import db
|
from app import db
|
||||||
from app.models import Service
|
from app.models import (Service, User)
|
||||||
|
|
||||||
|
|
||||||
def save_model_service(service, update_dict=None):
|
def save_model_service(service, update_dict=None):
|
||||||
users_list = update_dict.get('users', []) if update_dict else getattr(service, 'users', [])
|
users_list = update_dict.get('users', []) if update_dict else getattr(service, 'users', [])
|
||||||
if not users_list:
|
if not users_list:
|
||||||
error_msg = {'users': 'Missing data for required attribute'}
|
error_msg = {'users': ['Missing data for required attribute']}
|
||||||
raise DAOException(json.dumps(error_msg))
|
raise DAOException(json.dumps(error_msg))
|
||||||
if update_dict:
|
if update_dict:
|
||||||
del update_dict['id']
|
# Make sure the update_dict doesn't contain conflicting
|
||||||
del update_dict['users']
|
update_dict.pop('id', None)
|
||||||
db.session.query(Service).filter_by(id=service.id).update(update_dict)
|
update_dict.pop('users', None)
|
||||||
|
# TODO optimize this algorithm
|
||||||
|
new_users = User.query.filter(User.id.in_(users_list)).all()
|
||||||
|
for x in service.users:
|
||||||
|
if x in new_users:
|
||||||
|
new_users.remove(x)
|
||||||
|
else:
|
||||||
|
service.users.remove(x)
|
||||||
|
for x in new_users:
|
||||||
|
service.users.append(x)
|
||||||
|
Service.query.filter_by(id=service.id).update(update_dict)
|
||||||
else:
|
else:
|
||||||
db.session.add(service)
|
db.session.add(service)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ from sqlalchemy.exc import DataError
|
|||||||
from sqlalchemy.orm.exc import NoResultFound
|
from sqlalchemy.orm.exc import NoResultFound
|
||||||
from app.dao.services_dao import (save_model_service, get_model_services)
|
from app.dao.services_dao import (save_model_service, get_model_services)
|
||||||
from app.dao.users_dao import get_model_users
|
from app.dao.users_dao import get_model_users
|
||||||
|
from app.dao import DAOException
|
||||||
from .. import service
|
from .. import service
|
||||||
from app import db
|
from app import db
|
||||||
from app.schemas import (services_schema, service_schema)
|
from app.schemas import (services_schema, service_schema)
|
||||||
@@ -17,7 +18,10 @@ def create_service():
|
|||||||
return jsonify(result="error", message=errors), 400
|
return jsonify(result="error", message=errors), 400
|
||||||
# I believe service is already added to the session but just needs a
|
# I believe service is already added to the session but just needs a
|
||||||
# db.session.commit
|
# db.session.commit
|
||||||
save_model_service(service)
|
try:
|
||||||
|
save_model_service(service)
|
||||||
|
except DAOException as e:
|
||||||
|
return jsonify(result="error", message=str(e)), 400
|
||||||
return jsonify(data=service_schema.dump(service).data), 201
|
return jsonify(data=service_schema.dump(service).data), 201
|
||||||
|
|
||||||
|
|
||||||
@@ -31,14 +35,17 @@ def update_service(service_id):
|
|||||||
except NoResultFound:
|
except NoResultFound:
|
||||||
return jsonify(result="error", message="Service not found"), 404
|
return jsonify(result="error", message="Service not found"), 404
|
||||||
# TODO there has got to be a better way to do the next three lines
|
# TODO there has got to be a better way to do the next three lines
|
||||||
update_service, errors = service_schema.load(request.get_json())
|
upd_serv, errors = service_schema.load(request.get_json())
|
||||||
if errors:
|
if errors:
|
||||||
return jsonify(result="error", message=errors), 400
|
return jsonify(result="error", message=errors), 400
|
||||||
update_dict, errors = service_schema.dump(update_service)
|
update_dict, errors = service_schema.dump(upd_serv)
|
||||||
# TODO FIX ME
|
# TODO FIX ME
|
||||||
# Remove update_service model which is added to db.session
|
# Remove update_service model which is added to db.session
|
||||||
db.session.rollback()
|
db.session.rollback()
|
||||||
save_model_service(service, update_dict=update_dict)
|
try:
|
||||||
|
save_model_service(service, update_dict=update_dict)
|
||||||
|
except DAOException as e:
|
||||||
|
return jsonify(result="error", message=str(e)), 400
|
||||||
return jsonify(data=service_schema.dump(service).data)
|
return jsonify(data=service_schema.dump(service).data)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
import json
|
import json
|
||||||
from app.models import (Service, User)
|
from app.models import (Service, User)
|
||||||
|
from app.dao.services_dao import save_model_service
|
||||||
|
from tests.app.conftest import sample_user as create_sample_user
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
|
|
||||||
|
|
||||||
@@ -57,9 +59,61 @@ def test_post_service(notify_api, notify_db, notify_db_session, sample_user):
|
|||||||
assert json_resp['data']['limit'] == service.limit
|
assert json_resp['data']['limit'] == service.limit
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_service_multiple_users(notify_api, notify_db, notify_db_session, sample_user):
|
||||||
|
"""
|
||||||
|
Tests POST endpoint '/' to create a service with multiple users.
|
||||||
|
"""
|
||||||
|
with notify_api.test_request_context():
|
||||||
|
with notify_api.test_client() as client:
|
||||||
|
another_user = create_sample_user(
|
||||||
|
notify_db,
|
||||||
|
notify_db_session,
|
||||||
|
"new@digital.cabinet-office.gov.uk")
|
||||||
|
assert Service.query.count() == 0
|
||||||
|
data = {
|
||||||
|
'name': 'created service',
|
||||||
|
'users': [sample_user.id, another_user.id],
|
||||||
|
'limit': 1000,
|
||||||
|
'restricted': False,
|
||||||
|
'active': False}
|
||||||
|
headers = [('Content-Type', 'application/json')]
|
||||||
|
resp = client.post(
|
||||||
|
url_for('service.create_service'),
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers=headers)
|
||||||
|
assert resp.status_code == 201
|
||||||
|
service = Service.query.first()
|
||||||
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
|
assert json_resp['data']['name'] == service.name
|
||||||
|
assert json_resp['data']['limit'] == service.limit
|
||||||
|
assert len(service.users) == 2
|
||||||
|
|
||||||
|
def test_post_service_without_users_attribute(notify_api, notify_db, notify_db_session):
|
||||||
|
"""
|
||||||
|
Tests POST endpoint '/' to create a service without 'users' attribute.
|
||||||
|
"""
|
||||||
|
with notify_api.test_request_context():
|
||||||
|
with notify_api.test_client() as client:
|
||||||
|
assert Service.query.count() == 0
|
||||||
|
data = {
|
||||||
|
'name': 'created service',
|
||||||
|
'limit': 1000,
|
||||||
|
'restricted': False,
|
||||||
|
'active': False}
|
||||||
|
headers = [('Content-Type', 'application/json')]
|
||||||
|
resp = client.post(
|
||||||
|
url_for('service.create_service'),
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers=headers)
|
||||||
|
assert resp.status_code == 400
|
||||||
|
assert Service.query.count() == 0
|
||||||
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
|
assert json_resp['message'] == '{"users": ["Missing data for required attribute"]}'
|
||||||
|
|
||||||
|
|
||||||
def test_put_service(notify_api, notify_db, notify_db_session, sample_service):
|
def test_put_service(notify_api, notify_db, notify_db_session, sample_service):
|
||||||
"""
|
"""
|
||||||
Tests Put endpoint '/<service_id' to edit a service.
|
Tests PUT endpoint '/<service_id>' to edit a service.
|
||||||
"""
|
"""
|
||||||
with notify_api.test_request_context():
|
with notify_api.test_request_context():
|
||||||
with notify_api.test_client() as client:
|
with notify_api.test_client() as client:
|
||||||
@@ -85,3 +139,73 @@ def test_put_service(notify_api, notify_db, notify_db_session, sample_service):
|
|||||||
assert json_resp['data']['name'] == updated_service.name
|
assert json_resp['data']['name'] == updated_service.name
|
||||||
assert json_resp['data']['limit'] == updated_service.limit
|
assert json_resp['data']['limit'] == updated_service.limit
|
||||||
assert updated_service.name == new_name
|
assert updated_service.name == new_name
|
||||||
|
|
||||||
|
|
||||||
|
def test_put_service_add_user(notify_api, notify_db, notify_db_session, sample_service):
|
||||||
|
"""
|
||||||
|
Tests PUT endpoint '/<service_id>' add user to the service.
|
||||||
|
"""
|
||||||
|
with notify_api.test_request_context():
|
||||||
|
with notify_api.test_client() as client:
|
||||||
|
assert Service.query.count() == 1
|
||||||
|
another_user = create_sample_user(
|
||||||
|
notify_db,
|
||||||
|
notify_db_session,
|
||||||
|
"new@digital.cabinet-office.gov.uk")
|
||||||
|
sample_user = User.query.first()
|
||||||
|
old_service = Service.query.first()
|
||||||
|
new_name = 'updated service'
|
||||||
|
data = {
|
||||||
|
'name': new_name,
|
||||||
|
'users': [sample_user.id, another_user.id],
|
||||||
|
'limit': 1000,
|
||||||
|
'restricted': False,
|
||||||
|
'active': False}
|
||||||
|
headers = [('Content-Type', 'application/json')]
|
||||||
|
resp = client.put(
|
||||||
|
url_for('service.update_service', service_id=old_service.id),
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers=headers)
|
||||||
|
assert Service.query.count() == 1
|
||||||
|
assert resp.status_code == 200
|
||||||
|
updated_service = Service.query.first()
|
||||||
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
|
assert len(json_resp['data']['users']) == 2
|
||||||
|
assert sample_user.id in json_resp['data']['users']
|
||||||
|
assert another_user.id in json_resp['data']['users']
|
||||||
|
|
||||||
|
|
||||||
|
def test_put_service_remove_user(notify_api, notify_db, notify_db_session, sample_service):
|
||||||
|
"""
|
||||||
|
Tests PUT endpoint '/<service_id>' add user to the service.
|
||||||
|
"""
|
||||||
|
with notify_api.test_request_context():
|
||||||
|
with notify_api.test_client() as client:
|
||||||
|
sample_user = User.query.first()
|
||||||
|
another_user = create_sample_user(
|
||||||
|
notify_db,
|
||||||
|
notify_db_session,
|
||||||
|
"new@digital.cabinet-office.gov.uk")
|
||||||
|
data = {
|
||||||
|
'name': sample_service.name,
|
||||||
|
'users': [sample_user.id, another_user.id],
|
||||||
|
'limit': sample_service.limit,
|
||||||
|
'restricted': sample_service.restricted,
|
||||||
|
'active': sample_service.active}
|
||||||
|
save_model_service(sample_service, update_dict=data)
|
||||||
|
assert Service.query.count() == 1
|
||||||
|
sample_user = User.query.first()
|
||||||
|
data['users'] = [another_user.id]
|
||||||
|
headers = [('Content-Type', 'application/json')]
|
||||||
|
resp = client.put(
|
||||||
|
url_for('service.update_service', service_id=sample_service.id),
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers=headers)
|
||||||
|
assert Service.query.count() == 1
|
||||||
|
assert resp.status_code == 200
|
||||||
|
updated_service = Service.query.first()
|
||||||
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
|
assert len(json_resp['data']['users']) == 1
|
||||||
|
assert sample_user.id not in json_resp['data']['users']
|
||||||
|
assert another_user.id in json_resp['data']['users']
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user