mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-02-06 03:13:42 -05:00
Removed exceptions, found a better way to handle them.
Refactored the forms so that fields like email_address can be used in multiple forms. Refactored form validation so that a query function is passed into the form to be run, this way the form is not exposed to the dao layer and the query is more efficient. This PR still requires some frontend attention. Will work with Chris to update the templates.
This commit is contained in:
@@ -3,7 +3,6 @@ from datetime import datetime
|
||||
from sqlalchemy.orm import load_only
|
||||
|
||||
from app import db, login_manager
|
||||
from app.main.exceptions import NoDataFoundException
|
||||
from app.models import User
|
||||
from app.main.encryption import hashpw
|
||||
|
||||
@@ -60,16 +59,11 @@ def update_mobile_number(id, mobile_number):
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def update_password(email, password):
|
||||
user = get_user_by_email(email)
|
||||
if user:
|
||||
user.password = hashpw(password)
|
||||
user.password_changed_at = datetime.now()
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
return user
|
||||
else:
|
||||
raise NoDataFoundException('The user does not exist')
|
||||
def update_password(user, password):
|
||||
user.password = hashpw(password)
|
||||
user.password_changed_at = datetime.now()
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def find_all_email_address():
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
class AdminApiClientException(Exception):
|
||||
def __init__(self, message):
|
||||
self.value = message
|
||||
|
||||
|
||||
class NoDataFoundException(Exception):
|
||||
def __init__(self, message):
|
||||
self.value = message
|
||||
@@ -57,9 +57,8 @@ class LoginForm(Form):
|
||||
|
||||
|
||||
class RegisterUserForm(Form):
|
||||
def __init__(self, existing_email_addresses, existing_mobile_numbers, *args, **kwargs):
|
||||
def __init__(self, existing_email_addresses, *args, **kwargs):
|
||||
self.existing_emails = existing_email_addresses
|
||||
self.existing_mobiles = existing_mobile_numbers
|
||||
super(RegisterUserForm, self).__init__(*args, **kwargs)
|
||||
|
||||
name = StringField('Full name',
|
||||
@@ -70,16 +69,9 @@ class RegisterUserForm(Form):
|
||||
|
||||
def validate_email_address(self, field):
|
||||
# Validate email address is unique.
|
||||
if field.data in self.existing_emails:
|
||||
if self.existing_emails(field.data):
|
||||
raise ValidationError('Email address already exists')
|
||||
|
||||
def validate_mobile_number(self, field):
|
||||
# Validate mobile number is unique
|
||||
# Code to re-added later
|
||||
# if field.data in self.existing_mobiles:
|
||||
# raise ValidationError('Mobile number already exists')
|
||||
pass
|
||||
|
||||
|
||||
class TwoFactorForm(Form):
|
||||
def __init__(self, user_codes, *args, **kwargs):
|
||||
@@ -132,6 +124,14 @@ class AddServiceForm(Form):
|
||||
class ForgotPasswordForm(Form):
|
||||
email_address = email_address()
|
||||
|
||||
def __init__(self, q, *args, **kwargs):
|
||||
self.query_function = q
|
||||
super(ForgotPasswordForm, self).__init__(*args, *kwargs)
|
||||
|
||||
def validate_email_address(self, a):
|
||||
if not self.query_function(a.data):
|
||||
raise ValidationError('The email address is not recognized. Enter the password you registered with.')
|
||||
|
||||
|
||||
class NewPasswordForm(Form):
|
||||
new_password = password()
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
import traceback
|
||||
from random import randint
|
||||
|
||||
from flask import url_for, current_app
|
||||
|
||||
from app import admin_api_client
|
||||
from app.main.dao import verify_codes_dao
|
||||
from app.main.exceptions import AdminApiClientException
|
||||
|
||||
|
||||
def create_verify_code():
|
||||
@@ -14,40 +12,30 @@ def create_verify_code():
|
||||
|
||||
def send_sms_code(user_id, mobile_number):
|
||||
sms_code = create_verify_code()
|
||||
try:
|
||||
verify_codes_dao.add_code(user_id=user_id, code=sms_code, code_type='sms')
|
||||
admin_api_client.send_sms(mobile_number=mobile_number, message=sms_code, token=admin_api_client.auth_token)
|
||||
except:
|
||||
raise AdminApiClientException('Exception when sending sms.')
|
||||
verify_codes_dao.add_code(user_id=user_id, code=sms_code, code_type='sms')
|
||||
admin_api_client.send_sms(mobile_number=mobile_number, message=sms_code, token=admin_api_client.auth_token)
|
||||
|
||||
return sms_code
|
||||
|
||||
|
||||
def send_email_code(user_id, email):
|
||||
email_code = create_verify_code()
|
||||
try:
|
||||
verify_codes_dao.add_code(user_id=user_id, code=email_code, code_type='email')
|
||||
admin_api_client.send_email(email_address=email,
|
||||
from_str='notify@digital.cabinet-office.gov.uk',
|
||||
message=email_code,
|
||||
subject='Verification code',
|
||||
token=admin_api_client.auth_token)
|
||||
except:
|
||||
raise AdminApiClientException('Exception when sending email.')
|
||||
|
||||
verify_codes_dao.add_code(user_id=user_id, code=email_code, code_type='email')
|
||||
admin_api_client.send_email(email_address=email,
|
||||
from_str='notify@digital.cabinet-office.gov.uk',
|
||||
message=email_code,
|
||||
subject='Verification code',
|
||||
token=admin_api_client.auth_token)
|
||||
return email_code
|
||||
|
||||
|
||||
def send_change_password_email(email):
|
||||
try:
|
||||
link_to_change_password = url_for('.new_password', token=generate_token(email), _external=True)
|
||||
admin_api_client.send_email(email_address=email,
|
||||
from_str='notify@digital.cabinet-office.gov.uk',
|
||||
message=link_to_change_password,
|
||||
subject='Reset password for GOV.UK Notify',
|
||||
token=admin_api_client.auth_token)
|
||||
except:
|
||||
traceback.print_exc()
|
||||
raise AdminApiClientException('Exception when sending email.')
|
||||
link_to_change_password = url_for('.new_password', token=generate_token(email), _external=True)
|
||||
admin_api_client.send_email(email_address=email,
|
||||
from_str='notify@digital.cabinet-office.gov.uk',
|
||||
message=link_to_change_password,
|
||||
subject='Reset password for GOV.UK Notify',
|
||||
token=admin_api_client.auth_token)
|
||||
|
||||
|
||||
def generate_token(email):
|
||||
@@ -64,4 +52,3 @@ def check_token(token):
|
||||
return email
|
||||
except SignatureExpired as e:
|
||||
current_app.logger.info('token expired %s' % e)
|
||||
return None
|
||||
|
||||
@@ -7,13 +7,9 @@ from app.main.views import send_change_password_email
|
||||
|
||||
@main.route('/forgot-password', methods=['GET', 'POST'])
|
||||
def forgot_password():
|
||||
form = ForgotPasswordForm()
|
||||
form = ForgotPasswordForm(users_dao.get_user_by_email)
|
||||
if form.validate_on_submit():
|
||||
if users_dao.get_user_by_email(form.email_address.data):
|
||||
send_change_password_email(form.email_address.data)
|
||||
return render_template('views/password-reset-sent.html')
|
||||
else:
|
||||
flash('The email address is not recognized. Enter the password you registered with.')
|
||||
return render_template('views/forgot-password.html', form=form)
|
||||
send_change_password_email(form.email_address.data)
|
||||
return render_template('views/password-reset-sent.html')
|
||||
else:
|
||||
return render_template('views/forgot-password.html', form=form)
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
from datetime import datetime
|
||||
|
||||
from flask import (render_template, url_for, redirect, flash, current_app)
|
||||
from flask import (render_template, url_for, redirect, flash)
|
||||
|
||||
from app.main import main
|
||||
from app.main.dao import users_dao
|
||||
@@ -10,20 +8,18 @@ from app.main.views import send_sms_code, check_token
|
||||
|
||||
@main.route('/new-password/<path:token>', methods=['GET', 'POST'])
|
||||
def new_password(token):
|
||||
email_address = check_token(token)
|
||||
if not email_address:
|
||||
flash('The token we sent you has expired. Enter your email address to try again.')
|
||||
return redirect(url_for('.forgot_password'))
|
||||
|
||||
user = users_dao.get_user_by_email(email_address=email_address.decode('utf-8'))
|
||||
|
||||
form = NewPasswordForm()
|
||||
|
||||
if form.validate_on_submit():
|
||||
email_address = check_token(token)
|
||||
if email_address:
|
||||
user = users_dao.update_password(email_address.decode('utf-8'), form.new_password.data)
|
||||
send_sms_code(user.id, user.mobile_number)
|
||||
return redirect(url_for('main.two_factor'))
|
||||
else:
|
||||
flash('expired token request again')
|
||||
current_app.logger.info('we got here')
|
||||
return redirect(url_for('.forgot_password'))
|
||||
users_dao.update_password(user, form.new_password.data)
|
||||
send_sms_code(user.id, user.mobile_number)
|
||||
return redirect(url_for('main.two_factor'))
|
||||
else:
|
||||
return render_template('views/new-password.html', toke=token, form=form)
|
||||
|
||||
|
||||
def valid_token(token):
|
||||
return token and datetime.now() <= token.expiry_date
|
||||
return render_template('views/new-password.html', token=token, form=form, user=user)
|
||||
|
||||
@@ -1,12 +1,9 @@
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from flask import render_template, redirect, jsonify, session
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from flask import render_template, redirect, session
|
||||
|
||||
from app.main import main
|
||||
from app.main.dao import users_dao
|
||||
from app.main.encryption import hashpw
|
||||
from app.main.exceptions import AdminApiClientException
|
||||
from app.main.forms import RegisterUserForm
|
||||
from app.main.views import send_sms_code, send_email_code
|
||||
from app.models import User
|
||||
@@ -15,16 +12,8 @@ from app.models import User
|
||||
# TODO how do we handle duplicate unverifed email addresses?
|
||||
# malicious or otherwise.
|
||||
@main.route('/register', methods=['GET', 'POST'])
|
||||
def process_register():
|
||||
try:
|
||||
existing_emails, existing_mobiles = zip(
|
||||
*[(x.email_address, x.mobile_number) for x in
|
||||
users_dao.get_all_users()])
|
||||
except ValueError:
|
||||
# Value error is raised if the db is empty.
|
||||
existing_emails, existing_mobiles = [], []
|
||||
|
||||
form = RegisterUserForm(existing_emails, existing_mobiles)
|
||||
def register():
|
||||
form = RegisterUserForm(users_dao.get_user_by_email)
|
||||
|
||||
if form.validate_on_submit():
|
||||
user = User(name=form.name.data,
|
||||
|
||||
@@ -16,7 +16,7 @@ GOV.UK Notify
|
||||
{{ form.hidden_tag() }}
|
||||
{{ render_field(form.email_address, class='form-control-2-3') }}
|
||||
<p>
|
||||
<button class="button" href="sign-in" role="button">Send email</button>
|
||||
<button class="button" role="button">Send email</button>
|
||||
</p>
|
||||
</form>
|
||||
</div>
|
||||
|
||||
@@ -8,6 +8,7 @@ GOV.UK Notify
|
||||
|
||||
<div class="grid-row">
|
||||
<div class="column-two-thirds">
|
||||
{% if user %}
|
||||
<h1 class="heading-xlarge">Create a new password</h1>
|
||||
|
||||
<p> You can now create a new password for your account.</p>
|
||||
@@ -24,6 +25,9 @@ GOV.UK Notify
|
||||
</p>
|
||||
|
||||
</form>
|
||||
{% else %}
|
||||
Message about email address does not exist. Some one needs to figure out the words here.
|
||||
{% endif %}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
|
||||
@@ -176,7 +176,7 @@ def test_should_update_password(notifications_admin, notifications_admin_db, not
|
||||
saved = users_dao.get_user_by_id(user.id)
|
||||
assert check_hash('somepassword', saved.password)
|
||||
assert saved.password_changed_at is None
|
||||
users_dao.update_password(saved.email_address, 'newpassword')
|
||||
users_dao.update_password(saved, 'newpassword')
|
||||
updated = users_dao.get_user_by_id(user.id)
|
||||
assert check_hash('newpassword', updated.password)
|
||||
assert updated.password_changed_at < datetime.now()
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
from pytest import fail
|
||||
|
||||
from app.main.dao import users_dao
|
||||
from app.main.forms import RegisterUserForm
|
||||
|
||||
|
||||
def test_should_raise_validation_error_for_password(notifications_admin):
|
||||
form = RegisterUserForm([], [])
|
||||
form = RegisterUserForm(users_dao.get_user_by_email)
|
||||
form.name.data = 'test'
|
||||
form.email_address.data = 'teset@example.gov.uk'
|
||||
form.mobile_number.data = '+441231231231'
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
from flask import url_for
|
||||
|
||||
from app.main.views import generate_token
|
||||
from tests.app.main import create_test_user
|
||||
|
||||
|
||||
@@ -21,5 +24,20 @@ def test_should_redirect_to_password_reset_sent(notifications_admin,
|
||||
assert 'You have been sent an email containing a url to reset your password.' in response.get_data(as_text=True)
|
||||
|
||||
|
||||
def test_should_redirect_to_forgot_password_with_flash_message_when_token_is_expired(notifications_admin,
|
||||
notifications_admin_db,
|
||||
notify_db_session):
|
||||
with notifications_admin.test_request_context():
|
||||
with notifications_admin.test_client() as client:
|
||||
notifications_admin.config['TOKEN_MAX_AGE_SECONDS'] = -1000
|
||||
user = create_test_user('active')
|
||||
token = generate_token(user.email_address)
|
||||
response = client.post('/new-password/{}'.format(token),
|
||||
data={'new_password': 'a-new_password'})
|
||||
assert response.status_code == 302
|
||||
assert response.location == url_for('.forgot_password', _external=True)
|
||||
notifications_admin.config['TOKEN_MAX_AGE_SECONDS'] = 86400
|
||||
|
||||
|
||||
def _set_up_mocker(mocker):
|
||||
mocker.patch("app.admin_api_client.send_email")
|
||||
|
||||
@@ -1,16 +1,30 @@
|
||||
from pytest import fail
|
||||
from flask import url_for
|
||||
|
||||
from app.main.dao import users_dao
|
||||
from app.main.encryption import check_hash
|
||||
from app.main.exceptions import NoDataFoundException
|
||||
from app.main.views import generate_token
|
||||
from tests.app.main import create_test_user
|
||||
|
||||
|
||||
def test_should_render_new_password_template(notifications_admin, notifications_admin_db, notify_db_session):
|
||||
response = notifications_admin.test_client().get('/new-password/some_token')
|
||||
assert response.status_code == 200
|
||||
assert ' You can now create a new password for your account.' in response.get_data(as_text=True)
|
||||
with notifications_admin.test_request_context():
|
||||
with notifications_admin.test_client() as client:
|
||||
user = create_test_user('active')
|
||||
token = generate_token(user.email_address)
|
||||
response = client.get('/new-password/{}'.format(token))
|
||||
assert response.status_code == 200
|
||||
assert ' You can now create a new password for your account.' in response.get_data(as_text=True)
|
||||
|
||||
|
||||
def test_should_render_new_password_template_with_message_of_bad_token(notifications_admin, notifications_admin_db,
|
||||
notify_db_session):
|
||||
with notifications_admin.test_request_context():
|
||||
with notifications_admin.test_client() as client:
|
||||
token = generate_token('no_user@d.gov.uk')
|
||||
response = client.get('/new-password/{}'.format(token))
|
||||
assert response.status_code == 200
|
||||
assert 'Message about email address does not exist. Some one needs to figure out the words here.' in \
|
||||
response.get_data(as_text=True)
|
||||
|
||||
|
||||
def test_should_redirect_to_two_factor_when_password_reset_is_successful(notifications_admin,
|
||||
@@ -25,7 +39,7 @@ def test_should_redirect_to_two_factor_when_password_reset_is_successful(notific
|
||||
response = client.post('/new-password/{}'.format(token),
|
||||
data={'new_password': 'a-new_password'})
|
||||
assert response.status_code == 302
|
||||
assert response.location == 'http://localhost/two-factor'
|
||||
assert response.location == url_for('.two_factor', _external=True)
|
||||
saved_user = users_dao.get_user_by_id(user.id)
|
||||
assert check_hash('a-new_password', saved_user.password)
|
||||
|
||||
@@ -41,23 +55,9 @@ def test_should_redirect_to_forgot_password_with_flash_message_when_token_is_exp
|
||||
response = client.post('/new-password/{}'.format(token),
|
||||
data={'new_password': 'a-new_password'})
|
||||
assert response.status_code == 302
|
||||
assert response.location == 'http://localhost/forgot-password'
|
||||
assert response.location == url_for('.forgot_password', _external=True)
|
||||
notifications_admin.config['TOKEN_MAX_AGE_SECONDS'] = 86400
|
||||
|
||||
|
||||
def test_should_return_raise_no_data_found_exception_when_email_address_does_not_exist(notifications_admin,
|
||||
notifications_admin_db,
|
||||
notify_db_session):
|
||||
with notifications_admin.test_request_context():
|
||||
with notifications_admin.test_client() as client:
|
||||
token = generate_token('doesnotexist@it.gov.uk')
|
||||
try:
|
||||
client.post('/new-password/{}'.format(token),
|
||||
data={'new_password': 'a-new_password'})
|
||||
fail('Expected NoDataFoundException')
|
||||
except NoDataFoundException:
|
||||
pass
|
||||
|
||||
|
||||
def _set_up_mocker(mocker):
|
||||
mocker.patch("app.admin_api_client.send_sms")
|
||||
|
||||
Reference in New Issue
Block a user