[WIP] Start of api for accepting invite.

This commit is contained in:
Adam Shimali
2016-02-29 17:38:02 +00:00
parent 4e678ac391
commit 3b66745677
7 changed files with 197 additions and 2 deletions

View File

@@ -48,6 +48,7 @@ def create_app():
from app.notifications.rest import notifications as notifications_blueprint from app.notifications.rest import notifications as notifications_blueprint
from app.invite.rest import invite as invite_blueprint from app.invite.rest import invite as invite_blueprint
from app.permission.rest import permission as permission_blueprint from app.permission.rest import permission as permission_blueprint
from app.accept_invite.rest import accept_invite
application.register_blueprint(service_blueprint, url_prefix='/service') application.register_blueprint(service_blueprint, url_prefix='/service')
application.register_blueprint(user_blueprint, url_prefix='/user') application.register_blueprint(user_blueprint, url_prefix='/user')
@@ -57,6 +58,7 @@ def create_app():
application.register_blueprint(job_blueprint) application.register_blueprint(job_blueprint)
application.register_blueprint(invite_blueprint) application.register_blueprint(invite_blueprint)
application.register_blueprint(permission_blueprint, url_prefix='/permission') application.register_blueprint(permission_blueprint, url_prefix='/permission')
application.register_blueprint(accept_invite, url_prefix='/invite')
return application return application

View File

41
app/accept_invite/rest.py Normal file
View File

@@ -0,0 +1,41 @@
from flask import (
Blueprint,
jsonify,
current_app
)
from itsdangerous import SignatureExpired
from utils.url_safe_token import check_token
from app.dao.invited_user_dao import get_invited_user_by_id
from app.errors import register_errors
from app.schemas import invited_user_schema
accept_invite = Blueprint('accept_invite', __name__)
register_errors(accept_invite)
@accept_invite.route('/<token>', methods=['GET'])
def get_invited_user_by_token(token):
max_age_seconds = 60 * 60 * 24 * current_app.config['INVITATION_EXPIRATION_DAYS']
try:
invited_user_id = check_token(token,
current_app.config['SECRET_KEY'],
current_app.config['DANGEROUS_SALT'],
max_age_seconds)
except SignatureExpired:
message = 'Invitation with id {} expired'.format(invited_user_id)
return jsonify(result='error', message=message), 400
invited_user = get_invited_user_by_id(invited_user_id)
if not invited_user:
message = 'Invited user not found with id: {}'.format(invited_user_id)
return jsonify(result='error', message=message), 404
return jsonify(data=invited_user_schema.dump(invited_user).data), 200

View File

@@ -12,5 +12,9 @@ def get_invited_user(service_id, invited_user_id):
return InvitedUser.query.filter_by(service_id=service_id, id=invited_user_id).first() return InvitedUser.query.filter_by(service_id=service_id, id=invited_user_id).first()
def get_invited_user_by_id(invited_user_id):
return InvitedUser.query.filter_by(id=invited_user_id).first()
def get_invited_users_for_service(service_id): def get_invited_users_for_service(service_id):
return InvitedUser.query.filter_by(service_id=service_id).all() return InvitedUser.query.filter_by(service_id=service_id).all()

View File

@@ -155,5 +155,23 @@ def get_users_for_service(service_id):
return jsonify(data=result.data) return jsonify(data=result.data)
@service.route('/<service_id>/users/<user_id>', methods=['POST'])
def add_user_to_service(service_id, user_id):
service = dao_fetch_service_by_id(service_id)
if not service:
return _service_not_found(service_id)
user = get_model_users(user_id=user_id)
if user in service.users:
return jsonify(result='error',
message='User id: {} already part of service id: {}'.format(user_id, service_id)), 400
service.users.append(user)
dao_update_service(service)
data, errors = service_schema.dump(service)
return jsonify(data=data), 201
def _service_not_found(service_id): def _service_not_found(service_id):
return jsonify(result='error', message='Service not found for id: {}'.format(service_id)), 404 return jsonify(result='error', message='Service not found for id: {}'.format(service_id)), 404

View File

@@ -5,7 +5,8 @@ from app.models import InvitedUser
from app.dao.invited_user_dao import ( from app.dao.invited_user_dao import (
save_invited_user, save_invited_user,
get_invited_user, get_invited_user,
get_invited_users_for_service get_invited_users_for_service,
get_invited_user_by_id
) )
@@ -33,11 +34,16 @@ def test_create_invited_user(notify_db, notify_db_session, sample_service):
assert 'manage_service' in permissions assert 'manage_service' in permissions
def test_get_invited_user(notify_db, notify_db_session, sample_invited_user): def test_get_invited_user_by_service_and_id(notify_db, notify_db_session, sample_invited_user):
from_db = get_invited_user(sample_invited_user.service.id, sample_invited_user.id) from_db = get_invited_user(sample_invited_user.service.id, sample_invited_user.id)
assert from_db == sample_invited_user assert from_db == sample_invited_user
def test_get_invited_user_by_id(notify_db, notify_db_session, sample_invited_user):
from_db = get_invited_user_by_id(sample_invited_user.id)
assert from_db == sample_invited_user
def test_get_unknown_invited_user_returns_none(notify_db, notify_db_session, sample_service): def test_get_unknown_invited_user_returns_none(notify_db, notify_db_session, sample_service):
unknown_id = uuid.uuid4() unknown_id = uuid.uuid4()

View File

@@ -456,3 +456,127 @@ def test_default_permissions_are_added_for_user_service(notify_api,
service_permissions = json_resp['data']['permissions'][str(sample_service.id)] service_permissions = json_resp['data']['permissions'][str(sample_service.id)]
from app.dao.permissions_dao import default_service_permissions from app.dao.permissions_dao import default_service_permissions
assert sorted(default_service_permissions) == sorted(service_permissions) assert sorted(default_service_permissions) == sorted(service_permissions)
def test_add_existing_user_to_another_service(notify_api, notify_db, notify_db_session, sample_service):
with notify_api.test_request_context():
with notify_api.test_client() as client:
# check which users part of service
user_already_in_service = sample_service.users[0]
auth_header = create_authorization_header(
path='/service/{}/users'.format(sample_service.id),
method='GET'
)
resp = client.get(
'/service/{}/users'.format(sample_service.id),
headers=[('Content-Type', 'application/json'), auth_header]
)
assert resp.status_code == 200
result = json.loads(resp.get_data(as_text=True))
assert len(result['data']) == 1
assert result['data'][0]['email_address'] == user_already_in_service.email_address
# add new user to service
user_to_add = User(
name='Invited User',
email_address='invited@digital.cabinet-office.gov.uk',
password='password',
mobile_number='+4477123456'
)
# they must exist in db first
save_model_user(user_to_add)
auth_header = create_authorization_header(
path='/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
method='POST'
)
resp = client.post(
'/service/{}/users/{}'.format(sample_service.id, user_to_add.id),
headers=[('Content-Type', 'application/json'), auth_header]
)
assert resp.status_code == 201
# check new user added to service
auth_header = create_authorization_header(
path='/service/{}/users'.format(sample_service.id),
method='GET'
)
resp = client.get(
'/service/{}/users'.format(sample_service.id),
headers=[('Content-Type', 'application/json'), auth_header]
)
assert resp.status_code == 200
result = json.loads(resp.get_data(as_text=True))
assert len(result['data']) == 2
assert _user_email_in_list(result['data'], user_already_in_service.email_address)
assert _user_email_in_list(result['data'], user_to_add.email_address)
def test_add_existing_user_to_non_existing_service_returns404(notify_api, notify_db, notify_db_session):
with notify_api.test_request_context():
with notify_api.test_client() as client:
user_to_add = User(
name='Invited User',
email_address='invited@digital.cabinet-office.gov.uk',
password='password',
mobile_number='+4477123456'
)
save_model_user(user_to_add)
incorrect_id = uuid.uuid4()
auth_header = create_authorization_header(
path='/service/{}/users/{}'.format(incorrect_id, user_to_add.id),
method='POST'
)
resp = client.post(
'/service/{}/users/{}'.format(incorrect_id, user_to_add.id),
headers=[('Content-Type', 'application/json'), auth_header]
)
result = json.loads(resp.get_data(as_text=True))
expected_message = 'Service not found for id: {}'.format(incorrect_id)
assert resp.status_code == 404
assert result['result'] == 'error'
assert result['message'] == expected_message
def test_add_existing_user_of_service_to_service_returns400(notify_api, notify_db, notify_db_session, sample_service):
with notify_api.test_request_context():
with notify_api.test_client() as client:
existing_user_id = sample_service.users[0].id
auth_header = create_authorization_header(
path='/service/{}/users/{}'.format(sample_service.id, existing_user_id),
method='POST'
)
resp = client.post(
'/service/{}/users/{}'.format(sample_service.id, existing_user_id),
headers=[('Content-Type', 'application/json'), auth_header]
)
result = json.loads(resp.get_data(as_text=True))
expected_message = 'User id: {} already part of service id: {}'.format(existing_user_id, sample_service.id)
assert resp.status_code == 400
assert result['result'] == 'error'
assert result['message'] == expected_message
def _user_email_in_list(user_list, email_address):
for user in user_list:
if user['email_address'] == email_address:
return True
return False