Files
notifications-api/app/user/rest.py
Adam Shimali b33312b855 Change endpoint responses where there are marshalling, unmarshalling
or param errors to raise invalid data exception. That will cause
those responses to be handled by errors.py, which will log
the errors.

Set most of the schemas to strict mode so that marshmallow will raise
an exception rather than checking for errors in the return tuple from load.

Added handler to errors.py for marshmallow validation errors.
2016-06-15 14:37:51 +01:00

243 lines
8.5 KiB
Python

import json
import uuid
from datetime import datetime
from flask import (jsonify, request, Blueprint, current_app)
from app import encryption, DATETIME_FORMAT
from app.dao.users_dao import (
get_model_users,
save_model_user,
create_user_code,
get_user_code,
use_user_code,
increment_failed_login_count,
reset_failed_login_count,
get_user_by_email,
create_secret_code
)
from app.dao.permissions_dao import permission_dao
from app.dao.services_dao import dao_fetch_service_by_id
from app.dao.templates_dao import dao_get_template_by_id
from app.schemas import (
email_data_request_schema,
user_schema,
request_verify_code_schema,
user_schema_load_json,
permission_schema
)
from app.celery.tasks import (
send_sms,
email_reset_password,
send_email
)
from app.errors import (
register_errors,
InvalidRequest
)
# Blueprint that every route in this module is attached to.
user = Blueprint('user', __name__)
# Hook this blueprint up to the error handling provided by app.errors.
register_errors(user)
@user.route('', methods=['POST'])
def create_user():
    """Create a user from the JSON request body.

    The body is loaded through ``user_schema``. A missing or empty
    ``password`` raises ``InvalidRequest`` with status 400; otherwise the
    user is saved with that password and returned as JSON with a 201.
    """
    payload = request.get_json()
    new_user, errors = user_schema.load(payload)
    # TODO password policy, what is valid password
    password = payload.get('password', None)
    if not password:
        errors['password'] = ['Missing data for required field.']
        raise InvalidRequest(errors, status_code=400)
    save_model_user(new_user, pwd=password)
    serialized = user_schema.dump(new_user).data
    return jsonify(data=serialized), 201
@user.route('/<uuid:user_id>', methods=['PUT'])
def update_user(user_id):
    """Update the user identified by ``user_id`` from the JSON request body.

    The body is loaded through ``user_schema_load_json`` to obtain the
    update dict. A ``password`` key that is present but empty raises
    ``InvalidRequest`` with status 400. On success the updated user is
    returned as JSON with a 200.
    """
    existing_user = get_model_users(user_id=user_id)
    payload = request.get_json()
    changes, errors = user_schema_load_json.load(payload)
    new_password = payload.get('password', None)
    # TODO password validation, it is already done on the admin app
    # but would be good to have the same validation here.
    password_supplied = new_password is not None
    if password_supplied and not new_password:
        errors['password'] = ['Invalid data for field']
        raise InvalidRequest(errors, status_code=400)
    save_model_user(existing_user, update_dict=changes, pwd=new_password)
    serialized = user_schema.dump(existing_user).data
    return jsonify(data=serialized), 200
@user.route('/<uuid:user_id>/verify/password', methods=['POST'])
def verify_user_password(user_id):
    """Check the ``password`` from the JSON body against the given user.

    On a match the user's ``logged_in_at`` is set to the current UTC time,
    the user is saved, the failed login count is reset and an empty 204
    response is returned. On a mismatch the failed login count is
    incremented.

    Raises:
        InvalidRequest: with status 400 when ``password`` is missing from
            the body, when the body is not a JSON object, or when the
            password is incorrect.
    """
    user_to_verify = get_model_users(user_id=user_id)
    payload = request.get_json()
    try:
        txt_pwd = payload['password']
    except (KeyError, TypeError):
        # TypeError covers a body that is not a JSON object (None, a list,
        # a string, ...); report it the same way as a missing field
        # rather than letting it escape as an unhandled error.
        errors = {'password': ['Required field missing data']}
        raise InvalidRequest(errors, status_code=400)

    if user_to_verify.check_password(txt_pwd):
        user_to_verify.logged_in_at = datetime.utcnow()
        save_model_user(user_to_verify)
        reset_failed_login_count(user_to_verify)
        return jsonify({}), 204

    increment_failed_login_count(user_to_verify)
    errors = {'password': ['Incorrect password']}
    raise InvalidRequest(errors, status_code=400)
@user.route('/<uuid:user_id>/verify/code', methods=['POST'])
def verify_user_code(user_id):
    """Verify a code submitted for the given user and mark it as used.

    The JSON body must contain ``code`` and ``code_type``. The matching
    code is looked up with ``get_user_code``; when it is valid it is
    passed to ``use_user_code`` and an empty 204 response is returned.

    Raises:
        InvalidRequest: with status 400 when a required field is missing
            or the body is not a JSON object, 404 when no matching code
            is found, and 400 when the code has passed its expiry time or
            was already used.
    """
    user_to_verify = get_model_users(user_id=user_id)
    payload = request.get_json()

    supplied = {}
    errors = {}
    for field in ('code', 'code_type'):
        try:
            supplied[field] = payload[field]
        except (KeyError, TypeError):
            # TypeError covers a body that is not a JSON object (None, a
            # list, ...); treat it as the field being absent.
            errors[field] = ['Required field missing data']
    if errors:
        raise InvalidRequest(errors, status_code=400)

    code = get_user_code(user_to_verify, supplied['code'], supplied['code_type'])
    if not code:
        raise InvalidRequest("Code not found", status_code=404)
    if datetime.utcnow() > code.expiry_datetime or code.code_used:
        raise InvalidRequest("Code has expired", status_code=400)
    use_user_code(code.id)
    return jsonify({}), 204
@user.route('/<uuid:user_id>/sms-code', methods=['POST'])
def send_user_sms_code(user_id):
    """Create an 'sms' user code and queue a text message containing it.

    The message is addressed to the ``to`` value from the request body when
    one is given, otherwise to the user's ``mobile_number``. The encrypted
    payload is passed to the ``send_sms`` task on the ``sms-code`` queue
    and an empty 204 response is returned.
    """
    recipient = get_model_users(user_id=user_id)
    request_data, _errors = request_verify_code_schema.load(request.get_json())
    secret_code = create_secret_code()
    create_user_code(recipient, secret_code, 'sms')

    # An explicit 'to' in the request takes precedence over the stored number.
    mobile = request_data.get('to', None)
    if mobile is None:
        mobile = recipient.mobile_number

    template_id = current_app.config['SMS_CODE_TEMPLATE_ID']
    template = dao_get_template_by_id(template_id)
    notification = {
        'template': template_id,
        'template_version': template.version,
        'to': mobile,
        'personalisation': {'verify_code': secret_code},
    }
    encrypted = encryption.encrypt(notification)

    task_args = [
        current_app.config['NOTIFY_SERVICE_ID'],
        str(uuid.uuid4()),
        encrypted,
        datetime.utcnow().strftime(DATETIME_FORMAT),
    ]
    send_sms.apply_async(task_args, queue='sms-code')
    return jsonify({}), 204
@user.route('/<uuid:user_id>/email-verification', methods=['POST'])
def send_user_email_verification(user_id):
    """Create an 'email' user code and queue a verification email for the user.

    The email is built from the template configured as
    ``EMAIL_VERIFY_CODE_TEMPLATE_ID`` and personalised with the user's name
    and a verification URL. The encrypted message is passed to the
    ``send_email`` task on the ``email-registration-verification`` queue and
    an empty 204 response is returned.
    """
    recipient = get_model_users(user_id=user_id)
    # The loaded data is not used below; the load call itself is kept.
    _request_data, _errors = request_verify_code_schema.load(request.get_json())
    secret_code = create_secret_code()
    create_user_code(recipient, secret_code, 'email')

    template_id = current_app.config['EMAIL_VERIFY_CODE_TEMPLATE_ID']
    template = dao_get_template_by_id(template_id)
    personalisation = {
        'name': recipient.name,
        'url': _create_verification_url(recipient, secret_code),
    } if False else None
    message = {
        'template': str(template.id),
        'template_version': template.version,
        'to': recipient.email_address,
        'personalisation': {
            'name': recipient.name,
            'url': _create_verification_url(recipient, secret_code),
        },
    }

    task_args = (
        current_app.config['NOTIFY_SERVICE_ID'],
        str(uuid.uuid4()),
        encryption.encrypt(message),
        datetime.utcnow().strftime(DATETIME_FORMAT),
    )
    send_email.apply_async(task_args, queue='email-registration-verification')
    return jsonify({}), 204
@user.route('/<uuid:user_id>', methods=['GET'])
@user.route('', methods=['GET'])
def get_user(user_id=None):
    """Return one user, or the users list when no ``user_id`` is routed in.

    Whatever ``get_model_users`` returns is serialized with ``user_schema``;
    a list result is dumped with ``many=True``.
    """
    found = get_model_users(user_id=user_id)
    if isinstance(found, list):
        dumped = user_schema.dump(found, many=True)
    else:
        dumped = user_schema.dump(found)
    return jsonify(data=dumped.data)
@user.route('/<uuid:user_id>/service/<uuid:service_id>/permission', methods=['POST'])
def set_permissions(user_id, service_id):
    """Replace a user's permissions on a service with those in the request body.

    The JSON body is loaded as a list of permissions via
    ``permission_schema``; each one is bound to the fetched user and service
    before being handed to ``permission_dao``. Returns an empty 204 response.
    """
    # TODO fix security hole, how do we verify that the user
    # who is making this request has permission to make the request.
    target_user = get_model_users(user_id=user_id)
    target_service = dao_fetch_service_by_id(service_id=service_id)
    permissions, _errors = permission_schema.load(request.get_json(), many=True)
    for permission in permissions:
        permission.user = target_user
        permission.service = target_service
    permission_dao.set_user_service_permission(
        target_user,
        target_service,
        permissions,
        _commit=True,
        replace=True,
    )
    return jsonify({}), 204
@user.route('/email', methods=['GET'])
def get_by_email():
    """Return the user matching the ``email`` query string parameter.

    Raises ``InvalidRequest`` with status 400 when the parameter is missing
    or empty.
    """
    email = request.args.get('email')
    if not email:
        raise InvalidRequest(
            'Invalid request. Email query string param required',
            status_code=400,
        )
    matched_user = get_user_by_email(email)
    serialized = user_schema.dump(matched_user).data
    return jsonify(data=serialized)
@user.route('/reset-password', methods=['POST'])
def send_user_reset_password():
    """Queue a reset-password email for the user with the posted email address.

    The body is loaded through ``email_data_request_schema`` and the user is
    looked up by its ``email`` value. The encrypted message is passed to the
    ``email_reset_password`` task on the ``email-reset-password`` queue and
    an empty 204 response is returned.
    """
    request_data, _errors = email_data_request_schema.load(request.get_json())
    recipient = get_user_by_email(request_data['email'])
    message = {
        'to': recipient.email_address,
        'name': recipient.name,
        'reset_password_url': _create_reset_password_url(recipient.email_address),
    }
    encrypted = encryption.encrypt(message)
    email_reset_password.apply_async([encrypted], queue='email-reset-password')
    return jsonify({}), 204
def _create_reset_password_url(email):
    """Build the admin '/new-password/<token>' URL for ``email``.

    The token is produced by ``generate_token`` from a JSON document holding
    the email and the current UTC time, using the app's ``SECRET_KEY`` and
    ``DANGEROUS_SALT`` config values.
    """
    from notifications_utils.url_safe_token import generate_token
    config = current_app.config
    token_payload = json.dumps({
        'email': email,
        'created_at': str(datetime.utcnow()),
    })
    token = generate_token(token_payload, config['SECRET_KEY'], config['DANGEROUS_SALT'])
    return ''.join([config['ADMIN_BASE_URL'], '/new-password/', token])
def _create_verification_url(user, secret_code):
    """Build the admin '/verify-email/<token>' URL for ``user``.

    The token is produced by ``generate_token`` from a JSON document holding
    the user's id, email address and ``secret_code``, using the app's
    ``SECRET_KEY`` and ``DANGEROUS_SALT`` config values.
    """
    from notifications_utils.url_safe_token import generate_token
    config = current_app.config
    token_payload = json.dumps({
        'user_id': str(user.id),
        'email': user.email_address,
        'secret_code': secret_code,
    })
    token = generate_token(token_payload, config['SECRET_KEY'], config['DANGEROUS_SALT'])
    return ''.join([config['ADMIN_BASE_URL'], '/verify-email/', token])