Implementation of the new_password endpoint.

Found a way to create the token that does not need to persist it to the database.
This requires proper error messages, written by people who speak menglis good.
This commit is contained in:
Rebecca Law
2016-01-07 17:13:49 +00:00
parent 8057a138a8
commit a860f713d2
17 changed files with 87 additions and 156 deletions

View File

@@ -20,6 +20,7 @@
@import '../govuk_elements/public/sass/elements/details'; @import '../govuk_elements/public/sass/elements/details';
@import '../govuk_elements/public/sass/elements/elements-typography'; @import '../govuk_elements/public/sass/elements/elements-typography';
@import '../govuk_elements/public/sass/elements/forms'; @import '../govuk_elements/public/sass/elements/forms';
@import '../govuk_elements/public/sass/elements/forms/form-validation';
@import '../govuk_elements/public/sass/elements/forms/form-block-labels'; @import '../govuk_elements/public/sass/elements/forms/form-block-labels';
@import '../govuk_elements/public/sass/elements/icons'; @import '../govuk_elements/public/sass/elements/icons';
@import '../govuk_elements/public/sass/elements/layout'; @import '../govuk_elements/public/sass/elements/layout';

View File

@@ -1,19 +0,0 @@
from datetime import datetime, timedelta
from app import db
from app.models import PasswordResetToken
def insert(token, user_id):
password_reset_token = PasswordResetToken(token=token,
user_id=user_id,
expiry_date=datetime.now() + timedelta(hours=1))
insert_token(password_reset_token)
def insert_token(password_reset_token):
db.session.add(password_reset_token)
db.session.commit()
def get_token(token):
return PasswordResetToken.query.filter_by(token=token).first()

View File

@@ -59,12 +59,13 @@ def update_mobile_number(id, mobile_number):
db.session.commit() db.session.commit()
def update_password(id, password): def update_password(email, password):
user = get_user_by_id(id) user = get_user_by_email(email)
user.password = hashpw(password) user.password = hashpw(password)
user.password_changed_at = datetime.now() user.password_changed_at = datetime.now()
db.session.add(user) db.session.add(user)
db.session.commit() db.session.commit()
return user
def find_all_email_address(): def find_all_email_address():

View File

@@ -24,7 +24,6 @@ verify_code = '^\d{5}$'
class RegisterUserForm(Form): class RegisterUserForm(Form):
def __init__(self, existing_email_addresses, existing_mobile_numbers, *args, **kwargs): def __init__(self, existing_email_addresses, existing_mobile_numbers, *args, **kwargs):
self.existing_emails = existing_email_addresses self.existing_emails = existing_email_addresses
self.existing_mobiles = existing_mobile_numbers self.existing_mobiles = existing_mobile_numbers
@@ -125,10 +124,6 @@ class AddServiceForm(Form):
class ForgotPasswordForm(Form): class ForgotPasswordForm(Form):
def __init__(self, email_addresses, *args, **kargs):
self.email_addresses = email_addresses
super(ForgotPasswordForm, self).__init__(*args, **kargs)
email_address = StringField('Email address', email_address = StringField('Email address',
validators=[Length(min=5, max=255), validators=[Length(min=5, max=255),
DataRequired(message='Email cannot be empty'), DataRequired(message='Email cannot be empty'),
@@ -136,10 +131,6 @@ class ForgotPasswordForm(Form):
Regexp(regex=gov_uk_email, message='Please enter a gov.uk email address') Regexp(regex=gov_uk_email, message='Please enter a gov.uk email address')
]) ])
def validate_email_address(self, a):
if self.email_address.data not in self.email_addresses:
raise ValidationError('Please enter the email address that you registered with')
class NewPasswordForm(Form): class NewPasswordForm(Form):
new_password = StringField('Create a password', new_password = StringField('Create a password',

View File

@@ -1,12 +1,11 @@
import traceback import traceback
import uuid
from random import randint from random import randint
from flask import url_for from flask import url_for, current_app
from app import admin_api_client from app import admin_api_client
from app.main.dao import verify_codes_dao
from app.main.exceptions import AdminApiClientException from app.main.exceptions import AdminApiClientException
from app.main.dao import verify_codes_dao, password_reset_token_dao
def create_verify_code(): def create_verify_code():
@@ -38,11 +37,9 @@ def send_email_code(user_id, email):
return email_code return email_code
def send_change_password_email(email, user_id): def send_change_password_email(email):
try: try:
reset_password_token = str(uuid.uuid4()).replace('-', '') link_to_change_password = url_for('.new_password', token=generate_token(email), _external=True)
link_to_change_password = url_for('.new_password', token=reset_password_token, _external=True)
password_reset_token_dao.insert(reset_password_token, user_id)
admin_api_client.send_email(email_address=email, admin_api_client.send_email(email_address=email,
from_str='notify@digital.cabinet-office.gov.uk', from_str='notify@digital.cabinet-office.gov.uk',
message=link_to_change_password, message=link_to_change_password,
@@ -51,3 +48,20 @@ def send_change_password_email(email, user_id):
except: except:
traceback.print_exc() traceback.print_exc()
raise AdminApiClientException('Exception when sending email.') raise AdminApiClientException('Exception when sending email.')
def generate_token(email):
from itsdangerous import TimestampSigner
signer = TimestampSigner(current_app.config['SECRET_KEY'])
return signer.sign(email).decode('utf8')
def check_token(token):
from itsdangerous import TimestampSigner, SignatureExpired
signer = TimestampSigner(current_app.config['SECRET_KEY'])
try:
email = signer.unsign(token, max_age=current_app.config['TOKEN_MAX_AGE_SECONDS'])
return email
except SignatureExpired as e:
current_app.logger.info('token expired %s' % e)
return None

View File

@@ -1,5 +1,4 @@
from flask import render_template from flask import render_template, flash
from app.main import main from app.main import main
from app.main.dao import users_dao from app.main.dao import users_dao
from app.main.forms import ForgotPasswordForm from app.main.forms import ForgotPasswordForm
@@ -8,10 +7,13 @@ from app.main.views import send_change_password_email
@main.route('/forgot-password', methods=['GET', 'POST']) @main.route('/forgot-password', methods=['GET', 'POST'])
def forgot_password(): def forgot_password():
form = ForgotPasswordForm(users_dao.find_all_email_address()) form = ForgotPasswordForm()
if form.validate_on_submit(): if form.validate_on_submit():
user = users_dao.get_user_by_email(form.email_address.data) if users_dao.get_user_by_email(form.email_address.data):
send_change_password_email(form.email_address.data, user.id) send_change_password_email(form.email_address.data)
return render_template('views/password-reset-sent.html') return render_template('views/password-reset-sent.html')
else:
flash('The email address is not recognized. Try again.')
return render_template('views/forgot-password.html', form=form)
else: else:
return render_template('views/forgot-password.html', form=form) return render_template('views/forgot-password.html', form=form)

View File

@@ -1,26 +1,26 @@
from datetime import datetime from datetime import datetime
from flask import (Markup, render_template, url_for, redirect) from flask import (render_template, url_for, redirect, flash, current_app)
from app.main import main from app.main import main
from app.main.dao import (password_reset_token_dao, users_dao) from app.main.dao import users_dao
from app.main.forms import NewPasswordForm from app.main.forms import NewPasswordForm
from app.main.views import send_sms_code from app.main.views import send_sms_code, check_token
@main.route('/new-password/<path:token>', methods=['GET', 'POST']) @main.route('/new-password/<path:token>', methods=['GET', 'POST'])
def new_password(token): def new_password(token):
form = NewPasswordForm() form = NewPasswordForm()
if form.validate_on_submit(): if form.validate_on_submit():
password_reset_token = password_reset_token_dao.get_token(str(Markup.escape(token))) email_address = check_token(token)
if not valid_token(password_reset_token): if email_address:
form.new_password.errors.append('token is invalid') # Is there a better way user = users_dao.update_password(email_address.decode('utf-8'), form.new_password.data)
return render_template('views/new-password.html', form=form)
else:
users_dao.update_password(password_reset_token.user_id, form.new_password.data)
user = users_dao.get_user_by_id(password_reset_token.user_id)
send_sms_code(user.id, user.mobile_number) send_sms_code(user.id, user.mobile_number)
return redirect(url_for('main.two_factor')) return redirect(url_for('main.two_factor'))
else:
flash('expired token request again')
current_app.logger.info('we got here')
return redirect(url_for('.forgot_password'))
else: else:
return render_template('views/new-password.html', toke=token, form=form) return render_template('views/new-password.html', toke=token, form=form)

View File

@@ -111,13 +111,6 @@ class Service(db.Model):
return filter_null_value_fields(serialized) return filter_null_value_fields(serialized)
class PasswordResetToken(db.Model):
id = db.Column(db.Integer, primary_key=True)
token = db.Column(db.String, unique=True, index=True, nullable=False)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), unique=False, nullable=False)
expiry_date = db.Column(db.DateTime, nullable=False)
def filter_null_value_fields(obj): def filter_null_value_fields(obj):
return dict( return dict(
filter(lambda x: x[1] is not None, obj.items()) filter(lambda x: x[1] is not None, obj.items())

View File

@@ -43,6 +43,7 @@
</div> </div>
{% endblock %} {% endblock %}
{% set global_header_text = "GOV.UK Notify" %} {% set global_header_text = "GOV.UK Notify" %}
@@ -53,9 +54,20 @@
{% endif %} {% endif %}
{% block content %} {% block content %}
<main id="content" role="main" class="page-container"> <main id="content" role="main" class="page-container">
{% with messages = get_flashed_messages() %}
{% if messages %}
<div class="error-summary">
<ul>
{% for message in messages %}
<li>{{ message }}</li>
{% endfor %}
</ul>
</div>
{% endif %}
{% endwith %}
{% block fullwidth_content %}{% endblock %} {% block fullwidth_content %}{% endblock %}
</main> </main>
{% endblock %} {% endblock %}
{% block body_end %} {% block body_end %}

View File

@@ -2,7 +2,7 @@
<dt>{{ field.label }} <dt>{{ field.label }}
<dd>{{ field(**kwargs)|safe }} <dd>{{ field(**kwargs)|safe }}
{% if field.errors %} {% if field.errors %}
<ul class=error-summary> <ul class=error>
{% for error in field.errors %} {% for error in field.errors %}
<li>{{ error }}</li> <li>{{ error }}</li>
{% endfor %} {% endfor %}

View File

@@ -29,6 +29,7 @@ class Config(object):
SECRET_KEY = 'secret-key' SECRET_KEY = 'secret-key'
HTTP_PROTOCOL = 'http' HTTP_PROTOCOL = 'http'
DANGEROUS_SALT = 'itsdangeroussalt' DANGEROUS_SALT = 'itsdangeroussalt'
TOKEN_MAX_AGE_SECONDS = 86400
class Development(Config): class Development(Config):

View File

@@ -1,35 +0,0 @@
"""empty message
Revision ID: 80_password_reset_token
Revises: 70_unique_email
Create Date: 2016-01-05 17:47:46.395959
"""
# revision identifiers, used by Alembic.
revision = '80_password_reset_token'
down_revision = '70_unique_email'
from alembic import op
import sqlalchemy as sa
def upgrade():
### commands auto generated by Alembic - please adjust! ###
op.create_table('password_reset_token',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('token', sa.String(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('expiry_date', sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_password_reset_token_token'), 'password_reset_token', ['token'], unique=True)
### end Alembic commands ###
def downgrade():
### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_password_reset_token_token'), table_name='password_reset_token')
op.drop_table('password_reset_token')
### end Alembic commands ###

View File

@@ -1,12 +0,0 @@
import uuid
from app.main.dao import password_reset_token_dao
from tests.app.main import create_test_user
def test_should_insert_and_return_token(notifications_admin, notifications_admin_db, notify_db_session):
user = create_test_user('active')
token_id = str(uuid.uuid4())
password_reset_token_dao.insert(token=token_id, user_id=user.id)
saved_token = password_reset_token_dao.get_token(token_id)
assert saved_token.token == token_id

View File

@@ -176,7 +176,7 @@ def test_should_update_password(notifications_admin, notifications_admin_db, not
saved = users_dao.get_user_by_id(user.id) saved = users_dao.get_user_by_id(user.id)
assert check_hash('somepassword', saved.password) assert check_hash('somepassword', saved.password)
assert saved.password_changed_at is None assert saved.password_changed_at is None
users_dao.update_password(saved.id, 'newpassword') users_dao.update_password(saved.email_address, 'newpassword')
updated = users_dao.get_user_by_id(user.id) updated = users_dao.get_user_by_id(user.id)
assert check_hash('newpassword', updated.password) assert check_hash('newpassword', updated.password)
assert updated.password_changed_at < datetime.now() assert updated.password_changed_at < datetime.now()

View File

@@ -1,13 +0,0 @@
from werkzeug.datastructures import MultiDict
from app.main.forms import ForgotPasswordForm
def test_should_return_validation_error_if_email_address_does_not_exist(notifications_admin,
notifications_admin_db,
notify_db_session):
with notifications_admin.test_request_context():
form = ForgotPasswordForm(['first@it.gov.uk', 'second@it.gov.uk'],
formdata=MultiDict([('email_address', 'not_found@it.gov.uk')]))
form.validate()
assert {'email_address': ['Please enter the email address that you registered with']} == form.errors

View File

@@ -8,16 +8,6 @@ def test_should_render_forgot_password(notifications_admin, notifications_admin_
in response.get_data(as_text=True) in response.get_data(as_text=True)
def test_should_have_validate_error_when_email_does_not_exist(notifications_admin,
notifications_admin_db,
notify_db_session):
create_test_user('active')
response = notifications_admin.test_client().post('/forgot-password',
data={'email_address': 'email_does_not@exist.gov.uk'})
assert response.status_code == 200
assert 'Please enter the email address that you registered with' in response.get_data(as_text=True)
def test_should_redirect_to_password_reset_sent(notifications_admin, def test_should_redirect_to_password_reset_sent(notifications_admin,
notifications_admin_db, notifications_admin_db,
mocker, mocker,

View File

@@ -1,19 +1,13 @@
from datetime import datetime, timedelta from app.main.dao import users_dao
from app.main.dao import password_reset_token_dao, users_dao
from app.models import PasswordResetToken
from tests.app.main import create_test_user
from app.main.encryption import check_hash from app.main.encryption import check_hash
from app.main.views import generate_token
from tests.app.main import create_test_user
def test_should_render_new_password_template(notifications_admin, notifications_admin_db, notify_db_session): def test_should_render_new_password_template(notifications_admin, notifications_admin_db, notify_db_session):
with notifications_admin.test_request_context(): response = notifications_admin.test_client().get('/new-password/some_token')
with notifications_admin.test_client() as client: assert response.status_code == 200
user = create_test_user('active') assert ' You can now create a new password for your account.' in response.get_data(as_text=True)
password_reset_token_dao.insert('some_token', user.id)
response = client.get('/new-password/some_token')
assert response.status_code == 200
assert ' You can now create a new password for your account.' in response.get_data(as_text=True)
def test_should_redirect_to_two_factor_when_password_reset_is_successful(notifications_admin, notifications_admin_db, def test_should_redirect_to_two_factor_when_password_reset_is_successful(notifications_admin, notifications_admin_db,
@@ -21,8 +15,8 @@ def test_should_redirect_to_two_factor_when_password_reset_is_successful(notific
with notifications_admin.test_request_context(): with notifications_admin.test_request_context():
with notifications_admin.test_client() as client: with notifications_admin.test_client() as client:
user = create_test_user('active') user = create_test_user('active')
password_reset_token_dao.insert('some_token', user.id) token = generate_token(user.email_address)
response = client.post('/new-password/some_token', response = client.post('/new-password/{}'.format(token),
data={'new_password': 'a-new_password'}) data={'new_password': 'a-new_password'})
assert response.status_code == 302 assert response.status_code == 302
assert response.location == 'http://localhost/two-factor' assert response.location == 'http://localhost/two-factor'
@@ -30,15 +24,26 @@ def test_should_redirect_to_two_factor_when_password_reset_is_successful(notific
assert check_hash('a-new_password', saved_user.password) assert check_hash('a-new_password', saved_user.password)
def test_should_return_validation_error_that_token_is_expired(notifications_admin, notifications_admin_db, def test_should_redirect_to_forgot_password_with_flash_message_when_token_is_expired(notifications_admin,
notify_db_session): notifications_admin_db,
notify_db_session):
with notifications_admin.test_request_context(): with notifications_admin.test_request_context():
with notifications_admin.test_client() as client: with notifications_admin.test_client() as client:
notifications_admin.config['TOKEN_MAX_AGE_SECONDS'] = -1000
user = create_test_user('active') user = create_test_user('active')
expired_token = PasswordResetToken(id=1, token='some_token', user_id=user.id, token = generate_token(user.email_address)
expiry_date=datetime.now() + timedelta(hours=-2)) response = client.post('/new-password/{}'.format(token),
password_reset_token_dao.insert_token(expired_token)
response = client.post('/new-password/some_token',
data={'new_password': 'a-new_password'}) data={'new_password': 'a-new_password'})
assert response.status_code == 200 assert response.status_code == 302
assert 'token is invalid' in response.get_data(as_text=True) assert response.location == 'http://localhost/forgot-password'
notifications_admin.config['TOKEN_MAX_AGE_SECONDS'] = 86400
def test_should_return_500_error_page_when_email_addres_does_not_exist(notifications_admin, notifications_admin_db,
notify_db_session):
with notifications_admin.test_request_context():
with notifications_admin.test_client() as client:
token = generate_token('doesnotexist@it.gov.uk')
response = client.post('/new-password/{}'.format(token),
data={'new_password': 'a-new_password'})
assert response.status_code == 500