mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-02-05 10:53:28 -05:00
@@ -27,3 +27,5 @@ deploy:
|
||||
app: notifications-admin
|
||||
on:
|
||||
repo: alphagov/notifications-admin
|
||||
run:
|
||||
- python app.py db upgrade
|
||||
|
||||
@@ -1,14 +1,22 @@
|
||||
import os
|
||||
|
||||
from flask import Flask
|
||||
from flask import Flask, session
|
||||
from flask._compat import string_types
|
||||
from flask.ext import assets
|
||||
from flask.ext.sqlalchemy import SQLAlchemy
|
||||
from flask_login import LoginManager
|
||||
from flask_wtf import CsrfProtect
|
||||
from webassets.filter import get_filter
|
||||
from werkzeug.exceptions import abort
|
||||
|
||||
from app.its_dangerous_session import ItsdangerousSessionInterface
|
||||
import app.proxy_fix
|
||||
from config import configs
|
||||
|
||||
|
||||
db = SQLAlchemy()
|
||||
login_manager = LoginManager()
|
||||
csrf = CsrfProtect()
|
||||
|
||||
|
||||
def create_app(config_name):
|
||||
@@ -18,13 +26,40 @@ def create_app(config_name):
|
||||
application.config.from_object(configs[config_name])
|
||||
db.init_app(application)
|
||||
init_app(application)
|
||||
init_csrf(application)
|
||||
|
||||
login_manager.init_app(application)
|
||||
login_manager.login_view = 'main.sign_in.render_sign_in'
|
||||
|
||||
from app.main import main as main_blueprint
|
||||
application.register_blueprint(main_blueprint)
|
||||
|
||||
proxy_fix.init_app(application)
|
||||
|
||||
application.session_interface = ItsdangerousSessionInterface()
|
||||
|
||||
return application
|
||||
|
||||
|
||||
def init_csrf(application):
|
||||
csrf.init_app(application)
|
||||
|
||||
@csrf.error_handler
|
||||
def csrf_handler(reason):
|
||||
if 'user_id' not in session:
|
||||
application.logger.info(
|
||||
u'csrf.session_expired: Redirecting user to log in page'
|
||||
)
|
||||
|
||||
return application.login_manager.unauthorized()
|
||||
|
||||
application.logger.info(
|
||||
u'csrf.invalid_token: Aborting request, user_id: {user_id}',
|
||||
extra={'user_id': session['user_id']})
|
||||
|
||||
abort(400, reason)
|
||||
|
||||
|
||||
def init_app(app):
|
||||
for key, value in app.config.items():
|
||||
if key in os.environ:
|
||||
|
||||
50
app/its_dangerous_session.py
Normal file
50
app/its_dangerous_session.py
Normal file
@@ -0,0 +1,50 @@
|
||||
from werkzeug.datastructures import CallbackDict
|
||||
from flask.sessions import SessionInterface, SessionMixin
|
||||
from itsdangerous import URLSafeTimedSerializer, BadSignature
|
||||
|
||||
|
||||
class ItsdangerousSession(CallbackDict, SessionMixin):
|
||||
|
||||
def __init__(self, initial=None):
|
||||
def on_update(self):
|
||||
self.modified = True
|
||||
CallbackDict.__init__(self, initial, on_update)
|
||||
self.modified = False
|
||||
|
||||
|
||||
class ItsdangerousSessionInterface(SessionInterface):
|
||||
session_class = ItsdangerousSession
|
||||
|
||||
def get_serializer(self, app):
|
||||
salt = app.config.get('DANGEROUS_SALT')
|
||||
if not app.secret_key:
|
||||
return None
|
||||
return URLSafeTimedSerializer(app.secret_key,
|
||||
salt=salt)
|
||||
|
||||
def open_session(self, app, request):
|
||||
s = self.get_serializer(app)
|
||||
if s is None:
|
||||
return None
|
||||
val = request.cookies.get(app.session_cookie_name)
|
||||
if not val:
|
||||
return self.session_class()
|
||||
max_age = app.permanent_session_lifetime.total_seconds()
|
||||
try:
|
||||
data = s.loads(val, max_age=max_age)
|
||||
return self.session_class(data)
|
||||
except BadSignature:
|
||||
return self.session_class()
|
||||
|
||||
def save_session(self, app, session, response):
|
||||
domain = self.get_cookie_domain(app)
|
||||
if not session:
|
||||
if session.modified:
|
||||
response.delete_cookie(app.session_cookie_name,
|
||||
domain=domain)
|
||||
return
|
||||
expires = self.get_expiration_time(app, session)
|
||||
val = self.get_serializer(app).dumps(dict(session))
|
||||
response.set_cookie(app.session_cookie_name, val,
|
||||
expires=expires, httponly=True,
|
||||
domain=domain)
|
||||
@@ -3,4 +3,4 @@ from flask import Blueprint
|
||||
main = Blueprint('main', __name__)
|
||||
|
||||
|
||||
from app.main.views import index
|
||||
from app.main.views import index, sign_in
|
||||
|
||||
@@ -1,15 +1,32 @@
|
||||
from app import db
|
||||
from app.models import Users
|
||||
from app import db, login_manager
|
||||
from app.models import User
|
||||
from app.main.encryption import hashpw
|
||||
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(user_id):
|
||||
return get_user_by_id(user_id)
|
||||
|
||||
|
||||
def insert_user(user):
|
||||
user.password = hashpw(user.password)
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def get_user_by_id(id):
|
||||
return Users.query.filter_by(id=id).first()
|
||||
return User.query.filter_by(id=id).first()
|
||||
|
||||
|
||||
def get_all_users():
|
||||
return Users.query.all()
|
||||
return User.query.all()
|
||||
|
||||
|
||||
def get_user_by_email(email_address):
|
||||
return User.query.filter_by(email_address=email_address).first()
|
||||
|
||||
|
||||
def increment_failed_login_count(id):
|
||||
user = User.query.filter_by(id=id).first()
|
||||
user.failed_login_count += 1
|
||||
db.session.commit()
|
||||
|
||||
9
app/main/encryption.py
Normal file
9
app/main/encryption.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from flask.ext.bcrypt import generate_password_hash, check_password_hash
|
||||
|
||||
|
||||
def hashpw(password):
|
||||
return generate_password_hash(password.encode('UTF-8'), 10)
|
||||
|
||||
|
||||
def checkpw(password, hashed_password):
|
||||
return check_password_hash(hashed_password, password)
|
||||
14
app/main/forms.py
Normal file
14
app/main/forms.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from flask_wtf import Form
|
||||
from wtforms import StringField, PasswordField
|
||||
from wtforms.validators import DataRequired, Email, Length
|
||||
|
||||
|
||||
class LoginForm(Form):
|
||||
email_address = StringField('Email address', validators=[
|
||||
Length(min=5, max=255),
|
||||
DataRequired(message='Email cannot be empty'),
|
||||
Email(message='Please enter a valid email address')
|
||||
])
|
||||
password = PasswordField('Password', validators=[
|
||||
DataRequired(message='Please enter your password')
|
||||
])
|
||||
@@ -1,4 +1,5 @@
|
||||
from flask import render_template
|
||||
from flask_login import login_required
|
||||
|
||||
from app.main import main
|
||||
|
||||
@@ -19,41 +20,43 @@ def helloworld():
|
||||
|
||||
|
||||
@main.route("/register")
|
||||
@login_required
|
||||
def register():
|
||||
return render_template('register.html')
|
||||
|
||||
|
||||
@main.route("/register-from-invite")
|
||||
@login_required
|
||||
def registerfrominvite():
|
||||
return render_template('register-from-invite.html')
|
||||
|
||||
|
||||
@main.route("/verify")
|
||||
@login_required
|
||||
def verify():
|
||||
return render_template('verify.html')
|
||||
|
||||
|
||||
@main.route("/verify-mobile")
|
||||
@login_required
|
||||
def verifymobile():
|
||||
return render_template('verify-mobile.html')
|
||||
|
||||
|
||||
@main.route("/dashboard")
|
||||
@login_required
|
||||
def dashboard():
|
||||
return render_template('dashboard.html')
|
||||
|
||||
|
||||
@main.route("/sign-in")
|
||||
def signin():
|
||||
return render_template('signin.html')
|
||||
|
||||
|
||||
@main.route("/add-service")
|
||||
@login_required
|
||||
def addservice():
|
||||
return render_template('add-service.html')
|
||||
|
||||
|
||||
@main.route("/two-factor")
|
||||
@login_required
|
||||
def twofactor():
|
||||
return render_template('two-factor.html')
|
||||
|
||||
|
||||
59
app/main/views/sign_in.py
Normal file
59
app/main/views/sign_in.py
Normal file
@@ -0,0 +1,59 @@
|
||||
from datetime import datetime
|
||||
|
||||
from flask import render_template, redirect, jsonify
|
||||
from flask_login import login_user
|
||||
|
||||
from app.main import main
|
||||
from app.main.forms import LoginForm
|
||||
from app.main.dao import users_dao
|
||||
from app.models import User
|
||||
from app.main.encryption import checkpw
|
||||
|
||||
|
||||
@main.route("/sign-in", methods=(['GET']))
|
||||
def render_sign_in():
|
||||
return render_template('signin.html', form=LoginForm())
|
||||
|
||||
|
||||
@main.route('/sign-in', methods=(['POST']))
|
||||
def process_sign_in():
|
||||
form = LoginForm()
|
||||
if form.validate_on_submit():
|
||||
user = users_dao.get_user_by_email(form.email_address.data)
|
||||
if user.is_locked():
|
||||
return jsonify(locked_out=True), 401
|
||||
if not user.is_active():
|
||||
return jsonify(active_user=False), 401
|
||||
if user is None:
|
||||
return jsonify(authorization=False), 401
|
||||
if checkpw(form.password.data, user.password):
|
||||
login_user(user)
|
||||
else:
|
||||
users_dao.increment_failed_login_count(user.id)
|
||||
return jsonify(authorization=False), 401
|
||||
else:
|
||||
return jsonify(form.errors), 400
|
||||
return redirect('/two-factor')
|
||||
|
||||
|
||||
@main.route('/temp-create-users', methods=(['GET']))
|
||||
def render_create_user():
|
||||
return render_template('temp-create-users.html', form=LoginForm())
|
||||
|
||||
|
||||
@main.route('/temp-create-users', methods=(['POST']))
|
||||
def create_user_for_test():
|
||||
form = LoginForm()
|
||||
if form.validate_on_submit():
|
||||
user = User(email_address=form.email_address.data,
|
||||
name=form.email_address.data,
|
||||
password=form.password.data,
|
||||
created_at=datetime.now(),
|
||||
mobile_number='+447651234534',
|
||||
role_id=1)
|
||||
users_dao.insert_user(user)
|
||||
|
||||
return redirect('/sign-in')
|
||||
else:
|
||||
print(form.errors)
|
||||
return redirect(form.errors), 400
|
||||
@@ -12,7 +12,7 @@ class Roles(db.Model):
|
||||
role = db.Column(db.String, nullable=False, unique=True)
|
||||
|
||||
|
||||
class Users(db.Model):
|
||||
class User(db.Model):
|
||||
__tablename__ = 'users'
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
@@ -43,6 +43,27 @@ class Users(db.Model):
|
||||
|
||||
return filter_null_value_fields(serialized)
|
||||
|
||||
def is_authenticated(self):
|
||||
return True
|
||||
|
||||
def is_active(self):
|
||||
if self.state == 'inactive':
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def is_anonymous(self):
|
||||
return False
|
||||
|
||||
def get_id(self):
|
||||
return self.id
|
||||
|
||||
def is_locked(self):
|
||||
if self.failed_login_count < current_app.config['MAX_FAILED_LOGIN_COUNT']:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def filter_null_value_fields(obj):
|
||||
return dict(
|
||||
|
||||
17
app/proxy_fix.py
Normal file
17
app/proxy_fix.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from werkzeug.contrib.fixers import ProxyFix
|
||||
|
||||
|
||||
class CustomProxyFix(object):
|
||||
def __init__(self, app, forwarded_proto):
|
||||
self.app = ProxyFix(app)
|
||||
self.forwarded_proto = forwarded_proto
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
environ.update({
|
||||
"HTTP_X_FORWARDED_PROTO": self.forwarded_proto
|
||||
})
|
||||
return self.app(environ, start_response)
|
||||
|
||||
|
||||
def init_app(app):
|
||||
app.wsgi_app = CustomProxyFix(app.wsgi_app, app.config.get('HTTP_PROTOCOL', 'http'))
|
||||
@@ -1,7 +1,7 @@
|
||||
{% extends "admin_template.html" %}
|
||||
|
||||
{% block page_title %}
|
||||
Hello world!
|
||||
Sign in
|
||||
{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
@@ -12,19 +12,24 @@ Hello world!
|
||||
|
||||
<p>If you do not have an account, you can <a href="register">register</a>.</p>
|
||||
|
||||
<p>
|
||||
<label class="form-label" for="email">Email address</label>
|
||||
<input class="form-control-2-3" id="email" type="text"><br>
|
||||
</p>
|
||||
<p>
|
||||
<label class="form-label" for="password">Password</label>
|
||||
<input class="form-control-1-4" id="password" type="password"><br>
|
||||
<span class="font-xsmall"><a href="forgot-password">Forgotten password?</a></span>
|
||||
</p>
|
||||
<form autocomplete="off" action="" method="post">
|
||||
{{ form.hidden_tag() }}
|
||||
<p>
|
||||
<label class="form-label">Email address</label>
|
||||
{{ form.email_address(class="form-control-2-3", autocomplete="off") }} <br>
|
||||
</p>
|
||||
<p>
|
||||
<label class="form-label">Password</label>
|
||||
{{ form.password(class="form-control-1-4", autocomplete="off") }} <br>
|
||||
</p>
|
||||
<p>
|
||||
<span class="font-xsmall"><a href="">Forgotten password?</a></span>
|
||||
</p>
|
||||
|
||||
<p>
|
||||
<a class="button" href="two-factor" role="button">Continue</a>
|
||||
</p>
|
||||
<p>
|
||||
<button class="button" href="two-factor" role="button">Continue</button>
|
||||
</p>
|
||||
</form>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
|
||||
33
app/templates/temp-create-users.html
Normal file
33
app/templates/temp-create-users.html
Normal file
@@ -0,0 +1,33 @@
|
||||
{% extends "admin_template.html" %}
|
||||
|
||||
{% block page_title %}
|
||||
Temp create users
|
||||
{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
|
||||
<div class="grid-row">
|
||||
<div class="column-two-thirds">
|
||||
<h1 class="heading-xlarge">Temporary page to create user</h1>
|
||||
|
||||
<p>This is a temporary page to create users, the name will be the same as the email address.</p>
|
||||
|
||||
<form autocomplete="off" action="" method="post">
|
||||
{{ form.hidden_tag() }}
|
||||
<p>
|
||||
<label class="form-label">Email address</label>
|
||||
{{ form.email_address(class="form-control-2-3", autocomplete="off") }} <br>
|
||||
</p>
|
||||
<p>
|
||||
<label class="form-label">Password</label>
|
||||
{{ form.password(class="form-control-1-4", autocomplete="off") }} <br>
|
||||
</p>
|
||||
|
||||
<p>
|
||||
<button class="button" href="sign-in" role="button">Create User</button>
|
||||
</p>
|
||||
</form>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{% endblock %}
|
||||
11
config.py
11
config.py
@@ -9,6 +9,12 @@ class Config(object):
|
||||
SQLALCHEMY_RECORD_QUERIES = True
|
||||
SQLALCHEMY_DATABASE_URI = 'postgresql://localhost/notifications_admin'
|
||||
MAX_FAILED_LOGIN_COUNT = 10
|
||||
PASS_SECRET_KEY = 'secret-key-unique-changeme'
|
||||
|
||||
WTF_CSRF_ENABLED = True
|
||||
SECRET_KEY = 'secret-key'
|
||||
HTTP_PROTOCOL = 'http'
|
||||
DANGEROUS_SALT = 'itsdangeroussalt'
|
||||
|
||||
|
||||
class Development(Config):
|
||||
@@ -18,8 +24,13 @@ class Development(Config):
|
||||
class Test(Config):
|
||||
DEBUG = False
|
||||
SQLALCHEMY_DATABASE_URI = 'postgresql://localhost/test_notifications_admin'
|
||||
WTF_CSRF_ENABLED = False
|
||||
|
||||
|
||||
class Live(Config):
|
||||
DEBUG = False
|
||||
HTTP_PROTOCOL = 'https'
|
||||
|
||||
configs = {
|
||||
'development': Development,
|
||||
'test': Test
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: create_users
|
||||
Revision ID: 10_create_users
|
||||
Revises: None
|
||||
Create Date: 2015-11-24 10:39:19.827534
|
||||
|
||||
@@ -13,6 +13,7 @@ down_revision = None
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table('roles',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
|
||||
14
migrations/versions/20_initialise_data.py
Normal file
14
migrations/versions/20_initialise_data.py
Normal file
@@ -0,0 +1,14 @@
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '20_initialise_data'
|
||||
down_revision = '10_create_users'
|
||||
from app.models import Roles
|
||||
from alembic import op
|
||||
|
||||
def upgrade():
|
||||
op.execute("insert into roles(role) values('platform_admin')")
|
||||
op.execute("insert into roles(role) values('service_user')")
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('users')
|
||||
op.drop_table('roles')
|
||||
@@ -6,3 +6,6 @@ Flask-SQLAlchemy==2.0
|
||||
psycopg2==2.6.1
|
||||
SQLAlchemy==1.0.5
|
||||
SQLAlchemy-Utils==0.30.5
|
||||
Flask-WTF==0.11
|
||||
Flask-Login==0.2.11
|
||||
Flask-Bcrypt==0.6.2
|
||||
@@ -1,32 +0,0 @@
|
||||
from datetime import datetime
|
||||
|
||||
from app.main.dao import users_dao
|
||||
from app.models import Users
|
||||
|
||||
|
||||
def test_get_all_users_returns_all_users(notifications_admin, notifications_admin_db):
|
||||
user1 = Users(name='test one',
|
||||
password='somepassword',
|
||||
email_address='test1@get_all.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
user2 = Users(name='test two',
|
||||
password='some2ndpassword',
|
||||
email_address='test2@get_all.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
user3 = Users(name='test three',
|
||||
password='some2ndpassword',
|
||||
email_address='test2@get_all.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
|
||||
users_dao.insert_user(user1)
|
||||
users_dao.insert_user(user2)
|
||||
users_dao.insert_user(user3)
|
||||
users = users_dao.get_all_users()
|
||||
assert len(users) == 3
|
||||
assert users == [user1, user2, user3]
|
||||
@@ -13,9 +13,12 @@ def test_insert_role_should_be_able_to_get_role(notifications_admin, notificatio
|
||||
assert saved_role == role
|
||||
|
||||
|
||||
def test_insert_role_will_throw_error_if_role_already_exists():
|
||||
def test_insert_role_will_throw_error_if_role_already_exists(notifications_admin, notifications_admin_db):
|
||||
role1 = roles_dao.get_role_by_id(1)
|
||||
assert role1.id == 1
|
||||
|
||||
role = Roles(id=1, role='cannot create a duplicate')
|
||||
|
||||
with pytest.raises(sqlalchemy.exc.IntegrityError) as error:
|
||||
with pytest.raises(sqlalchemy.orm.exc.FlushError) as error:
|
||||
roles_dao.insert_role(role)
|
||||
assert 'duplicate key value violates unique constraint "roles_pkey"' in str(error.value)
|
||||
assert 'conflicts with persistent instance' in str(error.value)
|
||||
|
||||
@@ -3,17 +3,17 @@ from datetime import datetime
|
||||
import pytest
|
||||
import sqlalchemy
|
||||
|
||||
from app.models import Users
|
||||
from app.models import User
|
||||
from app.main.dao import users_dao
|
||||
|
||||
|
||||
def test_insert_user_should_add_user(notifications_admin, notifications_admin_db):
|
||||
user = Users(name='test insert',
|
||||
password='somepassword',
|
||||
email_address='test@insert.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
user = User(name='test insert',
|
||||
password='somepassword',
|
||||
email_address='test@insert.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
|
||||
users_dao.insert_user(user)
|
||||
saved_user = users_dao.get_user_by_id(user.id)
|
||||
@@ -21,12 +21,101 @@ def test_insert_user_should_add_user(notifications_admin, notifications_admin_db
|
||||
|
||||
|
||||
def test_insert_user_with_role_that_does_not_exist_fails(notifications_admin, notifications_admin_db):
|
||||
user = Users(name='role does not exist',
|
||||
password='somepassword',
|
||||
email_address='test@insert.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=100)
|
||||
user = User(name='role does not exist',
|
||||
password='somepassword',
|
||||
email_address='test@insert.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=100)
|
||||
with pytest.raises(sqlalchemy.exc.IntegrityError) as error:
|
||||
users_dao.insert_user(user)
|
||||
assert 'insert or update on table "users" violates foreign key constraint "users_role_id_fkey"' in str(error.value)
|
||||
|
||||
|
||||
def test_get_user_by_email(notifications_admin, notifications_admin_db):
|
||||
user = User(name='test_get_by_email',
|
||||
password='somepassword',
|
||||
email_address='email@example.gov.uk',
|
||||
mobile_number='+441234153412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
|
||||
users_dao.insert_user(user)
|
||||
retrieved = users_dao.get_user_by_email(user.email_address)
|
||||
assert retrieved == user
|
||||
|
||||
|
||||
def test_get_all_users_returns_all_users(notifications_admin, notifications_admin_db):
|
||||
user1 = User(name='test one',
|
||||
password='somepassword',
|
||||
email_address='test1@get_all.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
user2 = User(name='test two',
|
||||
password='some2ndpassword',
|
||||
email_address='test2@get_all.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
user3 = User(name='test three',
|
||||
password='some2ndpassword',
|
||||
email_address='test2@get_all.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
|
||||
users_dao.insert_user(user1)
|
||||
users_dao.insert_user(user2)
|
||||
users_dao.insert_user(user3)
|
||||
users = users_dao.get_all_users()
|
||||
assert len(users) == 3
|
||||
assert users == [user1, user2, user3]
|
||||
|
||||
|
||||
def test_increment_failed_lockout_count_should_increade_count_by_1(notifications_admin, notifications_admin_db):
|
||||
user = User(name='cannot remember password',
|
||||
password='somepassword',
|
||||
email_address='test1@get_all.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
users_dao.insert_user(user)
|
||||
|
||||
savedUser = users_dao.get_user_by_id(user.id)
|
||||
assert savedUser.failed_login_count == 0
|
||||
users_dao.increment_failed_login_count(user.id)
|
||||
assert users_dao.get_user_by_id(user.id).failed_login_count == 1
|
||||
|
||||
|
||||
def test_user_is_locked_if_failed_login_count_is_10_or_greater(notifications_admin, notifications_admin_db):
|
||||
user = User(name='cannot remember password',
|
||||
password='somepassword',
|
||||
email_address='test1@get_all.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
users_dao.insert_user(user)
|
||||
saved_user = users_dao.get_user_by_id(user.id)
|
||||
assert saved_user.is_locked() is False
|
||||
|
||||
for _ in range(10):
|
||||
users_dao.increment_failed_login_count(user.id)
|
||||
|
||||
saved_user = users_dao.get_user_by_id(user.id)
|
||||
assert saved_user.failed_login_count == 10
|
||||
assert saved_user.is_locked() is True
|
||||
|
||||
|
||||
def test_user_is_active_is_false_if_state_is_inactive(notifications_admin, notifications_admin_db):
|
||||
user = User(name='inactive user',
|
||||
password='somepassword',
|
||||
email_address='test1@get_all.gov.uk',
|
||||
mobile_number='+441234123412',
|
||||
created_at=datetime.now(),
|
||||
role_id=1,
|
||||
state='inactive')
|
||||
users_dao.insert_user(user)
|
||||
|
||||
saved_user = users_dao.get_user_by_id(user.id)
|
||||
assert saved_user.is_active() is False
|
||||
|
||||
17
tests/app/main/test_encyption.py
Normal file
17
tests/app/main/test_encyption.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from app.main.encryption import hashpw, checkpw
|
||||
|
||||
|
||||
def test_should_hash_password():
|
||||
password = 'passwordToHash'
|
||||
assert password != hashpw(password)
|
||||
|
||||
|
||||
def test_should_check_password():
|
||||
value = 's3curePassword!'
|
||||
encrypted = hashpw(value)
|
||||
assert checkpw(value, encrypted) is True
|
||||
|
||||
|
||||
def test_checkpw_should_fail_when_pw_does_not_match():
|
||||
value = hashpw('somePassword')
|
||||
assert checkpw('somethingDifferent', value) is False
|
||||
0
tests/app/main/views/__init__.py
Normal file
0
tests/app/main/views/__init__.py
Normal file
81
tests/app/main/views/test_sign_in.py
Normal file
81
tests/app/main/views/test_sign_in.py
Normal file
@@ -0,0 +1,81 @@
|
||||
from datetime import datetime
|
||||
|
||||
from app.main.dao import users_dao
|
||||
from app.models import User
|
||||
|
||||
|
||||
def test_render_sign_in_returns_sign_in_template(notifications_admin):
|
||||
response = notifications_admin.test_client().get('/sign-in')
|
||||
assert response.status_code == 200
|
||||
assert 'Sign in' in response.get_data(as_text=True)
|
||||
assert 'Email address' in response.get_data(as_text=True)
|
||||
assert 'Password' in response.get_data(as_text=True)
|
||||
assert 'Forgotten password?' in response.get_data(as_text=True)
|
||||
|
||||
|
||||
def test_process_sign_in_return_2fa_template(notifications_admin, notifications_admin_db):
|
||||
user = User(email_address='valid@example.gov.uk',
|
||||
password='val1dPassw0rd!',
|
||||
mobile_number='+441234123123',
|
||||
name='valid',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
users_dao.insert_user(user)
|
||||
response = notifications_admin.test_client().post('/sign-in',
|
||||
data={'email_address': 'valid@example.gov.uk',
|
||||
'password': 'val1dPassw0rd!'})
|
||||
assert response.status_code == 302
|
||||
assert response.location == 'http://localhost/two-factor'
|
||||
|
||||
|
||||
def test_temp_create_user(notifications_admin, notifications_admin_db):
|
||||
response = notifications_admin.test_client().post('/temp-create-users',
|
||||
data={'email_address': 'testing@example.gov.uk',
|
||||
'password': 'val1dPassw0rd!'})
|
||||
|
||||
assert response.status_code == 302
|
||||
|
||||
|
||||
def test_should_return_locked_out_true_when_user_is_locked(notifications_admin, notifications_admin_db):
|
||||
user = User(email_address='valid@example.gov.uk',
|
||||
password='val1dPassw0rd!',
|
||||
mobile_number='+441234123123',
|
||||
name='valid',
|
||||
created_at=datetime.now(),
|
||||
role_id=1)
|
||||
users_dao.insert_user(user)
|
||||
for _ in range(10):
|
||||
notifications_admin.test_client().post('/sign-in',
|
||||
data={'email_address': 'valid@example.gov.uk',
|
||||
'password': 'whatIsMyPassword!'})
|
||||
|
||||
response = notifications_admin.test_client().post('/sign-in',
|
||||
data={'email_address': 'valid@example.gov.uk',
|
||||
'password': 'val1dPassw0rd!'})
|
||||
|
||||
assert response.status_code == 401
|
||||
assert '"locked_out": true' in response.get_data(as_text=True)
|
||||
|
||||
another_bad_attempt = notifications_admin.test_client().post('/sign-in',
|
||||
data={'email_address': 'valid@example.gov.uk',
|
||||
'password': 'whatIsMyPassword!'})
|
||||
assert another_bad_attempt.status_code == 401
|
||||
assert '"locked_out": true' in response.get_data(as_text=True)
|
||||
|
||||
|
||||
def test_should_return_active_user_is_false_if_user_is_inactive(notifications_admin, notifications_admin_db):
|
||||
user = User(email_address='inactive_user@example.gov.uk',
|
||||
password='val1dPassw0rd!',
|
||||
mobile_number='+441234123123',
|
||||
name='inactive user',
|
||||
created_at=datetime.now(),
|
||||
role_id=1,
|
||||
state='inactive')
|
||||
users_dao.insert_user(user)
|
||||
|
||||
response = notifications_admin.test_client().post('/sign-in',
|
||||
data={'email_address': 'inactive_user@example.gov.uk',
|
||||
'password': 'val1dPassw0rd!'})
|
||||
|
||||
assert response.status_code == 401
|
||||
assert '"active_user": false' in response.get_data(as_text=True)
|
||||
@@ -5,7 +5,7 @@ from app import create_app, db
|
||||
from app.models import Roles
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
@pytest.fixture(scope='function')
|
||||
def notifications_admin(request):
|
||||
app = create_app('test')
|
||||
ctx = app.app_context()
|
||||
@@ -18,7 +18,7 @@ def notifications_admin(request):
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
@pytest.fixture(scope='function')
|
||||
def notifications_admin_db(notifications_admin, request):
|
||||
metadata = MetaData(db.engine)
|
||||
metadata.reflect()
|
||||
|
||||
Reference in New Issue
Block a user