109526036: Persist the verify code to the db.

The codes are hashed and saved to the db.
The code is marked as used once a valid code is submitted.
The code is valid for 1 hour.
The codes are no longer saved to the session.
This commit is contained in:
Rebecca Law
2015-12-10 14:48:01 +00:00
parent 3b327c9986
commit 588730d594
17 changed files with 306 additions and 135 deletions

View File

@@ -0,0 +1,16 @@
from datetime import datetime
from app.main.dao import users_dao
from app.models import User
def create_test_user():
user = User(name='Test User',
password='somepassword',
email_address='test@user.gov.uk',
mobile_number='+441234123412',
created_at=datetime.now(),
role_id=1,
state='pending')
users_dao.insert_user(user)
return user

View File

@@ -5,7 +5,7 @@ from app.models import Roles
from app.main.dao import roles_dao
def test_insert_role_should_be_able_to_get_role(notifications_admin, notifications_admin_db):
def test_insert_role_should_be_able_to_get_role(notifications_admin, notifications_admin_db, notify_db_session):
role = Roles(id=1000, role='some role for test')
roles_dao.insert_role(role)
@@ -13,7 +13,9 @@ def test_insert_role_should_be_able_to_get_role(notifications_admin, notificatio
assert saved_role == role
def test_insert_role_will_throw_error_if_role_already_exists(notifications_admin, notifications_admin_db):
def test_insert_role_will_throw_error_if_role_already_exists(notifications_admin,
notifications_admin_db,
notify_db_session):
role1 = roles_dao.get_role_by_id(1)
assert role1.id == 1

View File

@@ -7,7 +7,7 @@ from app.models import User
from app.main.dao import users_dao
def test_insert_user_should_add_user(notifications_admin, notifications_admin_db):
def test_insert_user_should_add_user(notifications_admin, notifications_admin_db, notify_db_session):
user = User(name='test insert',
password='somepassword',
email_address='test@insert.gov.uk',
@@ -20,7 +20,9 @@ def test_insert_user_should_add_user(notifications_admin, notifications_admin_db
assert saved_user == user
def test_insert_user_with_role_that_does_not_exist_fails(notifications_admin, notifications_admin_db):
def test_insert_user_with_role_that_does_not_exist_fails(notifications_admin,
notifications_admin_db,
notify_db_session):
user = User(name='role does not exist',
password='somepassword',
email_address='test@insert.gov.uk',
@@ -32,7 +34,7 @@ def test_insert_user_with_role_that_does_not_exist_fails(notifications_admin, no
assert 'insert or update on table "users" violates foreign key constraint "users_role_id_fkey"' in str(error.value)
def test_get_user_by_email(notifications_admin, notifications_admin_db):
def test_get_user_by_email(notifications_admin, notifications_admin_db, notify_db_session):
user = User(name='test_get_by_email',
password='somepassword',
email_address='email@example.gov.uk',
@@ -45,7 +47,7 @@ def test_get_user_by_email(notifications_admin, notifications_admin_db):
assert retrieved == user
def test_get_all_users_returns_all_users(notifications_admin, notifications_admin_db):
def test_get_all_users_returns_all_users(notifications_admin, notifications_admin_db, notify_db_session):
user1 = User(name='test one',
password='somepassword',
email_address='test1@get_all.gov.uk',
@@ -73,7 +75,9 @@ def test_get_all_users_returns_all_users(notifications_admin, notifications_admi
assert users == [user1, user2, user3]
def test_increment_failed_lockout_count_should_increade_count_by_1(notifications_admin, notifications_admin_db):
def test_increment_failed_lockout_count_should_increade_count_by_1(notifications_admin,
notifications_admin_db,
notify_db_session):
user = User(name='cannot remember password',
password='somepassword',
email_address='test1@get_all.gov.uk',
@@ -88,7 +92,8 @@ def test_increment_failed_lockout_count_should_increade_count_by_1(notifications
assert users_dao.get_user_by_id(user.id).failed_login_count == 1
def test_user_is_locked_if_failed_login_count_is_10_or_greater(notifications_admin, notifications_admin_db):
def test_user_is_locked_if_failed_login_count_is_10_or_greater(notifications_admin,
notifications_admin_db, notify_db_session):
user = User(name='cannot remember password',
password='somepassword',
email_address='test1@get_all.gov.uk',
@@ -107,7 +112,7 @@ def test_user_is_locked_if_failed_login_count_is_10_or_greater(notifications_adm
assert saved_user.is_locked() is True
def test_user_is_active_is_false_if_state_is_inactive(notifications_admin, notifications_admin_db):
def test_user_is_active_is_false_if_state_is_inactive(notifications_admin, notifications_admin_db, notify_db_session):
user = User(name='inactive user',
password='somepassword',
email_address='test1@get_all.gov.uk',
@@ -121,7 +126,7 @@ def test_user_is_active_is_false_if_state_is_inactive(notifications_admin, notif
assert saved_user.is_active() is False
def test_should_update_user_to_active(notifications_admin, notifications_admin_db):
def test_should_update_user_to_active(notifications_admin, notifications_admin_db, notify_db_session):
user = User(name='Make user active',
password='somepassword',
email_address='activate@user.gov.uk',
@@ -135,7 +140,7 @@ def test_should_update_user_to_active(notifications_admin, notifications_admin_d
assert updated_user.state == 'active'
def test_should_throws_error_when_id_does_not_exist(notifications_admin, notifications_admin_db):
def test_should_throws_error_when_id_does_not_exist(notifications_admin, notifications_admin_db, notify_db_session):
with pytest.raises(AttributeError) as error:
users_dao.activate_user(123)
assert '''object has no attribute 'state''''' in str(error.value)

View File

@@ -0,0 +1,62 @@
import sqlalchemy
from pytest import fail
from app.main.dao import verify_codes_dao
from app.main.encryption import checkpw
from tests.app.main import create_test_user
def test_insert_new_code_and_get_it_back(notifications_admin, notifications_admin_db, notify_db_session):
user = create_test_user()
verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='email')
saved_code = verify_codes_dao.get_code(user_id=user.id, code_type='email')
assert saved_code.user_id == user.id
assert checkpw('12345', saved_code.code)
assert saved_code.code_type == 'email'
assert saved_code.code_used is False
def test_insert_new_code_should_thrw_exception_when_type_does_not_exist(notifications_admin,
notifications_admin_db,
notify_db_session):
user = create_test_user()
try:
verify_codes_dao.add_code(user_id=user.id, code='23545', code_type='not_real')
fail('Should have thrown an exception')
except sqlalchemy.exc.DataError as e:
assert 'invalid input value for enum verify_code_types: "not_real"' in e.orig.pgerror
def test_should_throw_exception_when_user_does_not_exist(notifications_admin,
notifications_admin_db,
notify_db_session):
try:
verify_codes_dao.add_code(user_id=1, code='12345', code_type='email')
fail('Should throw exception')
except sqlalchemy.exc.IntegrityError as e:
assert 'ERROR: insert or update on table "verify_codes" violates ' \
'foreign key constraint "verify_codes_user_id_fkey"' in e.orig.pgerror
def test_should_return_none_if_code_is_used(notifications_admin,
notifications_admin_db,
notify_db_session):
user = create_test_user()
verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='email')
verify_codes_dao.use_code(user_id=user.id, code='12345', code_type='email')
saved_code = verify_codes_dao.get_code_by_code(user_id=user.id, code_type='email', code='12345')
assert saved_code.code_used is True
def test_should_return_none_if_code_is_used(notifications_admin,
notifications_admin_db,
notify_db_session):
user = create_test_user()
verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms')
code = verify_codes_dao.get_code(user_id=user.id, code_type='sms')
verify_codes_dao.use_code(code.id)
code = verify_codes_dao.get_code(user_id=user.id, code_type='sms')
assert code is None

View File

@@ -1,16 +0,0 @@
from datetime import datetime
from app.main.dao import users_dao
from app.models import User
def create_test_user():
user = User(name='Test User',
password='somepassword',
email_address='test@user.gov.uk',
mobile_number='+441234123412',
created_at=datetime.now(),
role_id=1,
state='pending')
users_dao.insert_user(user)
return user

View File

@@ -1,13 +1,13 @@
def test_render_register_returns_template_with_form(notifications_admin, notifications_admin_db):
def test_render_register_returns_template_with_form(notifications_admin, notifications_admin_db, notify_db_session):
response = notifications_admin.test_client().get('/register')
assert response.status_code == 200
assert 'Create an account' in response.get_data(as_text=True)
def test_process_register_creates_new_user(notifications_admin, notifications_admin_db, mocker):
def test_process_register_creates_new_user(notifications_admin, notifications_admin_db, mocker, notify_db_session):
_set_up_mocker(mocker)
response = notifications_admin.test_client().post('/register',
@@ -21,7 +21,8 @@ def test_process_register_creates_new_user(notifications_admin, notifications_ad
def test_process_register_returns_400_when_mobile_number_is_invalid(notifications_admin,
notifications_admin_db,
mocker):
mocker,
notify_db_session):
_set_up_mocker(mocker)
response = notifications_admin.test_client().post('/register',
data={'name': 'Bad Mobile',
@@ -33,7 +34,10 @@ def test_process_register_returns_400_when_mobile_number_is_invalid(notification
assert 'Please enter a +44 mobile number' in response.get_data(as_text=True)
def test_should_return_400_when_email_is_not_gov_uk(notifications_admin, notifications_admin_db, mocker):
def test_should_return_400_when_email_is_not_gov_uk(notifications_admin,
notifications_admin_db,
mocker,
notify_db_session):
_set_up_mocker(mocker)
response = notifications_admin.test_client().post('/register',
data={'name': 'Bad Mobile',
@@ -45,7 +49,7 @@ def test_should_return_400_when_email_is_not_gov_uk(notifications_admin, notific
assert 'Please enter a gov.uk email address' in response.get_data(as_text=True)
def test_should_add_verify_codes_on_session(notifications_admin, notifications_admin_db, mocker):
def test_should_add_verify_codes_on_session(notifications_admin, notifications_admin_db, mocker, notify_db_session):
_set_up_mocker(mocker)
with notifications_admin.test_client() as client:
response = client.post('/register',
@@ -62,7 +66,7 @@ def _set_up_mocker(mocker):
mocker.patch("app.admin_api_client.send_email")
def test_should_return_400_if_password_is_blacklisted(notifications_admin, notifications_admin_db):
def test_should_return_400_if_password_is_blacklisted(notifications_admin, notifications_admin_db, notify_db_session):
response = notifications_admin.test_client().post('/register',
data={'name': 'Bad Mobile',
'email_address': 'bad_mobile@example.not.right',

View File

@@ -13,7 +13,7 @@ def test_render_sign_in_returns_sign_in_template(notifications_admin):
assert 'Forgotten password?' in response.get_data(as_text=True)
def test_process_sign_in_return_2fa_template(notifications_admin, notifications_admin_db, mocker):
def test_process_sign_in_return_2fa_template(notifications_admin, notifications_admin_db, mocker, notify_db_session):
_set_up_mocker(mocker)
user = User(email_address='valid@example.gov.uk',
password='val1dPassw0rd!',
@@ -29,7 +29,9 @@ def test_process_sign_in_return_2fa_template(notifications_admin, notifications_
assert response.location == 'http://localhost/two-factor'
def test_should_return_locked_out_true_when_user_is_locked(notifications_admin, notifications_admin_db):
def test_should_return_locked_out_true_when_user_is_locked(notifications_admin,
notifications_admin_db,
notify_db_session):
user = User(email_address='valid@example.gov.uk',
password='val1dPassw0rd!',
mobile_number='+441234123123',
@@ -56,7 +58,9 @@ def test_should_return_locked_out_true_when_user_is_locked(notifications_admin,
assert '"locked_out": true' in response.get_data(as_text=True)
def test_should_return_active_user_is_false_if_user_is_inactive(notifications_admin, notifications_admin_db):
def test_should_return_active_user_is_false_if_user_is_inactive(notifications_admin,
notifications_admin_db,
notify_db_session):
user = User(email_address='inactive_user@example.gov.uk',
password='val1dPassw0rd!',
mobile_number='+441234123123',
@@ -74,7 +78,7 @@ def test_should_return_active_user_is_false_if_user_is_inactive(notifications_ad
assert '"active_user": false' in response.get_data(as_text=True)
def test_should_return_401_when_user_does_not_exist(notifications_admin, notifications_admin_db):
def test_should_return_401_when_user_does_not_exist(notifications_admin, notifications_admin_db, notify_db_session):
response = notifications_admin.test_client().post('/sign-in',
data={'email_address': 'does_not_exist@gov.uk',
'password': 'doesNotExist!'})

View File

@@ -1,21 +1,21 @@
from flask import json
from app.main.encryption import hashpw
from tests.app.main.views import create_test_user
from app.main.dao import verify_codes_dao
from tests.app.main import create_test_user
def test_should_render_two_factor_page(notifications_admin, notifications_admin_db):
def test_should_render_two_factor_page(notifications_admin, notifications_admin_db, notify_db_session):
response = notifications_admin.test_client().get('/two-factor')
assert response.status_code == 200
assert '''We've sent you a text message with a verification code.''' in response.get_data(as_text=True)
def test_should_login_user_and_redirect_to_dashboard(notifications_admin, notifications_admin_db):
def test_should_login_user_and_redirect_to_dashboard(notifications_admin, notifications_admin_db, notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('12345')
verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms')
response = client.post('/two-factor',
data={'sms_code': '12345'})
@@ -23,35 +23,37 @@ def test_should_login_user_and_redirect_to_dashboard(notifications_admin, notifi
assert response.location == 'http://localhost/dashboard'
def test_should_return_400_with_sms_code_error_when_sms_code_is_wrong(notifications_admin, notifications_admin_db):
def test_should_return_400_with_sms_code_error_when_sms_code_is_wrong(notifications_admin,
notifications_admin_db,
notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('12345')
verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms')
response = client.post('/two-factor',
data={'sms_code': '23456'})
assert response.status_code == 400
assert {'sms_code': ['Code does not match']} == json.loads(response.get_data(as_text=True))
def test_should_return_400_when_sms_code_is_empty(notifications_admin, notifications_admin_db):
def test_should_return_400_when_sms_code_is_empty(notifications_admin, notifications_admin_db, notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('12345')
verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms')
response = client.post('/two-factor')
assert response.status_code == 400
assert {'sms_code': ['Please enter your code']} == json.loads(response.get_data(as_text=True))
def test_should_return_400_when_sms_code_is_too_short(notifications_admin, notifications_admin_db):
def test_should_return_400_when_sms_code_is_too_short(notifications_admin, notifications_admin_db, notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('23467')
verify_codes_dao.add_code(user_id=user.id, code='23467', code_type='sms')
response = client.post('/two-factor', data={'sms_code': '2346'})
assert response.status_code == 400
data = json.loads(response.get_data(as_text=True))

View File

@@ -1,23 +1,26 @@
from datetime import datetime, timedelta
from flask import json
from app.main.dao import users_dao
from app.main.encryption import hashpw
from tests.app.main.views import create_test_user
from app.main.dao import users_dao, verify_codes_dao
from tests.app.main import create_test_user
def test_should_return_verify_template(notifications_admin, notifications_admin_db):
def test_should_return_verify_template(notifications_admin, notifications_admin_db, notify_db_session):
response = notifications_admin.test_client().get('/verify')
assert response.status_code == 200
assert 'Activate your account' in response.get_data(as_text=True)
def test_should_redirect_to_add_service_when_code_are_correct(notifications_admin, notifications_admin_db):
def test_should_redirect_to_add_service_when_code_are_correct(notifications_admin,
notifications_admin_db,
notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('12345')
session['email_code'] = hashpw('23456')
verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms')
verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email')
response = client.post('/verify',
data={'sms_code': '12345',
'email_code': '23456'})
@@ -25,13 +28,13 @@ def test_should_redirect_to_add_service_when_code_are_correct(notifications_admi
assert response.location == 'http://localhost/add-service'
def test_should_activate_user_after_verify(notifications_admin, notifications_admin_db):
def test_should_activate_user_after_verify(notifications_admin, notifications_admin_db, notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('12345')
session['email_code'] = hashpw('23456')
verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms')
verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email')
client.post('/verify',
data={'sms_code': '12345',
'email_code': '23456'})
@@ -40,13 +43,13 @@ def test_should_activate_user_after_verify(notifications_admin, notifications_ad
assert after_verify.state == 'active'
def test_should_return_400_when_sms_code_is_wrong(notifications_admin, notifications_admin_db):
def test_should_return_400_when_sms_code_is_wrong(notifications_admin, notifications_admin_db, notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('12345')
session['email_code'] = hashpw('23456')
verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms')
verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email')
response = client.post('/verify',
data={'sms_code': '98765',
'email_code': '23456'})
@@ -54,13 +57,13 @@ def test_should_return_400_when_sms_code_is_wrong(notifications_admin, notificat
assert {'sms_code': ['Code does not match']} == json.loads(response.get_data(as_text=True))
def test_should_return_400_when_email_code_is_wrong(notifications_admin, notifications_admin_db):
def test_should_return_400_when_email_code_is_wrong(notifications_admin, notifications_admin_db, notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('12345')
session['email_code'] = hashpw('98456')
verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms')
verify_codes_dao.add_code(user_id=user.id, code='98456', code_type='email')
response = client.post('/verify',
data={'sms_code': '12345',
'email_code': '23456'})
@@ -68,39 +71,39 @@ def test_should_return_400_when_email_code_is_wrong(notifications_admin, notific
assert {'email_code': ['Code does not match']} == json.loads(response.get_data(as_text=True))
def test_should_return_400_when_sms_code_is_missing(notifications_admin, notifications_admin_db):
def test_should_return_400_when_sms_code_is_missing(notifications_admin, notifications_admin_db, notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('12345')
session['email_code'] = hashpw('98456')
verify_codes_dao.add_code(user_id=user.id, code='12345', code_type='sms')
verify_codes_dao.add_code(user_id=user.id, code='98456', code_type='email')
response = client.post('/verify',
data={'email_code': '98456'})
assert response.status_code == 400
assert {'sms_code': ['SMS code can not be empty']} == json.loads(response.get_data(as_text=True))
def test_should_return_400_when_email_code_is_missing(notifications_admin, notifications_admin_db):
def test_should_return_400_when_email_code_is_missing(notifications_admin, notifications_admin_db, notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('23456')
session['email_code'] = hashpw('23456')
verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email')
verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='sms')
response = client.post('/verify',
data={'sms_code': '23456'})
assert response.status_code == 400
assert {'email_code': ['Email code can not be empty']} == json.loads(response.get_data(as_text=True))
def test_should_return_400_when_email_code_has_letter(notifications_admin, notifications_admin_db):
def test_should_return_400_when_email_code_has_letter(notifications_admin, notifications_admin_db, notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('23456')
session['email_code'] = hashpw('23456')
verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email')
verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='sms')
response = client.post('/verify',
data={'sms_code': '23456',
'email_code': 'abcde'})
@@ -112,13 +115,13 @@ def test_should_return_400_when_email_code_has_letter(notifications_admin, notif
assert data['email_code'].sort() == expected['email_code'].sort()
def test_should_return_400_when_sms_code_is_too_short(notifications_admin, notifications_admin_db):
def test_should_return_400_when_sms_code_is_too_short(notifications_admin, notifications_admin_db, notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('23456')
session['email_code'] = hashpw('23456')
verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='email')
verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='sms')
response = client.post('/verify',
data={'sms_code': '2345',
'email_code': '23456'})
@@ -130,15 +133,46 @@ def test_should_return_400_when_sms_code_is_too_short(notifications_admin, notif
assert data['sms_code'].sort() == expected['sms_code'].sort()
def test_should_return_302_when_email_code_starts_with_zero(notifications_admin, notifications_admin_db):
def test_should_return_302_when_email_code_starts_with_zero(notifications_admin,
notifications_admin_db,
notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
session['sms_code'] = hashpw('23456')
session['email_code'] = hashpw('09765')
verify_codes_dao.add_code(user_id=user.id, code='23456', code_type='sms')
verify_codes_dao.add_code(user_id=user.id, code='09765', code_type='email')
response = client.post('/verify',
data={'sms_code': '23456',
'email_code': '09765'})
assert response.status_code == 302
assert response.location == 'http://localhost/add-service'
def test_should_return_400_when_verify_code_has_expired(notifications_admin,
notifications_admin_db,
notify_db_session):
with notifications_admin.test_client() as client:
with client.session_transaction() as session:
user = create_test_user()
session['user_id'] = user.id
verify_codes_dao.add_code_with_expiry(user_id=user.id,
code='23456',
code_type='email',
expiry=datetime.now() + timedelta(hours=-2))
verify_codes_dao.add_code_with_expiry(user_id=user.id,
code='23456',
code_type='sms',
expiry=datetime.now() + timedelta(hours=-2))
response = client.post('/verify',
data={'sms_code': '23456',
'email_code': '23456'})
assert response.status_code == 400
data = json.loads(response.get_data(as_text=True))
expected = {'sms_code': ['Code has expired'],
'email_code': ['Code has expired']}
assert len(data.keys()) == 2
assert 'sms_code' in data
assert data['sms_code'].sort() == expected['sms_code'].sort()
assert 'email_code' in data
assert data['email_code'].sort() == expected['email_code'].sort()

View File

@@ -1,14 +1,19 @@
import os
import pytest
from _pytest.monkeypatch import monkeypatch
from sqlalchemy.schema import MetaData, DropConstraint
from alembic.command import upgrade
from alembic.config import Config
from flask.ext.migrate import Migrate, MigrateCommand
from flask.ext.script import Manager
from sqlalchemy.schema import MetaData
from app import create_app, db
from app.models import Roles
@pytest.fixture(scope='function')
@pytest.fixture(scope='session')
def notifications_admin(request):
app = create_app('test')
ctx = app.app_context()
ctx.push()
@@ -19,22 +24,34 @@ def notifications_admin(request):
return app
@pytest.fixture(scope='function')
@pytest.fixture(scope='session')
def notifications_admin_db(notifications_admin, request):
metadata = MetaData(db.engine)
metadata.reflect()
for table in metadata.tables.values():
for fk in table.foreign_keys:
db.engine.execute(DropConstraint(fk.constraint))
metadata.drop_all()
Migrate(notifications_admin, db)
Manager(db, MigrateCommand)
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
ALEMBIC_CONFIG = os.path.join(BASE_DIR, 'migrations')
config = Config(ALEMBIC_CONFIG + '/alembic.ini')
config.set_main_option("script_location", ALEMBIC_CONFIG)
# Create the tables based on the current model
db.create_all()
with notifications_admin.app_context():
upgrade(config, 'head')
# Add base data here
role = Roles(id=1, role='test_role')
db.session.add(role)
db.session.commit()
db.session.flush()
db.session.expunge_all()
db.session.commit()
def teardown():
db.session.remove()
db.drop_all()
db.engine.execute("drop table alembic_version")
db.get_engine(notifications_admin).dispose()
request.addfinalizer(teardown)
@pytest.fixture(scope='function')
def notify_db_session(request):
def teardown():
db.session.remove()
for tbl in reversed(meta.sorted_tables):
if tbl.fullname not in ['roles']:
db.engine.execute(tbl.delete())
meta = MetaData(bind=db.engine, reflect=True)
request.addfinalizer(teardown)