mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-02-04 18:32:33 -05:00
Updated generate_token to use encrypt the entire url.
Created notify_client.sender to hold the methods to send notifications.
This commit is contained in:
0
app/main/notifications/__init__.py
Normal file
0
app/main/notifications/__init__.py
Normal file
@@ -1,54 +0,0 @@
|
||||
from random import randint
|
||||
|
||||
from flask import url_for, current_app
|
||||
|
||||
from app import admin_api_client
|
||||
from app.main.dao import verify_codes_dao
|
||||
|
||||
|
||||
def create_verify_code():
|
||||
return ''.join(["%s" % randint(0, 9) for _ in range(0, 5)])
|
||||
|
||||
|
||||
def send_sms_code(user_id, mobile_number):
|
||||
sms_code = create_verify_code()
|
||||
verify_codes_dao.add_code(user_id=user_id, code=sms_code, code_type='sms')
|
||||
admin_api_client.send_sms(mobile_number=mobile_number, message=sms_code, token=admin_api_client.auth_token)
|
||||
|
||||
return sms_code
|
||||
|
||||
|
||||
def send_email_code(user_id, email):
|
||||
email_code = create_verify_code()
|
||||
verify_codes_dao.add_code(user_id=user_id, code=email_code, code_type='email')
|
||||
admin_api_client.send_email(email_address=email,
|
||||
from_str='notify@digital.cabinet-office.gov.uk',
|
||||
message=email_code,
|
||||
subject='Verification code',
|
||||
token=admin_api_client.auth_token)
|
||||
return email_code
|
||||
|
||||
|
||||
def send_change_password_email(email):
|
||||
link_to_change_password = url_for('.new_password', token=generate_token(email), _external=True)
|
||||
admin_api_client.send_email(email_address=email,
|
||||
from_str='notify@digital.cabinet-office.gov.uk',
|
||||
message=link_to_change_password,
|
||||
subject='Reset password for GOV.UK Notify',
|
||||
token=admin_api_client.auth_token)
|
||||
|
||||
|
||||
def generate_token(email):
|
||||
from itsdangerous import TimestampSigner
|
||||
signer = TimestampSigner(current_app.config['SECRET_KEY'])
|
||||
return signer.sign(email).decode('utf8')
|
||||
|
||||
|
||||
def check_token(token):
|
||||
from itsdangerous import TimestampSigner, SignatureExpired
|
||||
signer = TimestampSigner(current_app.config['SECRET_KEY'])
|
||||
try:
|
||||
email = signer.unsign(token, max_age=current_app.config['TOKEN_MAX_AGE_SECONDS'])
|
||||
return email
|
||||
except SignatureExpired as e:
|
||||
current_app.logger.info('token expired %s' % e)
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
from flask import (
|
||||
render_template, redirect, jsonify, session, url_for)
|
||||
render_template, redirect, session, url_for)
|
||||
|
||||
from app.main import main
|
||||
from app.main.dao import users_dao
|
||||
from app.main.forms import EmailNotReceivedForm, TextNotReceivedForm
|
||||
from app.main.views import send_sms_code, send_email_code
|
||||
from app.notify_client.sender import send_sms_code, send_email_code
|
||||
|
||||
|
||||
@main.route('/email-not-received', methods=['GET', 'POST'])
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
from flask import render_template, flash, current_app
|
||||
from flask import render_template, current_app
|
||||
from app.main import main
|
||||
from app.main.dao import users_dao
|
||||
from app.main.forms import ForgotPasswordForm
|
||||
from app.main.views import send_change_password_email
|
||||
from app.notify_client.sender import send_change_password_email
|
||||
|
||||
|
||||
@main.route('/forgot-password', methods=['GET', 'POST'])
|
||||
|
||||
@@ -3,7 +3,7 @@ from flask import (render_template, url_for, redirect, flash)
|
||||
from app.main import main
|
||||
from app.main.dao import users_dao
|
||||
from app.main.forms import NewPasswordForm
|
||||
from app.main.views import send_sms_code, check_token
|
||||
from app.notify_client.sender import check_token, send_sms_code
|
||||
|
||||
|
||||
@main.route('/new-password/<path:token>', methods=['GET', 'POST'])
|
||||
@@ -13,7 +13,7 @@ def new_password(token):
|
||||
flash('The link in the email we sent you has expired. Enter your email address to resend.')
|
||||
return redirect(url_for('.forgot_password'))
|
||||
|
||||
user = users_dao.get_user_by_email(email_address=email_address.decode('utf-8'))
|
||||
user = users_dao.get_user_by_email(email_address=email_address)
|
||||
if user and user.state != 'request_password_reset':
|
||||
flash('The link in the email we sent you has already been used.')
|
||||
return redirect(url_for('.index'))
|
||||
|
||||
@@ -5,12 +5,14 @@ from flask import render_template, redirect, session
|
||||
from app.main import main
|
||||
from app.main.dao import users_dao
|
||||
from app.main.forms import RegisterUserForm
|
||||
from app.main.views import send_sms_code, send_email_code
|
||||
from app.models import User
|
||||
|
||||
|
||||
# TODO how do we handle duplicate unverifed email addresses?
|
||||
# malicious or otherwise.
|
||||
from app.notify_client.sender import send_sms_code, send_email_code
|
||||
|
||||
|
||||
@main.route('/register', methods=['GET', 'POST'])
|
||||
def register():
|
||||
form = RegisterUserForm(users_dao.get_user_by_email)
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
from flask import (
|
||||
render_template, redirect, jsonify, url_for)
|
||||
render_template, redirect, url_for)
|
||||
from flask import session
|
||||
|
||||
from app.main import main
|
||||
from app.main.dao import users_dao
|
||||
from app.main.encryption import check_hash
|
||||
from app.main.forms import LoginForm
|
||||
from app.main.views import send_sms_code
|
||||
from app.notify_client.sender import send_sms_code
|
||||
|
||||
|
||||
@main.route('/sign-in', methods=(['GET', 'POST']))
|
||||
|
||||
52
app/notify_client/sender.py
Normal file
52
app/notify_client/sender.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from random import randint
|
||||
from flask import url_for, current_app
|
||||
from itsdangerous import URLSafeTimedSerializer, SignatureExpired
|
||||
from app import admin_api_client
|
||||
from app.main.dao import verify_codes_dao
|
||||
|
||||
|
||||
def create_verify_code():
|
||||
return ''.join(["%s" % randint(0, 9) for _ in range(0, 5)])
|
||||
|
||||
|
||||
def send_sms_code(user_id, mobile_number):
|
||||
sms_code = create_verify_code()
|
||||
verify_codes_dao.add_code(user_id=user_id, code=sms_code, code_type='sms')
|
||||
admin_api_client.send_sms(mobile_number=mobile_number, message=sms_code, token=admin_api_client.auth_token)
|
||||
|
||||
return sms_code
|
||||
|
||||
|
||||
def send_email_code(user_id, email):
|
||||
email_code = create_verify_code()
|
||||
verify_codes_dao.add_code(user_id=user_id, code=email_code, code_type='email')
|
||||
admin_api_client.send_email(email_address=email,
|
||||
from_str='notify@digital.cabinet-office.gov.uk',
|
||||
message=email_code,
|
||||
subject='Verification code',
|
||||
token=admin_api_client.auth_token)
|
||||
return email_code
|
||||
|
||||
|
||||
def send_change_password_email(email):
|
||||
link_to_change_password = url_for('.new_password', token=generate_token(email), _external=True)
|
||||
admin_api_client.send_email(email_address=email,
|
||||
from_str='notify@digital.cabinet-office.gov.uk',
|
||||
message=link_to_change_password,
|
||||
subject='Reset password for GOV.UK Notify',
|
||||
token=admin_api_client.auth_token)
|
||||
|
||||
|
||||
def generate_token(email):
|
||||
ser = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
|
||||
return ser.dumps(email, current_app.config.get('DANGEROUS_SALT'))
|
||||
|
||||
|
||||
def check_token(token):
|
||||
ser = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
|
||||
try:
|
||||
email = ser.loads(token, max_age=current_app.config['TOKEN_MAX_AGE_SECONDS'],
|
||||
salt=current_app.config.get('DANGEROUS_SALT'))
|
||||
return email
|
||||
except SignatureExpired as e:
|
||||
current_app.logger.info('token expired %s' % e)
|
||||
@@ -29,7 +29,7 @@ class Config(object):
|
||||
SECRET_KEY = 'secret-key'
|
||||
HTTP_PROTOCOL = 'http'
|
||||
DANGEROUS_SALT = 'itsdangeroussalt'
|
||||
TOKEN_MAX_AGE_SECONDS = 120000
|
||||
TOKEN_MAX_AGE_SECONDS = 3600
|
||||
|
||||
|
||||
class Development(Config):
|
||||
|
||||
0
tests/app/main/notify_client/__init__.py
Normal file
0
tests/app/main/notify_client/__init__.py
Normal file
35
tests/app/main/notify_client/test_sender.py
Normal file
35
tests/app/main/notify_client/test_sender.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from itsdangerous import BadSignature
|
||||
from pytest import fail
|
||||
|
||||
from app.notify_client.sender import generate_token, check_token
|
||||
|
||||
|
||||
def test_should_return_email_from_signed_token(notifications_admin,
|
||||
notifications_admin_db,
|
||||
notify_db_session):
|
||||
email = 'email@something.com'
|
||||
token = generate_token(email)
|
||||
assert email == check_token(token)
|
||||
|
||||
|
||||
def test_should_throw_exception_when_token_is_tampered_with(notifications_admin,
|
||||
notifications_admin_db,
|
||||
notify_db_session):
|
||||
email = 'email@something.com'
|
||||
token = generate_token(email)
|
||||
try:
|
||||
check_token(token + 'qerqwer')
|
||||
fail()
|
||||
except BadSignature:
|
||||
pass
|
||||
|
||||
|
||||
def test_return_none_when_token_is_expired(notifications_admin,
|
||||
notifications_admin_db,
|
||||
notify_db_session):
|
||||
with notifications_admin.test_request_context():
|
||||
notifications_admin.config['TOKEN_MAX_AGE_SECONDS'] = -1000
|
||||
email = 'email@something.com'
|
||||
token = generate_token(email)
|
||||
assert check_token(token) is None
|
||||
notifications_admin.config['TOKEN_MAX_AGE_SECONDS'] = 120000
|
||||
@@ -1,15 +1,14 @@
|
||||
from flask import url_for
|
||||
|
||||
from app.main.dao import users_dao
|
||||
from app.main.views import generate_token
|
||||
from tests.app.main import create_test_user
|
||||
|
||||
|
||||
def test_should_render_forgot_password(notifications_admin, notifications_admin_db, notify_db_session):
|
||||
response = notifications_admin.test_client().get('/forgot-password')
|
||||
assert response.status_code == 200
|
||||
assert 'If you have forgotten your password, we can send you an email to create a new password.' \
|
||||
in response.get_data(as_text=True)
|
||||
with notifications_admin.test_request_context():
|
||||
response = notifications_admin.test_client().get(url_for('.forgot_password'))
|
||||
assert response.status_code == 200
|
||||
assert 'If you have forgotten your password, we can send you an email to create a new password.' \
|
||||
in response.get_data(as_text=True)
|
||||
|
||||
|
||||
def test_should_redirect_to_password_reset_sent_and_state_updated(notifications_admin,
|
||||
@@ -17,25 +16,11 @@ def test_should_redirect_to_password_reset_sent_and_state_updated(notifications_
|
||||
mocker,
|
||||
notify_db_session):
|
||||
mocker.patch("app.admin_api_client.send_email")
|
||||
user = create_test_user('active')
|
||||
response = notifications_admin.test_client().post('/forgot-password',
|
||||
data={'email_address': user.email_address})
|
||||
assert response.status_code == 200
|
||||
assert 'You have been sent an email containing a link to reset your password.' in response.get_data(
|
||||
as_text=True)
|
||||
assert users_dao.get_user_by_id(user.id).state == 'request_password_reset'
|
||||
|
||||
|
||||
def test_should_redirect_to_forgot_password_with_flash_message_when_token_is_expired(notifications_admin,
|
||||
notifications_admin_db,
|
||||
notify_db_session):
|
||||
with notifications_admin.test_request_context():
|
||||
with notifications_admin.test_client() as client:
|
||||
notifications_admin.config['TOKEN_MAX_AGE_SECONDS'] = -1000
|
||||
user = create_test_user('active')
|
||||
token = generate_token(user.email_address)
|
||||
response = client.post('/new-password/{}'.format(token),
|
||||
data={'new_password': 'a-new_password'})
|
||||
assert response.status_code == 302
|
||||
assert response.location == url_for('.forgot_password', _external=True)
|
||||
notifications_admin.config['TOKEN_MAX_AGE_SECONDS'] = 86400
|
||||
user = create_test_user('active')
|
||||
response = notifications_admin.test_client().post(url_for('.forgot_password'),
|
||||
data={'email_address': user.email_address})
|
||||
assert response.status_code == 200
|
||||
assert 'You have been sent an email containing a link to reset your password.' in response.get_data(
|
||||
as_text=True)
|
||||
assert users_dao.get_user_by_id(user.id).state == 'request_password_reset'
|
||||
|
||||
@@ -2,7 +2,7 @@ from flask import url_for
|
||||
|
||||
from app.main.dao import users_dao
|
||||
from app.main.encryption import check_hash
|
||||
from app.main.views import generate_token
|
||||
from app.notify_client.sender import generate_token
|
||||
from tests.app.main import create_test_user
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ def test_should_redirect_to_forgot_password_with_flash_message_when_token_is_exp
|
||||
response = client.post(url_for('.new_password', token=token), data={'new_password': 'a-new_password'})
|
||||
assert response.status_code == 302
|
||||
assert response.location == url_for('.forgot_password', _external=True)
|
||||
notifications_admin.config['TOKEN_MAX_AGE_SECONDS'] = 86400
|
||||
notifications_admin.config['TOKEN_MAX_AGE_SECONDS'] = 3600
|
||||
|
||||
|
||||
def test_should_redirect_to_forgot_password_when_user_is_active_should_be_request_password_reset(notifications_admin,
|
||||
|
||||
Reference in New Issue
Block a user