add email code verification

by hitting POST /<user_id>/email-code, we create an email two factor
code to send to the user. That email contains a link with a token that
will sign the user in when opened.

Also some other things:

"email verification" (aka when you first create an account) doesn't
hit the API anymore

refactor 2fa code verification and sending to use jsonschema, and share code between sms and email

Die marshmallow die!
This commit is contained in:
Leo Hemsted
2017-11-03 09:51:50 +00:00
parent 8b2c242355
commit b2756ac99d
6 changed files with 109 additions and 78 deletions

View File

@@ -47,12 +47,7 @@ def get_user_code(user, code, code_type):
codes = VerifyCode.query.filter_by( codes = VerifyCode.query.filter_by(
user=user, code_type=code_type).order_by( user=user, code_type=code_type).order_by(
VerifyCode.created_at.desc()) VerifyCode.created_at.desc())
retval = None return next((x for x in codes if x.check_code(code)), None)
for x in codes:
if x.check_code(code):
retval = x
break
return retval
def delete_codes_older_created_more_than_a_day_ago(): def delete_codes_older_created_more_than_a_day_ago():

View File

@@ -366,14 +366,6 @@ class JobSchema(BaseSchema):
strict = True strict = True
class RequestVerifyCodeSchema(ma.Schema):
class Meta:
strict = True
to = fields.Str(required=False)
class NotificationSchema(ma.Schema): class NotificationSchema(ma.Schema):
class Meta: class Meta:
@@ -653,7 +645,6 @@ api_key_schema = ApiKeySchema()
api_key_schema_load_json = ApiKeySchema(load_json=True) api_key_schema_load_json = ApiKeySchema(load_json=True)
job_schema = JobSchema() job_schema = JobSchema()
job_schema_load_json = JobSchema(load_json=True) job_schema_load_json = JobSchema(load_json=True)
request_verify_code_schema = RequestVerifyCodeSchema()
sms_admin_notification_schema = SmsAdminNotificationSchema() sms_admin_notification_schema = SmsAdminNotificationSchema()
sms_template_notification_schema = SmsTemplateNotificationSchema() sms_template_notification_schema = SmsTemplateNotificationSchema()
job_sms_template_notification_schema = JobSmsTemplateNotificationSchema() job_sms_template_notification_schema = JobSmsTemplateNotificationSchema()

View File

@@ -30,7 +30,6 @@ from app.notifications.process_notifications import (
from app.schemas import ( from app.schemas import (
email_data_request_schema, email_data_request_schema,
user_schema, user_schema,
request_verify_code_schema,
permission_schema, permission_schema,
user_schema_load_json, user_schema_load_json,
user_update_schema_load_json, user_update_schema_load_json,
@@ -41,6 +40,12 @@ from app.errors import (
InvalidRequest InvalidRequest
) )
from app.utils import url_with_token from app.utils import url_with_token
from app.user.users_schema import (
post_verify_code_schema,
post_send_user_sms_code_schema,
post_send_user_email_code_schema,
)
from app.schema_validation import validate
user_blueprint = Blueprint('user', __name__) user_blueprint = Blueprint('user', __name__)
register_errors(user_blueprint) register_errors(user_blueprint)
@@ -115,24 +120,12 @@ def verify_user_password(user_id):
@user_blueprint.route('/<uuid:user_id>/verify/code', methods=['POST']) @user_blueprint.route('/<uuid:user_id>/verify/code', methods=['POST'])
def verify_user_code(user_id): def verify_user_code(user_id):
data = request.get_json()
validate(data, post_verify_code_schema)
user_to_verify = get_user_by_id(user_id=user_id) user_to_verify = get_user_by_id(user_id=user_id)
req_json = request.get_json() code = get_user_code(user_to_verify, data['code'], data['code_type'])
verify_code = None
code_type = None
errors = {}
try:
verify_code = req_json['code']
except KeyError:
errors.update({'code': ['Required field missing data']})
try:
code_type = req_json['code_type']
except KeyError:
errors.update({'code_type': ['Required field missing data']})
if errors:
raise InvalidRequest(errors, status_code=400)
code = get_user_code(user_to_verify, verify_code, code_type)
if user_to_verify.failed_login_count >= current_app.config.get('MAX_VERIFY_CODE_COUNT'): if user_to_verify.failed_login_count >= current_app.config.get('MAX_VERIFY_CODE_COUNT'):
raise InvalidRequest("Code not found", status_code=404) raise InvalidRequest("Code not found", status_code=404)
if not code: if not code:
@@ -142,11 +135,10 @@ def verify_user_code(user_id):
increment_failed_login_count(user_to_verify) increment_failed_login_count(user_to_verify)
raise InvalidRequest("Code has expired", status_code=400) raise InvalidRequest("Code has expired", status_code=400)
if code_type == 'sms': user_to_verify.current_session_id = str(uuid.uuid4())
user_to_verify.current_session_id = str(uuid.uuid4()) user_to_verify.logged_in_at = datetime.utcnow()
user_to_verify.logged_in_at = datetime.utcnow() user_to_verify.failed_login_count = 0
user_to_verify.failed_login_count = 0 save_model_user(user_to_verify)
save_model_user(user_to_verify)
use_user_code(code.id) use_user_code(code.id)
return jsonify({}), 204 return jsonify({}), 204
@@ -154,29 +146,59 @@ def verify_user_code(user_id):
@user_blueprint.route('/<uuid:user_id>/sms-code', methods=['POST']) @user_blueprint.route('/<uuid:user_id>/sms-code', methods=['POST'])
def send_user_sms_code(user_id): def send_user_sms_code(user_id):
user_to_send_to = get_user_by_id(user_id=user_id) data = request.get_json()
verify_code, errors = request_verify_code_schema.load(request.get_json()) user_to_send_to = validate_2fa_call(user_id, data, post_send_user_sms_code_schema)
if not user_to_send_to:
if count_user_verify_codes(user_to_send_to) >= current_app.config.get('MAX_VERIFY_CODE_COUNT'):
# Prevent more than `MAX_VERIFY_CODE_COUNT` active verify codes at a time
current_app.logger.warn('Max verify code has exceeded for user {}'.format(user_to_send_to.id))
return jsonify({}), 204 return jsonify({}), 204
secret_code = create_secret_code() secret_code = create_secret_code()
create_user_code(user_to_send_to, secret_code, SMS_TYPE) create_user_code(user_to_send_to, secret_code, SMS_TYPE)
mobile = user_to_send_to.mobile_number if verify_code.get('to', None) is None else verify_code.get('to') mobile = data.get('to') or user_to_send_to.mobile_number
sms_code_template_id = current_app.config['SMS_CODE_TEMPLATE_ID'] template = dao_get_template_by_id(current_app.config['SMS_CODE_TEMPLATE_ID'])
sms_code_template = dao_get_template_by_id(sms_code_template_id)
service = Service.query.get(current_app.config['NOTIFY_SERVICE_ID'])
personalisation = {'verify_code': secret_code},
create_2fa_code(template, mobile, personalisation)
return jsonify({}), 204
@user_blueprint.route('/<uuid:user_id>/email-code', methods=['POST'])
def send_user_email_code(user_id):
user_to_send_to = validate_2fa_call(user_id, request.get_json(), post_send_user_email_code_schema)
if not user_to_send_to:
return jsonify({}), 204
create_user_code(user_to_send_to, uuid.uuid4(), EMAIL_TYPE)
template = dao_get_template_by_id(current_app.config['EMAIL_CODE_TEMPLATE_ID'])
personalisation = {'url': _create_2fa_url(user_to_send_to)},
create_2fa_code(template, user_to_send_to.email_address, personalisation)
return '{}', 204
def validate_2fa_call(user_id, data, schema):
validate(data, schema)
user_to_send_to = get_user_by_id(user_id=user_id)
if count_user_verify_codes(user_to_send_to) >= current_app.config.get('MAX_VERIFY_CODE_COUNT'):
# Prevent more than `MAX_VERIFY_CODE_COUNT` active verify codes at a time
current_app.logger.warn('Max verify code has exceeded for user {}'.format(user_to_send_to.id))
return
return user_to_send_to
def create_2fa_code(template, recipient, personalisation):
saved_notification = persist_notification( saved_notification = persist_notification(
template_id=sms_code_template_id, template_id=template.id,
template_version=sms_code_template.version, template_version=template.version,
recipient=mobile, recipient=recipient,
service=service, service=template.service,
personalisation={'verify_code': secret_code}, personalisation=personalisation,
notification_type=SMS_TYPE, notification_type=template.template_type,
api_key_id=None, api_key_id=None,
key_type=KEY_TYPE_NORMAL key_type=KEY_TYPE_NORMAL
) )
@@ -185,8 +207,6 @@ def send_user_sms_code(user_id):
# admin even if we're doing user research using this service: # admin even if we're doing user research using this service:
send_notification_to_queue(saved_notification, False, queue=QueueNames.NOTIFY) send_notification_to_queue(saved_notification, False, queue=QueueNames.NOTIFY)
return jsonify({}), 204
@user_blueprint.route('/<uuid:user_id>/change-email-verification', methods=['POST']) @user_blueprint.route('/<uuid:user_id>/change-email-verification', methods=['POST'])
def send_user_confirm_new_email(user_id): def send_user_confirm_new_email(user_id):
@@ -219,6 +239,7 @@ def send_user_confirm_new_email(user_id):
@user_blueprint.route('/<uuid:user_id>/email-verification', methods=['POST']) @user_blueprint.route('/<uuid:user_id>/email-verification', methods=['POST'])
def send_user_email_verification(user_id): def send_user_email_verification(user_id):
# when registering, we verify all users' email addresses using this function
user_to_send_to = get_user_by_id(user_id=user_id) user_to_send_to = get_user_by_id(user_id=user_id)
secret_code = create_secret_code() secret_code = create_secret_code()
create_user_code(user_to_send_to, secret_code, 'email') create_user_code(user_to_send_to, secret_code, 'email')
@@ -361,3 +382,12 @@ def _create_confirmation_url(user, email_address):
data = json.dumps({'user_id': str(user.id), 'email': email_address}) data = json.dumps({'user_id': str(user.id), 'email': email_address})
url = '/user-profile/email/confirm/' url = '/user-profile/email/confirm/'
return url_with_token(data, url, current_app.config) return url_with_token(data, url, current_app.config)
def _create_2fa_url(user, next_redir=None):
data = json.dumps({'user_id': str(user.id), 'email': user.email_address})
url = '/email-auth/'
ret = url_with_token(data, url, current_app.config)
if next_redir:
ret += '?next={}'.format(next_redir)
return ret

35
app/user/users_schema.py Normal file
View File

@@ -0,0 +1,35 @@
post_verify_code_schema = {
'$schema': 'http://json-schema.org/draft-04/schema#',
'description': 'POST schema for verifying a 2fa code',
'type': 'object',
'properties': {
'code': {'type': 'string'},
'code_type': {'type': 'string'},
},
'required': ['code', 'code_type']
}
post_send_user_email_code_schema = {
'$schema': 'http://json-schema.org/draft-04/schema#',
'description': 'POST schema for generating a 2fa email',
'type': 'object',
'properties': {
# doesn't need 'to' as we'll just grab user.email_address
'next': {'type': ['string', 'null']},
},
'required': [],
'additionalProperties': []
}
post_send_user_sms_code_schema = {
'$schema': 'http://json-schema.org/draft-04/schema#',
'description': 'POST schema for generating a 2fa email',
'type': 'object',
'properties': {
'to': {'type': ['string', 'null']},
},
'required': [],
'additionalProperties': []
}

View File

@@ -115,6 +115,7 @@ def test_get_all_invited_users_by_service(client, notify_db, notify_db_session,
for invite in json_resp['data']: for invite in json_resp['data']:
assert invite['service'] == str(sample_service.id) assert invite['service'] == str(sample_service.id)
assert invite['from_user'] == str(invite_from.id) assert invite['from_user'] == str(invite_from.id)
assert invite['auth_type'] == SMS_AUTH_TYPE
assert invite['id'] assert invite['id']

View File

@@ -40,25 +40,6 @@ def test_user_verify_sms_code(client, sample_sms_code):
assert sample_sms_code.user.current_session_id is not None assert sample_sms_code.user.current_session_id is not None
@freeze_time('2016-01-01T12:00:00')
def test_user_verify_email_code(client, sample_email_code):
sample_email_code.user.logged_in_at = datetime.utcnow() - timedelta(days=1)
assert not VerifyCode.query.first().code_used
assert sample_email_code.user.current_session_id is None
data = json.dumps({
'code_type': sample_email_code.code_type,
'code': sample_email_code.txt_code})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_code', user_id=sample_email_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
assert VerifyCode.query.first().code_used
assert sample_email_code.user.logged_in_at == datetime.utcnow() - timedelta(days=1)
assert sample_email_code.user.current_session_id is None
def test_user_verify_code_missing_code(client, def test_user_verify_code_missing_code(client,
sample_sms_code): sample_sms_code):
assert not VerifyCode.query.first().code_used assert not VerifyCode.query.first().code_used
@@ -207,11 +188,9 @@ def test_send_user_sms_code(client,
assert resp.status_code == 204 assert resp.status_code == 204
assert mocked.call_count == 1 assert mocked.call_count == 1
assert VerifyCode.query.count() == 1 assert VerifyCode.query.one().check_code('11111')
assert VerifyCode.query.first().check_code('11111')
assert Notification.query.count() == 1 notification = Notification.query.one()
notification = Notification.query.first()
assert notification.personalisation == {'verify_code': '11111'} assert notification.personalisation == {'verify_code': '11111'}
assert notification.to == sample_user.mobile_number assert notification.to == sample_user.mobile_number
assert str(notification.service_id) == current_app.config['NOTIFY_SERVICE_ID'] assert str(notification.service_id) == current_app.config['NOTIFY_SERVICE_ID']