From 588730d59471b30deec2d3f95d354d36ecad0360 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Thu, 10 Dec 2015 14:48:01 +0000 Subject: [PATCH] 109526036: Persist the verify code to the db. The codes are hashed and saved to the db. The code is marked as used once a valid code is submitted. The code is valid for 1 hour. The codes are no longer saved to the session. --- README | 24 ------ app/main/dao/verify_codes_dao.py | 41 +++++++++ app/main/forms.py | 18 +++- app/main/views/__init__.py | 7 +- app/main/views/register.py | 8 +- app/main/views/sign_in.py | 2 +- app/models.py | 13 +++ tests/app/main/__init__.py | 16 ++++ tests/app/main/dao/test_roles_dao.py | 6 +- tests/app/main/dao/test_users_dao.py | 23 +++-- tests/app/main/dao/test_verify_codes_dao.py | 62 +++++++++++++ tests/app/main/views/__init__.py | 16 ---- tests/app/main/views/test_register.py | 16 ++-- tests/app/main/views/test_sign_in.py | 12 ++- tests/app/main/views/test_two_factor.py | 24 +++--- tests/app/main/views/test_verify.py | 96 ++++++++++++++------- tests/conftest.py | 57 +++++++----- 17 files changed, 306 insertions(+), 135 deletions(-) delete mode 100644 README create mode 100644 app/main/dao/verify_codes_dao.py create mode 100644 tests/app/main/dao/test_verify_codes_dao.py diff --git a/README b/README deleted file mode 100644 index 1404d2bf4..000000000 --- a/README +++ /dev/null @@ -1,24 +0,0 @@ -This Python app creates a frontend skeleton based on Flask, Flask-Assets and the govuk_frontend_toolkit (and GOV.UK template). This renders pages with the GOV.UK style. Webpages can be coded using styles from GOV.UK Elements. - -govuk_frontend_toolkit is pulled in as a git submodule. -To get the contents of the submodule run the following git commands. -git submodule init -git submodule update - -INSTALLATION - -In a virtual environment, install the Python requirements: -pip install -r requirements.txt - -You will need sass installed: -bundle install - -RUNNING THE APP - -To run the app: -python app.py - -Initial URL - -http://localhost:5000/helloworld - -(template for this page lives in /templates/hello-world.html, which adds a content block to govuk_template.html) diff --git a/app/main/dao/verify_codes_dao.py b/app/main/dao/verify_codes_dao.py new file mode 100644 index 000000000..2d1a0605d --- /dev/null +++ b/app/main/dao/verify_codes_dao.py @@ -0,0 +1,41 @@ +from datetime import datetime, timedelta + +from app import db +from app.main.encryption import hashpw +from app.models import VerifyCodes + + +def add_code(user_id, code, code_type): + code = VerifyCodes(user_id=user_id, + code=hashpw(code), + code_type=code_type, + expiry_datetime=datetime.now() + timedelta(hours=1)) + + db.session.add(code) + db.session.commit() + + +def get_code(user_id, code_type): + verify_code = VerifyCodes.query.filter_by(user_id=user_id, code_type=code_type, code_used=False).first() + return verify_code + + +def get_code_by_code(user_id, code_type): + return VerifyCodes.query.filter_by(user_id=user_id, code_type=code_type).first() + + +def use_code(id): + verify_code = VerifyCodes.query.filter_by(id=id).first() + verify_code.code_used = True + db.session.add(verify_code) + db.session.commit() + + +def add_code_with_expiry(user_id, code, code_type, expiry): + code = VerifyCodes(user_id=user_id, + code=code, + code_type=code_type, + expiry_datetime=expiry) + + db.session.add(code) + db.session.commit() diff --git a/app/main/forms.py b/app/main/forms.py index ae90443c9..110e236c7 100644 --- a/app/main/forms.py +++ b/app/main/forms.py @@ -1,8 +1,11 @@ +from datetime import datetime + from flask import session from flask_wtf import Form from wtforms import StringField, PasswordField from wtforms.validators import DataRequired, Email, Length, Regexp +from app.main.dao import verify_codes_dao from app.main.encryption import checkpw from app.main.validators import Blacklist @@ -46,7 +49,8 @@ class TwoFactorForm(Form): Regexp(regex=verify_code, message='Code must be 5 digits')]) def validate_sms_code(self, a): - validate_code(self.sms_code, session['sms_code']) + code = verify_codes_dao.get_code(session['user_id'], 'sms') + validate_code(self.sms_code, code) class VerifyForm(Form): @@ -58,18 +62,24 @@ class VerifyForm(Form): Regexp(regex=verify_code, message='Code must be 5 digits')]) def validate_email_code(self, a): - validate_code(self.email_code, session['email_code']) + code = verify_codes_dao.get_code(session['user_id'], 'email') + validate_code(self.email_code, code) def validate_sms_code(self, a): - validate_code(self.sms_code, session['sms_code']) + code = verify_codes_dao.get_code(session['user_id'], 'sms') + validate_code(self.sms_code, code) def validate_code(field, code): + if code.expiry_datetime < datetime.now(): + field.errors.append('Code has expired') + return False if field.data is not None: - if checkpw(str(field.data), code) is False: + if checkpw(field.data, code.code) is False: field.errors.append('Code does not match') return False else: + verify_codes_dao.use_code(code.id) return True else: return True diff --git a/app/main/views/__init__.py b/app/main/views/__init__.py index 11bc7331b..e7db79518 100644 --- a/app/main/views/__init__.py +++ b/app/main/views/__init__.py @@ -1,24 +1,27 @@ from random import randint from app import admin_api_client from app.main.exceptions import AdminApiClientException +from app.main.dao import verify_codes_dao def create_verify_code(): return ''.join(["%s" % randint(0, 9) for _ in range(0, 5)]) -def send_sms_code(mobile_number): +def send_sms_code(user_id, mobile_number): sms_code = create_verify_code() try: + verify_codes_dao.add_code(user_id=user_id, code=sms_code, code_type='sms') admin_api_client.send_sms(mobile_number, message=sms_code, token=admin_api_client.auth_token) except: raise AdminApiClientException('Exception when sending sms.') return sms_code -def send_email_code(email): +def send_email_code(user_id, email): email_code = create_verify_code() try: + verify_codes_dao.add_code(user_id=user_id, code=email_code, code_type='email') admin_api_client.send_email(email_address=email, from_str='notify@digital.cabinet-office.gov.uk', message=email_code, diff --git a/app/main/views/register.py b/app/main/views/register.py index cde44b1c5..44476b2ca 100644 --- a/app/main/views/register.py +++ b/app/main/views/register.py @@ -29,12 +29,10 @@ def process_register(): created_at=datetime.now(), role_id=1) try: - sms_code = send_sms_code(form.mobile_number.data) - email_code = send_email_code(form.email_address.data) - session['sms_code'] = hashpw(sms_code) - session['email_code'] = hashpw(email_code) - session['expiry_date'] = str(datetime.now() + timedelta(hours=1)) users_dao.insert_user(user) + send_sms_code(user_id=user.id, mobile_number=form.mobile_number.data) + send_email_code(user_id=user.id, email=form.email_address.data) + session['expiry_date'] = str(datetime.now() + timedelta(hours=1)) session['user_id'] = user.id except AdminApiClientException as e: return jsonify(admin_api_client_error=e.value) diff --git a/app/main/views/sign_in.py b/app/main/views/sign_in.py index 8deebecd4..f7bfeccc7 100644 --- a/app/main/views/sign_in.py +++ b/app/main/views/sign_in.py @@ -26,7 +26,7 @@ def process_sign_in(): if not user.is_active(): return jsonify(active_user=False), 401 if checkpw(form.password.data, user.password): - sms_code = send_sms_code(user.mobile_number) + sms_code = send_sms_code(user.id, user.mobile_number) session['user_id'] = user.id session['sms_code'] = hashpw(sms_code) else: diff --git a/app/models.py b/app/models.py index 1006335bb..cb99c26a0 100644 --- a/app/models.py +++ b/app/models.py @@ -5,6 +5,19 @@ DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" DATE_FORMAT = "%Y-%m-%d" +class VerifyCodes(db.Model): + __tablename__ = 'verify_codes' + + code_types = ['email', 'sms'] + + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey('users.id'), index=True, unique=False, nullable=False) + code = db.Column(db.String, nullable=False) + code_type = db.Column(db.Enum(code_types, name='verify_code_types'), index=False, unique=False, nullable=False) + expiry_datetime = db.Column(db.DateTime, nullable=False) + code_used = db.Column(db.Boolean, default=False) + + class Roles(db.Model): __tablename__ = 'roles' diff --git a/tests/app/main/__init__.py b/tests/app/main/__init__.py index e69de29bb..b1fbeaa07 100644 --- a/tests/app/main/__init__.py +++ b/tests/app/main/__init__.py @@ -0,0 +1,16 @@ +from datetime import datetime + +from app.main.dao import users_dao +from app.models import User + + +def create_test_user(): + user = User(name='Test User', + password='somepassword', + email_address='test@user.gov.uk', + mobile_number='+441234123412', + created_at=datetime.now(), + role_id=1, + state='pending') + users_dao.insert_user(user) + return user diff --git a/tests/app/main/dao/test_roles_dao.py b/tests/app/main/dao/test_roles_dao.py index 14d28102d..991e8620c 100644 --- a/tests/app/main/dao/test_roles_dao.py +++ b/tests/app/main/dao/test_roles_dao.py @@ -5,7 +5,7 @@ from app.models import Roles from app.main.dao import roles_dao -def test_insert_role_should_be_able_to_get_role(notifications_admin, notifications_admin_db): +def test_insert_role_should_be_able_to_get_role(notifications_admin, notifications_admin_db, notify_db_session): role = Roles(id=1000, role='some role for test') roles_dao.insert_role(role) @@ -13,7 +13,9 @@ def test_insert_role_should_be_able_to_get_role(notifications_admin, notificatio assert saved_role == role -def test_insert_role_will_throw_error_if_role_already_exists(notifications_admin, notifications_admin_db): +def test_insert_role_will_throw_error_if_role_already_exists(notifications_admin, + notifications_admin_db, + notify_db_session): role1 = roles_dao.get_role_by_id(1) assert role1.id == 1 diff --git a/tests/app/main/dao/test_users_dao.py b/tests/app/main/dao/test_users_dao.py index f6d8d5f87..10b01052f 100644 --- a/tests/app/main/dao/test_users_dao.py +++ b/tests/app/main/dao/test_users_dao.py @@ -7,7 +7,7 @@ from app.models import User from app.main.dao import users_dao -def test_insert_user_should_add_user(notifications_admin, notifications_admin_db): +def test_insert_user_should_add_user(notifications_admin, notifications_admin_db, notify_db_session): user = User(name='test insert', password='somepassword', email_address='test@insert.gov.uk', @@ -20,7 +20,9 @@ def test_insert_user_should_add_user(notifications_admin, notifications_admin_db assert saved_user == user -def test_insert_user_with_role_that_does_not_exist_fails(notifications_admin, notifications_admin_db): +def test_insert_user_with_role_that_does_not_exist_fails(notifications_admin, + notifications_admin_db, + notify_db_session): user = User(name='role does not exist', password='somepassword', email_address='test@insert.gov.uk', @@ -32,7 +34,7 @@ def test_insert_user_with_role_that_does_not_exist_fails(notifications_admin, no assert 'insert or update on table "users" violates foreign key constraint "users_role_id_fkey"' in str(error.value) -def test_get_user_by_email(notifications_admin, notifications_admin_db): +def test_get_user_by_email(notifications_admin, notifications_admin_db, notify_db_session): user = User(name='test_get_by_email', password='somepassword', email_address='email@example.gov.uk', @@ -45,7 +47,7 @@ def test_get_user_by_email(notifications_admin, notifications_admin_db): assert retrieved == user -def test_get_all_users_returns_all_users(notifications_admin, notifications_admin_db): +def test_get_all_users_returns_all_users(notifications_admin, notifications_admin_db, notify_db_session): user1 = User(name='test one', password='somepassword', email_address='test1@get_all.gov.uk', @@ -73,7 +75,9 @@ def test_get_all_users_returns_all_users(notifications_admin, notifications_admi assert users == [user1, user2, user3] -def test_increment_failed_lockout_count_should_increade_count_by_1(notifications_admin, notifications_admin_db): +def test_increment_failed_lockout_count_should_increade_count_by_1(notifications_admin, + notifications_admin_db, + notify_db_session): user = User(name='cannot remember password', password='somepassword', email_address='test1@get_all.gov.uk', @@ -88,7 +92,8 @@ def test_increment_failed_lockout_count_should_increade_count_by_1(notifications assert users_dao.get_user_by_id(user.id).failed_login_count == 1 -def test_user_is_locked_if_failed_login_count_is_10_or_greater(notifications_admin, notifications_admin_db): +def test_user_is_locked_if_failed_login_count_is_10_or_greater(notifications_admin, + notifications_admin_db, notify_db_session): user = User(name='cannot remember password', password='somepassword', email_address='test1@get_all.gov.uk', @@ -107,7 +112,7 @@ def test_user_is_locked_if_failed_login_count_is_10_or_greater(notifications_adm assert saved_user.is_locked() is True -def test_user_is_active_is_false_if_state_is_inactive(notifications_admin, notifications_admin_db): +def test_user_is_active_is_false_if_state_is_inactive(notifications_admin, notifications_admin_db, notify_db_session): user = User(name='inactive user', password='somepassword', email_address='test1@get_all.gov.uk', @@ -121,7 +126,7 @@ def test_user_is_active_is_false_if_state_is_inactive(notifications_admin, notif assert saved_user.is_active() is False -def test_should_update_user_to_active(notifications_admin, notifications_admin_db): +def test_should_update_user_to_active(notifications_admin, notifications_admin_db, notify_db_session): user = User(name='Make user active', password='somepassword', email_address='activate@user.gov.uk', @@ -135,7 +140,7 @@ def test_should_update_user_to_active(notifications_admin, notifications_admin_d assert updated_user.state == 'active' -def test_should_throws_error_when_id_does_not_exist(notifications_admin, notifications_admin_db): +def test_should_throws_error_when_id_does_not_exist(notifications_admin, notifications_admin_db, notify_db_session): with pytest.raises(AttributeError) as error: users_dao.activate_user(123) assert '''object has no attribute 'state''''' in str(error.value) diff --git a/tests/app/main/dao/test_verify_codes_dao.py b/tests/app/main/dao/test_verify_codes_dao.py new file mode 100644 index 000000000..f9b2f0cb8 --- /dev/null +++ b/tests/app/main/dao/test_verify_codes_dao.py @@ -0,0 +1,62 @@ +import sqlalchemy +from pytest import fail + +from app.main.dao import verify_codes_dao +from app.main.encryption import checkpw +from tests.app.main import create_test_user + + +def test_insert_new_code_and_get_it_back(notifications_admin, notifications_admin_db, notify_db_session): + user = create_test_user() + + verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='email') + saved_code = verify_codes_dao.get_code(user_id=user.id, code_type='email') + assert saved_code.user_id == user.id + assert checkpw('12345', saved_code.code) + assert saved_code.code_type == 'email' + assert saved_code.code_used is False + + +def test_insert_new_code_should_thrw_exception_when_type_does_not_exist(notifications_admin, + notifications_admin_db, + notify_db_session): + user = create_test_user() + try: + verify_codes_dao.add_code(user_id=user.id, code='23545', code_type='not_real') + fail('Should have thrown an exception') + except sqlalchemy.exc.DataError as e: + assert 'invalid input value for enum verify_code_types: "not_real"' in e.orig.pgerror + + +def test_should_throw_exception_when_user_does_not_exist(notifications_admin, + notifications_admin_db, + notify_db_session): + try: + verify_codes_dao.add_code(user_id=1, code='12345', code_type='email') + fail('Should throw exception') + except sqlalchemy.exc.IntegrityError as e: + assert 'ERROR: insert or update on table "verify_codes" violates ' \ + 'foreign key constraint "verify_codes_user_id_fkey"' in e.orig.pgerror + + +def test_should_return_none_if_code_is_used(notifications_admin, + notifications_admin_db, + notify_db_session): + user = create_test_user() + + verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='email') + verify_codes_dao.use_code(user_id=user.id, code='12345', code_type='email') + saved_code = verify_codes_dao.get_code_by_code(user_id=user.id, code_type='email', code='12345') + assert saved_code.code_used is True + + +def test_should_return_none_if_code_is_used(notifications_admin, + notifications_admin_db, + notify_db_session): + user = create_test_user() + + verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms') + code = verify_codes_dao.get_code(user_id=user.id, code_type='sms') + verify_codes_dao.use_code(code.id) + code = verify_codes_dao.get_code(user_id=user.id, code_type='sms') + assert code is None diff --git a/tests/app/main/views/__init__.py b/tests/app/main/views/__init__.py index b1fbeaa07..e69de29bb 100644 --- a/tests/app/main/views/__init__.py +++ b/tests/app/main/views/__init__.py @@ -1,16 +0,0 @@ -from datetime import datetime - -from app.main.dao import users_dao -from app.models import User - - -def create_test_user(): - user = User(name='Test User', - password='somepassword', - email_address='test@user.gov.uk', - mobile_number='+441234123412', - created_at=datetime.now(), - role_id=1, - state='pending') - users_dao.insert_user(user) - return user diff --git a/tests/app/main/views/test_register.py b/tests/app/main/views/test_register.py index a40b93481..8fffb9d8a 100644 --- a/tests/app/main/views/test_register.py +++ b/tests/app/main/views/test_register.py @@ -1,13 +1,13 @@ -def test_render_register_returns_template_with_form(notifications_admin, notifications_admin_db): +def test_render_register_returns_template_with_form(notifications_admin, notifications_admin_db, notify_db_session): response = notifications_admin.test_client().get('/register') assert response.status_code == 200 assert 'Create an account' in response.get_data(as_text=True) -def test_process_register_creates_new_user(notifications_admin, notifications_admin_db, mocker): +def test_process_register_creates_new_user(notifications_admin, notifications_admin_db, mocker, notify_db_session): _set_up_mocker(mocker) response = notifications_admin.test_client().post('/register', @@ -21,7 +21,8 @@ def test_process_register_creates_new_user(notifications_admin, notifications_ad def test_process_register_returns_400_when_mobile_number_is_invalid(notifications_admin, notifications_admin_db, - mocker): + mocker, + notify_db_session): _set_up_mocker(mocker) response = notifications_admin.test_client().post('/register', data={'name': 'Bad Mobile', @@ -33,7 +34,10 @@ def test_process_register_returns_400_when_mobile_number_is_invalid(notification assert 'Please enter a +44 mobile number' in response.get_data(as_text=True) -def test_should_return_400_when_email_is_not_gov_uk(notifications_admin, notifications_admin_db, mocker): +def test_should_return_400_when_email_is_not_gov_uk(notifications_admin, + notifications_admin_db, + mocker, + notify_db_session): _set_up_mocker(mocker) response = notifications_admin.test_client().post('/register', data={'name': 'Bad Mobile', @@ -45,7 +49,7 @@ def test_should_return_400_when_email_is_not_gov_uk(notifications_admin, notific assert 'Please enter a gov.uk email address' in response.get_data(as_text=True) -def test_should_add_verify_codes_on_session(notifications_admin, notifications_admin_db, mocker): +def test_should_add_verify_codes_on_session(notifications_admin, notifications_admin_db, mocker, notify_db_session): _set_up_mocker(mocker) with notifications_admin.test_client() as client: response = client.post('/register', @@ -62,7 +66,7 @@ def _set_up_mocker(mocker): mocker.patch("app.admin_api_client.send_email") -def test_should_return_400_if_password_is_blacklisted(notifications_admin, notifications_admin_db): +def test_should_return_400_if_password_is_blacklisted(notifications_admin, notifications_admin_db, notify_db_session): response = notifications_admin.test_client().post('/register', data={'name': 'Bad Mobile', 'email_address': 'bad_mobile@example.not.right', diff --git a/tests/app/main/views/test_sign_in.py b/tests/app/main/views/test_sign_in.py index 0a68ec14c..bad65e5c9 100644 --- a/tests/app/main/views/test_sign_in.py +++ b/tests/app/main/views/test_sign_in.py @@ -13,7 +13,7 @@ def test_render_sign_in_returns_sign_in_template(notifications_admin): assert 'Forgotten password?' in response.get_data(as_text=True) -def test_process_sign_in_return_2fa_template(notifications_admin, notifications_admin_db, mocker): +def test_process_sign_in_return_2fa_template(notifications_admin, notifications_admin_db, mocker, notify_db_session): _set_up_mocker(mocker) user = User(email_address='valid@example.gov.uk', password='val1dPassw0rd!', @@ -29,7 +29,9 @@ def test_process_sign_in_return_2fa_template(notifications_admin, notifications_ assert response.location == 'http://localhost/two-factor' -def test_should_return_locked_out_true_when_user_is_locked(notifications_admin, notifications_admin_db): +def test_should_return_locked_out_true_when_user_is_locked(notifications_admin, + notifications_admin_db, + notify_db_session): user = User(email_address='valid@example.gov.uk', password='val1dPassw0rd!', mobile_number='+441234123123', @@ -56,7 +58,9 @@ def test_should_return_locked_out_true_when_user_is_locked(notifications_admin, assert '"locked_out": true' in response.get_data(as_text=True) -def test_should_return_active_user_is_false_if_user_is_inactive(notifications_admin, notifications_admin_db): +def test_should_return_active_user_is_false_if_user_is_inactive(notifications_admin, + notifications_admin_db, + notify_db_session): user = User(email_address='inactive_user@example.gov.uk', password='val1dPassw0rd!', mobile_number='+441234123123', @@ -74,7 +78,7 @@ def test_should_return_active_user_is_false_if_user_is_inactive(notifications_ad assert '"active_user": false' in response.get_data(as_text=True) -def test_should_return_401_when_user_does_not_exist(notifications_admin, notifications_admin_db): +def test_should_return_401_when_user_does_not_exist(notifications_admin, notifications_admin_db, notify_db_session): response = notifications_admin.test_client().post('/sign-in', data={'email_address': 'does_not_exist@gov.uk', 'password': 'doesNotExist!'}) diff --git a/tests/app/main/views/test_two_factor.py b/tests/app/main/views/test_two_factor.py index 56f85c151..8865219df 100644 --- a/tests/app/main/views/test_two_factor.py +++ b/tests/app/main/views/test_two_factor.py @@ -1,21 +1,21 @@ from flask import json -from app.main.encryption import hashpw -from tests.app.main.views import create_test_user +from app.main.dao import verify_codes_dao +from tests.app.main import create_test_user -def test_should_render_two_factor_page(notifications_admin, notifications_admin_db): +def test_should_render_two_factor_page(notifications_admin, notifications_admin_db, notify_db_session): response = notifications_admin.test_client().get('/two-factor') assert response.status_code == 200 assert '''We've sent you a text message with a verification code.''' in response.get_data(as_text=True) -def test_should_login_user_and_redirect_to_dashboard(notifications_admin, notifications_admin_db): +def test_should_login_user_and_redirect_to_dashboard(notifications_admin, notifications_admin_db, notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('12345') + verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms') response = client.post('/two-factor', data={'sms_code': '12345'}) @@ -23,35 +23,37 @@ def test_should_login_user_and_redirect_to_dashboard(notifications_admin, notifi assert response.location == 'http://localhost/dashboard' -def test_should_return_400_with_sms_code_error_when_sms_code_is_wrong(notifications_admin, notifications_admin_db): +def test_should_return_400_with_sms_code_error_when_sms_code_is_wrong(notifications_admin, + notifications_admin_db, + notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('12345') + verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms') response = client.post('/two-factor', data={'sms_code': '23456'}) assert response.status_code == 400 assert {'sms_code': ['Code does not match']} == json.loads(response.get_data(as_text=True)) -def test_should_return_400_when_sms_code_is_empty(notifications_admin, notifications_admin_db): +def test_should_return_400_when_sms_code_is_empty(notifications_admin, notifications_admin_db, notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('12345') + verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms') response = client.post('/two-factor') assert response.status_code == 400 assert {'sms_code': ['Please enter your code']} == json.loads(response.get_data(as_text=True)) -def test_should_return_400_when_sms_code_is_too_short(notifications_admin, notifications_admin_db): +def test_should_return_400_when_sms_code_is_too_short(notifications_admin, notifications_admin_db, notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('23467') + verify_codes_dao.add_code(user_id=user.id, code='23467', code_type='sms') response = client.post('/two-factor', data={'sms_code': '2346'}) assert response.status_code == 400 data = json.loads(response.get_data(as_text=True)) diff --git a/tests/app/main/views/test_verify.py b/tests/app/main/views/test_verify.py index 3235d1206..04636c733 100644 --- a/tests/app/main/views/test_verify.py +++ b/tests/app/main/views/test_verify.py @@ -1,23 +1,26 @@ +from datetime import datetime, timedelta + from flask import json -from app.main.dao import users_dao -from app.main.encryption import hashpw -from tests.app.main.views import create_test_user +from app.main.dao import users_dao, verify_codes_dao +from tests.app.main import create_test_user -def test_should_return_verify_template(notifications_admin, notifications_admin_db): +def test_should_return_verify_template(notifications_admin, notifications_admin_db, notify_db_session): response = notifications_admin.test_client().get('/verify') assert response.status_code == 200 assert 'Activate your account' in response.get_data(as_text=True) -def test_should_redirect_to_add_service_when_code_are_correct(notifications_admin, notifications_admin_db): +def test_should_redirect_to_add_service_when_code_are_correct(notifications_admin, + notifications_admin_db, + notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('12345') - session['email_code'] = hashpw('23456') + verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms') + verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email') response = client.post('/verify', data={'sms_code': '12345', 'email_code': '23456'}) @@ -25,13 +28,13 @@ def test_should_redirect_to_add_service_when_code_are_correct(notifications_admi assert response.location == 'http://localhost/add-service' -def test_should_activate_user_after_verify(notifications_admin, notifications_admin_db): +def test_should_activate_user_after_verify(notifications_admin, notifications_admin_db, notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('12345') - session['email_code'] = hashpw('23456') + verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms') + verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email') client.post('/verify', data={'sms_code': '12345', 'email_code': '23456'}) @@ -40,13 +43,13 @@ def test_should_activate_user_after_verify(notifications_admin, notifications_ad assert after_verify.state == 'active' -def test_should_return_400_when_sms_code_is_wrong(notifications_admin, notifications_admin_db): +def test_should_return_400_when_sms_code_is_wrong(notifications_admin, notifications_admin_db, notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('12345') - session['email_code'] = hashpw('23456') + verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms') + verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email') response = client.post('/verify', data={'sms_code': '98765', 'email_code': '23456'}) @@ -54,13 +57,13 @@ def test_should_return_400_when_sms_code_is_wrong(notifications_admin, notificat assert {'sms_code': ['Code does not match']} == json.loads(response.get_data(as_text=True)) -def test_should_return_400_when_email_code_is_wrong(notifications_admin, notifications_admin_db): +def test_should_return_400_when_email_code_is_wrong(notifications_admin, notifications_admin_db, notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('12345') - session['email_code'] = hashpw('98456') + verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms') + verify_codes_dao.add_code(user_id=user.id, code='98456', code_type='email') response = client.post('/verify', data={'sms_code': '12345', 'email_code': '23456'}) @@ -68,39 +71,39 @@ def test_should_return_400_when_email_code_is_wrong(notifications_admin, notific assert {'email_code': ['Code does not match']} == json.loads(response.get_data(as_text=True)) -def test_should_return_400_when_sms_code_is_missing(notifications_admin, notifications_admin_db): +def test_should_return_400_when_sms_code_is_missing(notifications_admin, notifications_admin_db, notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('12345') - session['email_code'] = hashpw('98456') + verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms') + verify_codes_dao.add_code(user_id=user.id, code='98456', code_type='email') response = client.post('/verify', data={'email_code': '98456'}) assert response.status_code == 400 assert {'sms_code': ['SMS code can not be empty']} == json.loads(response.get_data(as_text=True)) -def test_should_return_400_when_email_code_is_missing(notifications_admin, notifications_admin_db): +def test_should_return_400_when_email_code_is_missing(notifications_admin, notifications_admin_db, notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('23456') - session['email_code'] = hashpw('23456') + verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email') + verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='sms') response = client.post('/verify', data={'sms_code': '23456'}) assert response.status_code == 400 assert {'email_code': ['Email code can not be empty']} == json.loads(response.get_data(as_text=True)) -def test_should_return_400_when_email_code_has_letter(notifications_admin, notifications_admin_db): +def test_should_return_400_when_email_code_has_letter(notifications_admin, notifications_admin_db, notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('23456') - session['email_code'] = hashpw('23456') + verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email') + verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='sms') response = client.post('/verify', data={'sms_code': '23456', 'email_code': 'abcde'}) @@ -112,13 +115,13 @@ def test_should_return_400_when_email_code_has_letter(notifications_admin, notif assert data['email_code'].sort() == expected['email_code'].sort() -def test_should_return_400_when_sms_code_is_too_short(notifications_admin, notifications_admin_db): +def test_should_return_400_when_sms_code_is_too_short(notifications_admin, notifications_admin_db, notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('23456') - session['email_code'] = hashpw('23456') + verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email') + verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='sms') response = client.post('/verify', data={'sms_code': '2345', 'email_code': '23456'}) @@ -130,15 +133,46 @@ def test_should_return_400_when_sms_code_is_too_short(notifications_admin, notif assert data['sms_code'].sort() == expected['sms_code'].sort() -def test_should_return_302_when_email_code_starts_with_zero(notifications_admin, notifications_admin_db): +def test_should_return_302_when_email_code_starts_with_zero(notifications_admin, + notifications_admin_db, + notify_db_session): with notifications_admin.test_client() as client: with client.session_transaction() as session: user = create_test_user() session['user_id'] = user.id - session['sms_code'] = hashpw('23456') - session['email_code'] = hashpw('09765') + verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='sms') + verify_codes_dao.add_code(user_id=user.id, code='09765', code_type='email') response = client.post('/verify', data={'sms_code': '23456', 'email_code': '09765'}) assert response.status_code == 302 assert response.location == 'http://localhost/add-service' + + +def test_should_return_400_when_verify_code_has_expired(notifications_admin, + notifications_admin_db, + notify_db_session): + with notifications_admin.test_client() as client: + with client.session_transaction() as session: + user = create_test_user() + session['user_id'] = user.id + verify_codes_dao.add_code_with_expiry(user_id=user.id, + code='23456', + code_type='email', + expiry=datetime.now() + timedelta(hours=-2)) + verify_codes_dao.add_code_with_expiry(user_id=user.id, + code='23456', + code_type='sms', + expiry=datetime.now() + timedelta(hours=-2)) + response = client.post('/verify', + data={'sms_code': '23456', + 'email_code': '23456'}) + assert response.status_code == 400 + data = json.loads(response.get_data(as_text=True)) + expected = {'sms_code': ['Code has expired'], + 'email_code': ['Code has expired']} + assert len(data.keys()) == 2 + assert 'sms_code' in data + assert data['sms_code'].sort() == expected['sms_code'].sort() + assert 'email_code' in data + assert data['email_code'].sort() == expected['email_code'].sort() diff --git a/tests/conftest.py b/tests/conftest.py index 8712733e3..33e3ea9a3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,14 +1,19 @@ +import os + import pytest -from _pytest.monkeypatch import monkeypatch -from sqlalchemy.schema import MetaData, DropConstraint +from alembic.command import upgrade +from alembic.config import Config +from flask.ext.migrate import Migrate, MigrateCommand +from flask.ext.script import Manager +from sqlalchemy.schema import MetaData from app import create_app, db -from app.models import Roles -@pytest.fixture(scope='function') +@pytest.fixture(scope='session') def notifications_admin(request): app = create_app('test') + ctx = app.app_context() ctx.push() @@ -19,22 +24,34 @@ def notifications_admin(request): return app -@pytest.fixture(scope='function') +@pytest.fixture(scope='session') def notifications_admin_db(notifications_admin, request): - metadata = MetaData(db.engine) - metadata.reflect() - for table in metadata.tables.values(): - for fk in table.foreign_keys: - db.engine.execute(DropConstraint(fk.constraint)) - metadata.drop_all() + Migrate(notifications_admin, db) + Manager(db, MigrateCommand) + BASE_DIR = os.path.dirname(os.path.dirname(__file__)) + ALEMBIC_CONFIG = os.path.join(BASE_DIR, 'migrations') + config = Config(ALEMBIC_CONFIG + '/alembic.ini') + config.set_main_option("script_location", ALEMBIC_CONFIG) - # Create the tables based on the current model - db.create_all() + with notifications_admin.app_context(): + upgrade(config, 'head') - # Add base data here - role = Roles(id=1, role='test_role') - db.session.add(role) - db.session.commit() - db.session.flush() - db.session.expunge_all() - db.session.commit() + def teardown(): + db.session.remove() + db.drop_all() + db.engine.execute("drop table alembic_version") + db.get_engine(notifications_admin).dispose() + + request.addfinalizer(teardown) + + +@pytest.fixture(scope='function') +def notify_db_session(request): + def teardown(): + db.session.remove() + for tbl in reversed(meta.sorted_tables): + if tbl.fullname not in ['roles']: + db.engine.execute(tbl.delete()) + + meta = MetaData(bind=db.engine, reflect=True) + request.addfinalizer(teardown)