diff --git a/app/config.py b/app/config.py index 125b91834..71a1e21ad 100644 --- a/app/config.py +++ b/app/config.py @@ -43,6 +43,7 @@ class Config(object): } EMAIL_EXPIRY_SECONDS = 3600 # 1 hour INVITATION_EXPIRY_SECONDS = 3600 * 24 * 2 # 2 days - also set on api + EMAIL_2FA_EXPIRY_SECONDS = 1800 # 30 Minutes HEADER_COLOUR = '#FFBF47' # $yellow HTTP_PROTOCOL = 'http' MAX_FAILED_LOGIN_COUNT = 10 diff --git a/app/main/views/code_not_received.py b/app/main/views/code_not_received.py index 2bef44abb..d8316a2ea 100644 --- a/app/main/views/code_not_received.py +++ b/app/main/views/code_not_received.py @@ -46,3 +46,17 @@ def check_and_resend_verification_code(): return redirect(url_for('main.verify')) else: return redirect(url_for('main.two_factor')) + + +@main.route('/email-not-received', methods=['GET']) +@redirect_to_sign_in +def email_not_received(): + return render_template('views/email-not-received.html') + + +@main.route('/send-new-email-token', methods=['GET']) +@redirect_to_sign_in +def resend_email_link(): + user_api_client.send_verify_code(session['user_details']['id'], 'email', None) + session.pop('user_details') + return redirect(url_for('main.two_factor_email_sent', email_resent=True)) diff --git a/app/main/views/sign_in.py b/app/main/views/sign_in.py index 69fc36a29..8721c7451 100644 --- a/app/main/views/sign_in.py +++ b/app/main/views/sign_in.py @@ -48,11 +48,11 @@ def sign_in(): if user: session['user_details'] = {"email": user.email_address, "id": user.id} if user.is_active: - user_api_client.send_verify_code(user.id, 'sms', user.mobile_number) - if request.args.get('next'): - return redirect(url_for('.two_factor', next=request.args.get('next'))) + if user.auth_type == "email_auth": + return sign_in_email(user.id, user.email_address) else: - return redirect(url_for('.two_factor')) + return sign_in_sms(user.id, user.mobile_number) + # Vague error message for login in case of user not known, locked, inactive or password not verified flash(Markup( ( @@ -70,6 +70,22 @@ def sign_in(): ) +def sign_in_email(user_id, to): + if request.args.get('next'): + user_api_client.send_verify_code(user_id, 'email', None, request.args.get('next')) + else: + user_api_client.send_verify_code(user_id, 'email', None) + return redirect(url_for('.two_factor_email_sent')) + + +def sign_in_sms(user_id, to): + user_api_client.send_verify_code(user_id, 'sms', to) + if request.args.get('next'): + return redirect(url_for('.two_factor', next=request.args.get('next'))) + else: + return redirect(url_for('.two_factor')) + + @login_manager.unauthorized_handler def sign_in_again(): return redirect( diff --git a/app/main/views/two_factor.py b/app/main/views/two_factor.py index a531a376f..a95ca4494 100644 --- a/app/main/views/two_factor.py +++ b/app/main/views/two_factor.py @@ -1,16 +1,64 @@ +import json from flask import ( render_template, redirect, session, url_for, - request + request, + current_app, + flash ) - from flask_login import login_user, current_user from app.main import main from app.main.forms import TwoFactorForm from app import service_api_client, user_api_client from app.utils import redirect_to_sign_in +from notifications_utils.url_safe_token import check_token +from itsdangerous import SignatureExpired + + +@main.route('/two-factor-email-sent', methods=['GET']) +def two_factor_email_sent(): + title = 'Email resent' if request.args.get('email_resent') else 'Check your email' + return render_template( + 'views/two-factor-email.html', + title=title + ) + + +@main.route('/email-auth/', methods=['GET']) +def two_factor_email(token): + if current_user.is_authenticated: + return redirect_when_logged_in(current_user.id) + + # checks url is valid, and hasn't timed out + try: + token_data = json.loads(check_token( + token, + current_app.config['SECRET_KEY'], + current_app.config['DANGEROUS_SALT'], + current_app.config['EMAIL_2FA_EXPIRY_SECONDS'] + )) + except SignatureExpired as exc: + # lets decode again, without the expiry, to get the user id out + orig_data = json.loads(check_token( + token, + current_app.config['SECRET_KEY'], + current_app.config['DANGEROUS_SALT'], + None + )) + session['user_details'] = {'id': orig_data['user_id']} + flash("The link in the email we sent you has expired. We’ve sent you a new one.") + return redirect(url_for('.resend_email_link')) + + user_id = token_data['user_id'] + # checks if code was already used + logged_in, msg = user_api_client.check_verify_code(user_id, token_data['secret_code'], "email") + + if not logged_in: + flash("There’s something wrong with the code") + return redirect(url_for('.two_factor_email_sent')) + return log_in_user(user_id) @main.route('/two-factor', methods=['GET', 'POST']) @@ -24,29 +72,7 @@ def two_factor(): form = TwoFactorForm(_check_code) if form.validate_on_submit(): - try: - user = user_api_client.get_user(user_id) - # the user will have a new current_session_id set by the API - store it in the cookie for future requests - session['current_session_id'] = user.current_session_id - services = service_api_client.get_active_services({'user_id': str(user_id)}).get('data', []) - # Check if coming from new password page - if 'password' in session['user_details']: - user = user_api_client.update_password(user.id, password=session['user_details']['password']) - activated_user = user_api_client.activate_user(user) - login_user(activated_user) - finally: - del session['user_details'] - - next_url = request.args.get('next') - if next_url and _is_safe_redirect_url(next_url): - return redirect(next_url) - - if current_user.platform_admin: - return redirect(url_for('main.platform_admin')) - if len(services) == 1: - return redirect(url_for('main.service_dashboard', service_id=services[0]['id'])) - else: - return redirect(url_for('main.choose_service')) + return log_in_user(user_id) return render_template('views/two-factor.html', form=form) @@ -58,3 +84,34 @@ def _is_safe_redirect_url(target): redirect_url = urlparse(urljoin(request.host_url, target)) return redirect_url.scheme in ('http', 'https') and \ host_url.netloc == redirect_url.netloc + + +def log_in_user(user_id): + try: + user = user_api_client.get_user(user_id) + # the user will have a new current_session_id set by the API - store it in the cookie for future requests + session['current_session_id'] = user.current_session_id + # Check if coming from new password page + if 'password' in session.get('user_details', {}): + user = user_api_client.update_password(user.id, password=session['user_details']['password']) + activated_user = user_api_client.activate_user(user) + login_user(activated_user) + finally: + session.pop("user_details", None) + + return redirect_when_logged_in(user_id) + + +def redirect_when_logged_in(user_id): + next_url = request.args.get('next') + if next_url and _is_safe_redirect_url(next_url): + return redirect(next_url) + if current_user.platform_admin: + return redirect(url_for('main.platform_admin')) + + services = service_api_client.get_active_services({'user_id': str(user_id)}).get('data', []) + + if len(services) == 1: + return redirect(url_for('main.service_dashboard', service_id=services[0]['id'])) + else: + return redirect(url_for('main.choose_service')) diff --git a/app/notify_client/user_api_client.py b/app/notify_client/user_api_client.py index 8784673f6..2579b5e5b 100644 --- a/app/notify_client/user_api_client.py +++ b/app/notify_client/user_api_client.py @@ -88,8 +88,10 @@ class UserApiClient(NotifyAdminAPIClient): if e.status_code == 400 or e.status_code == 404: return False - def send_verify_code(self, user_id, code_type, to): + def send_verify_code(self, user_id, code_type, to, next_string=None): data = {'to': to} + if next_string: + data['next'] = next_string endpoint = '/user/{0}/{1}-code'.format(user_id, code_type) self.post(endpoint, data=data) diff --git a/app/templates/views/email-not-received.html b/app/templates/views/email-not-received.html new file mode 100644 index 000000000..4b5548628 --- /dev/null +++ b/app/templates/views/email-not-received.html @@ -0,0 +1,24 @@ +{% extends "withoutnav_template.html" %} +{% from "components/page-footer.html" import page_footer %} + +{% block per_page_title %} + Resend email link +{% endblock %} + +{% block maincolumn_content %} + +
+
+

Resend email link

+ +

Email messages sometimes take a few minutes to arrive. If you do not recieve the email, you can resend it.

+

If you no longer have access to the email address you registered for this service, speak to your service manager to reset the email.

+ +

+ Resend email link +

+ +
+
+ +{% endblock %} \ No newline at end of file diff --git a/app/templates/views/two-factor-email.html b/app/templates/views/two-factor-email.html new file mode 100644 index 000000000..0d6278534 --- /dev/null +++ b/app/templates/views/two-factor-email.html @@ -0,0 +1,21 @@ +{% extends "withoutnav_template.html" %} +{% from "components/page-footer.html" import page_footer %} + +{% block per_page_title %} + Email verification +{% endblock %} + +{% block maincolumn_content %} + +
+
+

{{ title }}

+

We’ve sent you an email with your login link

+ {{ page_footer( + secondary_link=url_for('main.email_not_received'), + secondary_link_text='Not received an email?' + ) }} +
+
+ +{% endblock %} \ No newline at end of file diff --git a/tests/app/main/views/test_sign_in.py b/tests/app/main/views/test_sign_in.py index a8d2f1d93..97b1956ac 100644 --- a/tests/app/main/views/test_sign_in.py +++ b/tests/app/main/views/test_sign_in.py @@ -77,7 +77,7 @@ def test_logged_in_user_redirects_to_choose_service( assert response.location == url_for('main.choose_service', _external=True) -def test_process_sign_in_return_2fa_template( +def test_process_sms_auth_sign_in_return_2fa_template( client, api_user_active, mock_send_verify_code, @@ -94,6 +94,25 @@ def test_process_sign_in_return_2fa_template( mock_verify_password.assert_called_with(api_user_active.id, 'val1dPassw0rd!') +def test_process_email_auth_sign_in_return_2fa_template( + client, + api_user_active_email_auth, + mock_send_verify_code, + mock_verify_password, + mocker +): + mocker.patch('app.user_api_client.get_user', return_value=api_user_active_email_auth) + mocker.patch('app.user_api_client.get_user_by_email', return_value=api_user_active_email_auth) + + response = client.post( + url_for('main.sign_in'), data={ + 'email_address': 'valid@example.gov.uk', + 'password': 'val1dPassw0rd!'}) + assert response.status_code == 302 + assert response.location == url_for('.two_factor_email_sent', _external=True) + mock_verify_password.assert_called_with(api_user_active_email_auth.id, 'val1dPassw0rd!') + + def test_should_return_locked_out_true_when_user_is_locked( client, mock_get_user_by_email_locked, diff --git a/tests/app/main/views/test_two_factor.py b/tests/app/main/views/test_two_factor.py index 5d7223709..97d9ef939 100644 --- a/tests/app/main/views/test_two_factor.py +++ b/tests/app/main/views/test_two_factor.py @@ -1,8 +1,10 @@ -from flask import url_for -from tests.conftest import SERVICE_ONE_ID - from unittest.mock import ANY +from flask import url_for +from bs4 import BeautifulSoup + +from tests.conftest import SERVICE_ONE_ID, normalize_spaces, set_config + def test_should_render_two_factor_page( client, @@ -213,3 +215,108 @@ def test_two_factor_should_activate_pending_user( assert mock_activate_user.called assert api_user_pending.is_active + + +def test_valid_two_factor_email_link_logs_in_user( + client, + valid_token, + mock_get_user, + mock_get_services_with_one_service, + mocker +): + mocker.patch('app.user_api_client.check_verify_code', return_value=(True, '')) + + response = client.get( + url_for('main.two_factor_email', token=valid_token), + ) + + assert response.status_code == 302 + assert response.location == url_for('main.service_dashboard', service_id=SERVICE_ONE_ID, _external=True) + + +def test_two_factor_email_link_has_expired( + app_, + valid_token, + client, + mock_send_verify_code, + fake_uuid +): + + with set_config(app_, 'EMAIL_2FA_EXPIRY_SECONDS', -1): + response = client.get( + url_for('main.two_factor_email', token=valid_token), + follow_redirects=True, + ) + + assert response.status_code == 200 + page = BeautifulSoup(response.data.decode('utf-8'), 'html.parser') + + assert normalize_spaces( + page.select_one('.banner-dangerous').text + ) == "The link in the email we sent you has expired. We’ve sent you a new one." + assert page.h1.text.strip() == 'Email resent' + mock_send_verify_code.assert_called_once_with(fake_uuid, 'email', None) + + +def test_two_factor_email_link_is_invalid( + client +): + token = 12345 + response = client.get( + url_for('main.two_factor_email', token=token), + follow_redirects=True + ) + page = BeautifulSoup(response.data.decode('utf-8'), 'html.parser') + assert normalize_spaces( + page.select_one('.banner-dangerous').text + ) == "There’s something wrong with the link you’ve used." + assert response.status_code == 404 + + +def test_two_factor_email_link_is_already_used( + client, + valid_token, + mocker +): + mocker.patch('app.user_api_client.check_verify_code', return_value=(False, 'Code has expired')) + + response = client.get( + url_for('main.two_factor_email', token=valid_token), + follow_redirects=True + ) + + page = BeautifulSoup(response.data.decode('utf-8'), 'html.parser') + assert normalize_spaces( + page.select_one('.banner-dangerous').text + ) == "There’s something wrong with the code" + assert response.status_code == 200 + + +def test_two_factor_email_link_when_user_is_locked_out( + client, + valid_token, + mocker +): + mocker.patch('app.user_api_client.check_verify_code', return_value=(False, 'Code not found')) + + response = client.get( + url_for('main.two_factor_email', token=valid_token), + follow_redirects=True + ) + + page = BeautifulSoup(response.data.decode('utf-8'), 'html.parser') + assert normalize_spaces( + page.select_one('.banner-dangerous').text + ) == "There’s something wrong with the code" + assert response.status_code == 200 + + +def test_two_factor_email_link_used_when_user_already_logged_in( + logged_in_client, + valid_token +): + response = logged_in_client.get( + url_for('main.two_factor_email', token=valid_token) + ) + assert response.status_code == 302 + assert response.location == url_for('main.choose_service', _external=True) diff --git a/tests/conftest.py b/tests/conftest.py index 82e840cce..2a1958781 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -28,6 +28,9 @@ from . import ( single_notification_json ) +from notifications_utils.url_safe_token import generate_token +import json + @pytest.fixture(scope='session') def app_(request): @@ -1089,6 +1092,25 @@ def api_user_active(fake_uuid, email_address='test@user.gov.uk'): return user +@pytest.fixture(scope='function') +def api_user_active_email_auth(fake_uuid, email_address='test@user.gov.uk'): + from app.notify_client.user_api_client import User + user_data = {'id': fake_uuid, + 'name': 'Test User', + 'password': 'somepassword', + 'email_address': email_address, + 'mobile_number': '07700 900762', + 'state': 'active', + 'failed_login_count': 0, + 'permissions': {}, + 'platform_admin': False, + 'auth_type': 'email_auth', + 'password_changed_at': str(datetime.utcnow()) + } + user = User(user_data) + return user + + @pytest.fixture(scope='function') def api_nongov_user_active(fake_uuid): from app.notify_client.user_api_client import User @@ -2425,3 +2447,20 @@ def mock_get_aggregate_platform_stats(mocker): } return mocker.patch('app.service_api_client.get_aggregate_platform_stats', return_value=stats) + + +@contextmanager +def set_config(app, name, value): + old_val = app.config.get(name) + app.config[name] = value + yield + app.config[name] = old_val + + +@pytest.fixture(scope='function') +def valid_token(app_, fake_uuid): + return generate_token( + json.dumps({'user_id': fake_uuid, 'secret_code': 'my secret'}), + app_.config['SECRET_KEY'], + app_.config['DANGEROUS_SALT'] + )