All tests working, second time around.

This commit is contained in:
Nicholas Staples
2016-01-21 17:29:24 +00:00
parent 439127e897
commit 6b035cd324
9 changed files with 453 additions and 89 deletions

View File

@@ -1,9 +1,15 @@
from datetime import datetime import random
from datetime import (datetime, timedelta)
from . import DAOException from . import DAOException
from sqlalchemy.orm import load_only from sqlalchemy.orm import load_only
from app import db from app import db
from app.models import User from app.models import (User, VerifyCode)
from app.encryption import hashpw
def create_secret_code():
return ''.join(map(str, random.sample(range(9), 5)))
def save_model_user(usr, update_dict={}): def save_model_user(usr, update_dict={}):
@@ -16,6 +22,37 @@ def save_model_user(usr, update_dict={}):
db.session.commit() db.session.commit()
def create_user_code(user, code, code_type):
verify_code = VerifyCode(code_type=code_type,
expiry_datetime=datetime.now() + timedelta(hours=1),
user=user)
verify_code.code = code
db.session.add(verify_code)
db.session.commit()
return verify_code
def get_user_code(user, code, code_type):
# Get the most recent codes to try and reduce the
# time searching for the correct code.
codes = VerifyCode.query.filter_by(
user=user, code_type=code_type).order_by(
VerifyCode.created_at.desc())
retval = None
for x in codes:
if x.check_code(code):
retval = x
break
return retval
def use_user_code(id):
verify_code = VerifyCode.query.get(id)
verify_code.code_used = True
db.session.add(verify_code)
db.session.commit()
def delete_model_user(user): def delete_model_user(user):
db.session.delete(user) db.session.delete(user)
db.session.commit() db.session.commit()

View File

@@ -146,3 +146,34 @@ class Job(db.Model):
unique=False, unique=False,
nullable=True, nullable=True,
onupdate=datetime.datetime.now) onupdate=datetime.datetime.now)
class VerifyCode(db.Model):
__tablename__ = 'verify_codes'
code_types = ['email', 'sms']
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), index=True, nullable=False)
user = db.relationship('User', backref=db.backref('verify_codes', lazy='dynamic'))
_code = db.Column(db.String, nullable=False)
code_type = db.Column(db.Enum(*code_types, name='verify_code_types'), index=False, unique=False, nullable=False)
expiry_datetime = db.Column(db.DateTime, nullable=False)
code_used = db.Column(db.Boolean, default=False)
created_at = db.Column(
db.DateTime,
index=False,
unique=False,
nullable=False,
default=datetime.datetime.now)
@property
def code(self):
raise AttributeError("Code not readable")
@code.setter
def code(self, cde):
self._code = hashpw(cde)
def check_code(self, cde):
return check_hash(cde, self._code)

View File

@@ -10,7 +10,9 @@ from . import models
class UserSchema(ma.ModelSchema): class UserSchema(ma.ModelSchema):
class Meta: class Meta:
model = models.User model = models.User
exclude = ("updated_at", "created_at", "user_to_service", "_password") exclude = (
"updated_at", "created_at", "user_to_service",
"_password", "verify_codes")
# TODO process users list, to return a list of user.id # TODO process users list, to return a list of user.id
@@ -38,6 +40,12 @@ class JobSchema(ma.ModelSchema):
model = models.Job model = models.Job
class VerifyCodeSchema(ma.ModelSchema):
class Meta:
model = models.VerifyCode
exclude = ('user', "_code", "expiry_datetime", "code_used", "created_at")
user_schema = UserSchema() user_schema = UserSchema()
users_schema = UserSchema(many=True) users_schema = UserSchema(many=True)
service_schema = ServiceSchema() service_schema = ServiceSchema()
@@ -48,3 +56,4 @@ api_key_schema = ApiKeySchema()
api_keys_schema = ApiKeySchema(many=True) api_keys_schema = ApiKeySchema(many=True)
job_schema = JobSchema() job_schema = JobSchema()
jobs_schema = JobSchema(many=True) jobs_schema = JobSchema(many=True)
verify_code_schema = VerifyCodeSchema()

View File

@@ -1,15 +1,19 @@
from datetime import datetime
from flask import (jsonify, request) from flask import (jsonify, request)
from sqlalchemy.exc import DataError from sqlalchemy.exc import DataError
from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.orm.exc import NoResultFound
from app.dao.services_dao import get_model_services from app.dao.services_dao import get_model_services
from app.dao.users_dao import ( from app.dao.users_dao import (
get_model_users, save_model_user, delete_model_user) get_model_users, save_model_user, delete_model_user,
create_user_code, get_user_code, use_user_code,
create_secret_code)
from app.schemas import ( from app.schemas import (
user_schema, users_schema, service_schema, services_schema) user_schema, users_schema, service_schema, services_schema,
verify_code_schema)
from app import db from app import db
from flask import Blueprint from flask import Blueprint
user = Blueprint('user', __name__) user = Blueprint('user', __name__)
@@ -57,19 +61,81 @@ def verify_user_password(user_id):
return jsonify(result="error", message="Invalid user id"), 400 return jsonify(result="error", message="Invalid user id"), 400
except NoResultFound: except NoResultFound:
return jsonify(result="error", message="User not found"), 404 return jsonify(result="error", message="User not found"), 404
text_pwd = None txt_pwd = None
try: try:
text_pwd = request.get_json()['password'] txt_pwd = request.get_json()['password']
except KeyError: except KeyError:
return jsonify( return jsonify(
result="error", result="error",
message={'password': ['Required field missing data']}), 400 message={'password': ['Required field missing data']}), 400
if user.check_password(text_pwd): if user.check_password(txt_pwd):
return jsonify(), 204 return jsonify(), 204
else: else:
return jsonify(result='error', message={'password': ['Incorrect password']}), 400 return jsonify(result='error', message={'password': ['Incorrect password']}), 400
@user.route('/<int:user_id>/verify/code', methods=['POST'])
def verify_user_code(user_id):
try:
user = get_model_users(user_id=user_id)
except DataError:
return jsonify(result="error", message="Invalid user id"), 400
except NoResultFound:
return jsonify(result="error", message="User not found"), 404
txt_code = None
resp_json = request.get_json()
txt_type = None
errors = {}
try:
txt_code = resp_json['code']
except KeyError:
errors.update({'code': ['Required field missing data']})
try:
txt_type = resp_json['code_type']
except KeyError:
errors.update({'code_type': ['Required field missing data']})
if errors:
return jsonify(result="error", message=errors), 400
code = get_user_code(user, txt_code, txt_type)
if not code:
return jsonify(result="error", message="Code not found"), 404
if datetime.now() > code.expiry_datetime or code.code_used:
return jsonify(result="error", message="Code has expired"), 400
use_user_code(code.id)
return jsonify(), 204
@user.route('/<int:user_id>/code/', methods=['POST'])
def send_user_code(user_id):
try:
user = get_model_users(user_id=user_id)
except DataError:
return jsonify(result="error", message="Invalid user id"), 400
except NoResultFound:
return jsonify(result="error", message="User not found"), 404
text_pwd = None
verify_code, errors = verify_code_schema.load(request.get_json())
if errors:
return jsonify(result="error", message=errors), 400
code = create_user_code(
user, create_secret_code(), verify_code.code_type)
# TODO this will need to fixed up when we stop using
# notify_alpha_client
if verify_code.code_type == 'sms':
notify_alpha_client.send_sms(
mobile_number=user.mobile_number,
message=code.code)
elif verify_code.code_type == 'email':
notify_alpha_client.send_email(
user.email_address,
code.code,
'notify@digital.cabinet-office.gov.uk',
'Verification code')
else:
abort(500)
return jsonify(), 204
@user.route('/<int:user_id>', methods=['GET']) @user.route('/<int:user_id>', methods=['GET'])
@user.route('/', methods=['GET']) @user.route('/', methods=['GET'])
def get_user(user_id=None): def get_user(user_id=None):

View File

@@ -0,0 +1,38 @@
"""empty message
Revision ID: 0008_add_verify_codes
Revises: 0007_change_to_api_keys
Create Date: 2016-01-21 16:59:05.818017
"""
# revision identifiers, used by Alembic.
revision = '0008_add_verify_codes'
down_revision = '0007_change_to_api_keys'
from alembic import op
import sqlalchemy as sa
def upgrade():
### commands auto generated by Alembic - please adjust! ###
op.create_table('verify_codes',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('_code', sa.String(), nullable=False),
sa.Column('code_type', sa.Enum('email', 'sms', name='verify_code_types'), nullable=False),
sa.Column('expiry_datetime', sa.DateTime(), nullable=False),
sa.Column('code_used', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_verify_codes_user_id'), 'verify_codes', ['user_id'], unique=False)
### end Alembic commands ###
def downgrade():
### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_verify_codes_user_id'), table_name='verify_codes')
op.drop_table('verify_codes')
### end Alembic commands ###

View File

@@ -13,7 +13,7 @@ itsdangerous==0.24
Flask-Bcrypt==0.6.2 Flask-Bcrypt==0.6.2
credstash==1.8.0 credstash==1.8.0
git+https://github.com/alphagov/notifications-python-client.git@0.1.9#egg=notifications-python-client==0.1.9 git+https://github.com/alphagov/notifications-python-client.git@0.2.0#egg=notifications-python-client==0.2.0
git+https://github.com/alphagov/notifications-utils.git@0.0.3#egg=notifications-utils==0.0.3 git+https://github.com/alphagov/notifications-utils.git@0.0.3#egg=notifications-utils==0.0.3

View File

@@ -1,6 +1,6 @@
import pytest import pytest
from app.models import (User, Service, Template, ApiKey, Job) from app.models import (User, Service, Template, ApiKey, Job, VerifyCode)
from app.dao.users_dao import (save_model_user) from app.dao.users_dao import (save_model_user, create_user_code, create_secret_code)
from app.dao.services_dao import save_model_service from app.dao.services_dao import save_model_service
from app.dao.templates_dao import save_model_template from app.dao.templates_dao import save_model_template
from app.dao.api_key_dao import save_model_api_key from app.dao.api_key_dao import save_model_api_key
@@ -19,9 +19,49 @@ def sample_user(notify_db,
'mobile_number': '+44 7700 900986', 'mobile_number': '+44 7700 900986',
'state': 'active' 'state': 'active'
} }
user = User(**data) usr = User.query.filter_by(email_address=email).first()
save_model_user(user) if not usr:
return user usr = User(**data)
save_model_user(usr)
return usr
def create_code(notify_db, notify_db_session, code_type, usr=None, code=None):
if code is None:
code = create_secret_code()
if usr is None:
usr = sample_user(notify_db, notify_db_session)
return create_user_code(usr, code, code_type), code
@pytest.fixture(scope='function')
def sample_email_code(notify_db,
notify_db_session,
code=None,
code_type="email",
usr=None):
code, txt_code = create_code(notify_db,
notify_db_session,
code_type,
usr=usr,
code=code)
code.txt_code = txt_code
return code
@pytest.fixture(scope='function')
def sample_sms_code(notify_db,
notify_db_session,
code=None,
code_type="sms",
usr=None):
code, txt_code = create_code(notify_db,
notify_db_session,
code_type,
usr=usr,
code=code)
code.txt_code = txt_code
return code
@pytest.fixture(scope='function') @pytest.fixture(scope='function')

View File

@@ -359,76 +359,3 @@ def test_delete_user_not_exists(notify_api, notify_db, notify_db_session, sample
headers=[('Content-Type', 'application/json'), auth_header]) headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 404 assert resp.status_code == 404
assert User.query.count() == 2 assert User.query.count() == 2
def test_user_verify_password(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_admin_service_id):
"""
Tests POST endpoint '/<user_id>/verify/password'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps({'password': 'password'})
auth_header = create_authorization_header(
service_id=sample_admin_service_id,
path=url_for('user.verify_user_password', user_id=sample_user.id),
method='POST',
request_body=data)
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
def test_user_verify_password_invalid_password(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_admin_service_id):
"""
Tests POST endpoint '/<user_id>/verify/password' invalid endpoint.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps({'password': 'bad password'})
auth_header = create_authorization_header(
service_id=sample_admin_service_id,
path=url_for('user.verify_user_password', user_id=sample_user.id),
method='POST',
request_body=data)
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
json_resp = json.loads(resp.get_data(as_text=True))
assert 'Incorrect password' in json_resp['message']['password']
def test_user_verify_password_missing_password(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_admin_service_id):
"""
Tests POST endpoint '/<user_id>/verify/password' missing password.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps({'bingo': 'bongo'})
auth_header = create_authorization_header(
service_id=sample_admin_service_id,
path=url_for('user.verify_user_password', user_id=sample_user.id),
method='POST',
request_body=data)
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
json_resp = json.loads(resp.get_data(as_text=True))
assert 'Required field missing data' in json_resp['message']['password']

View File

@@ -0,0 +1,216 @@
import json
from datetime import (datetime, timedelta)
from flask import url_for
from app.models import (User, Service, VerifyCode)
from app import db
from tests import create_authorization_header
def test_user_verify_code_sms(notify_api,
notify_db,
notify_db_session,
sample_admin_service_id,
sample_sms_code):
"""
Tests POST endpoint '/<user_id>/verify/code'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert not VerifyCode.query.first().code_used
data = json.dumps({
'code_type': sample_sms_code.code_type,
'code': sample_sms_code.txt_code})
auth_header = create_authorization_header(
service_id=sample_admin_service_id,
path=url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
method='POST',
request_body=data)
resp = client.post(
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
assert VerifyCode.query.first().code_used
def test_user_verify_code_sms_missing_code(notify_api,
notify_db,
notify_db_session,
sample_admin_service_id,
sample_sms_code):
"""
Tests POST endpoint '/<user_id>/verify/code'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert not VerifyCode.query.first().code_used
data = json.dumps({'code_type': sample_sms_code.code_type})
auth_header = create_authorization_header(
service_id=sample_admin_service_id,
path=url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
method='POST',
request_body=data)
resp = client.post(
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
assert not VerifyCode.query.first().code_used
def test_user_verify_code_email(notify_api,
notify_db,
notify_db_session,
sample_admin_service_id,
sample_email_code):
"""
Tests POST endpoint '/<user_id>/verify/code'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert not VerifyCode.query.first().code_used
data = json.dumps({
'code_type': sample_email_code.code_type,
'code': sample_email_code.txt_code})
auth_header = create_authorization_header(
service_id=sample_admin_service_id,
path=url_for('user.verify_user_code', user_id=sample_email_code.user.id),
method='POST',
request_body=data)
resp = client.post(
url_for('user.verify_user_code', user_id=sample_email_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
assert VerifyCode.query.first().code_used
def test_user_verify_code_email_bad_code(notify_api,
notify_db,
notify_db_session,
sample_admin_service_id,
sample_email_code):
"""
Tests POST endpoint '/<user_id>/verify/code'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert not VerifyCode.query.first().code_used
data = json.dumps({
'code_type': sample_email_code.code_type,
'code': "blah"})
auth_header = create_authorization_header(
service_id=sample_admin_service_id,
path=url_for('user.verify_user_code', user_id=sample_email_code.user.id),
method='POST',
request_body=data)
resp = client.post(
url_for('user.verify_user_code', user_id=sample_email_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 404
assert not VerifyCode.query.first().code_used
def test_user_verify_code_email_expired_code(notify_api,
notify_db,
notify_db_session,
sample_admin_service_id,
sample_email_code):
"""
Tests POST endpoint '/<user_id>/verify/code'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert not VerifyCode.query.first().code_used
sample_email_code.expiry_datetime = (
datetime.now() - timedelta(hours=1))
db.session.add(sample_email_code)
db.session.commit()
data = json.dumps({
'code_type': sample_email_code.code_type,
'code': sample_email_code.txt_code})
auth_header = create_authorization_header(
service_id=sample_admin_service_id,
path=url_for('user.verify_user_code', user_id=sample_email_code.user.id),
method='POST',
request_body=data)
resp = client.post(
url_for('user.verify_user_code', user_id=sample_email_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
assert not VerifyCode.query.first().code_used
def test_user_verify_password(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_admin_service_id):
"""
Tests POST endpoint '/<user_id>/verify/password'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps({'password': 'password'})
auth_header = create_authorization_header(
service_id=sample_admin_service_id,
path=url_for('user.verify_user_password', user_id=sample_user.id),
method='POST',
request_body=data)
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
def test_user_verify_password_invalid_password(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_admin_service_id):
"""
Tests POST endpoint '/<user_id>/verify/password' invalid endpoint.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps({'password': 'bad password'})
auth_header = create_authorization_header(
service_id=sample_admin_service_id,
path=url_for('user.verify_user_password', user_id=sample_user.id),
method='POST',
request_body=data)
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
json_resp = json.loads(resp.get_data(as_text=True))
assert 'Incorrect password' in json_resp['message']['password']
def test_user_verify_password_missing_password(notify_api,
notify_db,
notify_db_session,
sample_user,
sample_admin_service_id):
"""
Tests POST endpoint '/<user_id>/verify/password' missing password.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps({'bingo': 'bongo'})
auth_header = create_authorization_header(
service_id=sample_admin_service_id,
path=url_for('user.verify_user_password', user_id=sample_user.id),
method='POST',
request_body=data)
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
json_resp = json.loads(resp.get_data(as_text=True))
assert 'Required field missing data' in json_resp['message']['password']