From 0146e2d935ddacfd4a385e9f15649788244ebbe9 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Tue, 7 Nov 2017 16:15:49 +0000 Subject: [PATCH] attempt to reduce code duplication --- app/user/rest.py | 83 ++++++++++++++++-------------- tests/app/user/test_rest_verify.py | 17 +++--- 2 files changed, 55 insertions(+), 45 deletions(-) diff --git a/app/user/rest.py b/app/user/rest.py index 49d018354..b6c2c28e0 100644 --- a/app/user/rest.py +++ b/app/user/rest.py @@ -3,7 +3,7 @@ import uuid from datetime import datetime from urllib.parse import urlencode -from flask import (jsonify, request, Blueprint, current_app) +from flask import (jsonify, request, Blueprint, current_app, abort) from app.config import QueueNames from app.dao.users_dao import ( @@ -23,7 +23,7 @@ from app.dao.users_dao import ( from app.dao.permissions_dao import permission_dao from app.dao.services_dao import dao_fetch_service_by_id from app.dao.templates_dao import dao_get_template_by_id -from app.models import SMS_TYPE, KEY_TYPE_NORMAL, EMAIL_TYPE, Service +from app.models import KEY_TYPE_NORMAL, Service, SMS_TYPE, EMAIL_TYPE from app.notifications.process_notifications import ( persist_notification, send_notification_to_queue @@ -147,59 +147,66 @@ def verify_user_code(user_id): return jsonify({}), 204 -@user_blueprint.route('//sms-code', methods=['POST']) -def send_user_sms_code(user_id): - data = request.get_json() - user_to_send_to = validate_2fa_call(user_id, data, post_send_user_sms_code_schema) - if not user_to_send_to: - return jsonify({}), 204 +@user_blueprint.route('//-code', methods=['POST']) +def send_user_2fa_code(user_id, code_type): + user_to_send_to = get_user_by_id(user_id=user_id) + + if count_user_verify_codes(user_to_send_to) >= current_app.config.get('MAX_VERIFY_CODE_COUNT'): + # Prevent more than `MAX_VERIFY_CODE_COUNT` active verify codes at a time + current_app.logger.warn('Too many verify codes created for user {}'.format(user_to_send_to.id)) + else: + data = request.get_json() + if code_type == SMS_TYPE: + validate(data, post_send_user_sms_code_schema) + send_user_sms_code(user_to_send_to, data) + elif code_type == EMAIL_TYPE: + validate(data, post_send_user_email_code_schema) + send_user_email_code(user_to_send_to, data) + else: + abort(404) + + return '{}', 204 + + +def send_user_sms_code(user_to_send_to, data): + recipient = data.get('to') or user_to_send_to.mobile_number secret_code = create_secret_code() - create_user_code(user_to_send_to, secret_code, SMS_TYPE) - - mobile = data.get('to') or user_to_send_to.mobile_number - template = dao_get_template_by_id(current_app.config['SMS_CODE_TEMPLATE_ID']) - personalisation = {'verify_code': secret_code} - create_2fa_code(template, mobile, personalisation) - return jsonify({}), 204 + create_2fa_code( + current_app.config['SMS_CODE_TEMPLATE_ID'], + user_to_send_to, + secret_code, + recipient, + personalisation + ) -@user_blueprint.route('//email-code', methods=['POST']) -def send_user_email_code(user_id): - data = request.get_json() - user_to_send_to = validate_2fa_call(user_id, data, post_send_user_email_code_schema) - if not user_to_send_to: - return jsonify({}), 204 +def send_user_email_code(user_to_send_to, data): + recipient = user_to_send_to.email_address secret_code = str(uuid.uuid4()) - create_user_code(user_to_send_to, secret_code, EMAIL_TYPE) - - template = dao_get_template_by_id(current_app.config['EMAIL_2FA_TEMPLATE_ID']) personalisation = { 'name': user_to_send_to.name, 'url': _create_2fa_url(user_to_send_to, secret_code, data.get('next')) } - create_2fa_code(template, user_to_send_to.email_address, personalisation) - - return '{}', 204 + create_2fa_code( + current_app.config['EMAIL_2FA_TEMPLATE_ID'], + user_to_send_to, + secret_code, + recipient, + personalisation + ) -def validate_2fa_call(user_id, data, schema): - validate(data, schema) - user_to_send_to = get_user_by_id(user_id=user_id) +def create_2fa_code(template_id, user_to_send_to, secret_code, recipient, personalisation): + template = dao_get_template_by_id(template_id) - if count_user_verify_codes(user_to_send_to) >= current_app.config.get('MAX_VERIFY_CODE_COUNT'): - # Prevent more than `MAX_VERIFY_CODE_COUNT` active verify codes at a time - current_app.logger.warn('Max verify code has exceeded for user {}'.format(user_to_send_to.id)) - return + # save the code in the VerifyCode table + create_user_code(user_to_send_to, secret_code, template.template_type) - return user_to_send_to - - -def create_2fa_code(template, recipient, personalisation): saved_notification = persist_notification( template_id=template.id, template_version=template.version, diff --git a/tests/app/user/test_rest_verify.py b/tests/app/user/test_rest_verify.py index 4d83ee9f1..76a9cf204 100644 --- a/tests/app/user/test_rest_verify.py +++ b/tests/app/user/test_rest_verify.py @@ -185,7 +185,7 @@ def test_send_user_sms_code(client, mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') resp = client.post( - url_for('user.send_user_sms_code', user_id=sample_user.id), + url_for('user.send_user_2fa_code', code_type='sms', user_id=sample_user.id), data=json.dumps({}), headers=[('Content-Type', 'application/json'), auth_header]) assert resp.status_code == 204 @@ -218,7 +218,7 @@ def test_send_user_code_for_sms_with_optional_to_field(client, auth_header = create_authorization_header() resp = client.post( - url_for('user.send_user_sms_code', user_id=sample_user.id), + url_for('user.send_user_2fa_code', code_type='sms', user_id=sample_user.id), data=json.dumps({'to': to_number}), headers=[('Content-Type', 'application/json'), auth_header]) @@ -236,7 +236,7 @@ def test_send_sms_code_returns_404_for_bad_input_data(client): uuid_ = uuid.uuid4() auth_header = create_authorization_header() resp = client.post( - url_for('user.send_user_sms_code', user_id=uuid_), + url_for('user.send_user_2fa_code', code_type='sms', user_id=uuid_), data=json.dumps({}), headers=[('Content-Type', 'application/json'), auth_header]) assert resp.status_code == 404 @@ -257,7 +257,7 @@ def test_send_sms_code_returns_204_when_too_many_codes_already_created(client, s assert VerifyCode.query.count() == 10 auth_header = create_authorization_header() resp = client.post( - url_for('user.send_user_sms_code', user_id=sample_user.id), + url_for('user.send_user_2fa_code', code_type='sms', user_id=sample_user.id), data=json.dumps({}), headers=[('Content-Type', 'application/json'), auth_header]) assert resp.status_code == 204 @@ -347,7 +347,8 @@ def test_send_user_email_code(admin_request, mocker, sample_user, email_2fa_code 'to': None } admin_request.post( - 'user.send_user_email_code', + 'user.send_user_2fa_code', + code_type='email', user_id=sample_user.id, _data=data, _expected_status=204 @@ -370,7 +371,8 @@ def test_send_user_email_code_with_urlencoded_next_param(admin_request, mocker, 'next': '/services' } admin_request.post( - 'user.send_user_email_code', + 'user.send_user_2fa_code', + code_type='email', user_id=sample_user.id, _data=data, _expected_status=204 @@ -382,7 +384,8 @@ def test_send_user_email_code_with_urlencoded_next_param(admin_request, mocker, def test_send_email_code_returns_404_for_bad_input_data(admin_request): resp = admin_request.post( - 'user.send_user_email_code', + 'user.send_user_2fa_code', + code_type='email', user_id=uuid.uuid4(), _data={}, _expected_status=404