mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-02-05 02:42:26 -05:00
Fix for security hole with setting session['user_id'] before second factor of authentication has been authorised.
This commit is contained in:
@@ -16,7 +16,9 @@ def add_code(user_id, code, code_type):
|
||||
return code.id
|
||||
|
||||
|
||||
def get_codes(user_id, code_type):
|
||||
def get_codes(user_id, code_type=None):
|
||||
if not code_type:
|
||||
return VerifyCodes.query.filter_by(user_id=user_id, code_used=False).all()
|
||||
return VerifyCodes.query.filter_by(user_id=user_id, code_type=code_type, code_used=False).all()
|
||||
|
||||
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
from datetime import datetime
|
||||
|
||||
from flask import session
|
||||
from flask_wtf import Form
|
||||
from wtforms import StringField, PasswordField, ValidationError
|
||||
from wtforms.validators import DataRequired, Email, Length, Regexp
|
||||
from app.main.dao import verify_codes_dao
|
||||
from app.main.encryption import check_hash
|
||||
from app.main.validators import Blacklist
|
||||
from app.main.validators import Blacklist, ValidateUserCodes
|
||||
|
||||
|
||||
class LoginForm(Form):
|
||||
@@ -62,26 +61,40 @@ class RegisterUserForm(Form):
|
||||
|
||||
|
||||
class TwoFactorForm(Form):
|
||||
sms_code = StringField('sms code', validators=[DataRequired(message='Enter verification code'),
|
||||
Regexp(regex=verify_code, message='Code must be 5 digits')])
|
||||
|
||||
def validate_sms_code(self, a):
|
||||
return validate_codes(self.sms_code, 'sms')
|
||||
def __init__(self, user_codes, *args, **kwargs):
|
||||
'''
|
||||
Keyword arguments:
|
||||
user_codes -- List of user code objects which have the fields
|
||||
(code_type, expiry_datetime, code)
|
||||
'''
|
||||
self.user_codes = user_codes
|
||||
super(TwoFactorForm, self).__init__(*args, **kwargs)
|
||||
|
||||
sms_code = StringField('sms code', validators=[DataRequired(message='Enter verification code'),
|
||||
Regexp(regex=verify_code, message='Code must be 5 digits'),
|
||||
ValidateUserCodes(code_type='sms')])
|
||||
|
||||
|
||||
class VerifyForm(Form):
|
||||
|
||||
def __init__(self, user_codes, *args, **kwargs):
|
||||
'''
|
||||
Keyword arguments:
|
||||
user_codes -- List of user code objects which have the fields
|
||||
(code_type, expiry_datetime, code)
|
||||
'''
|
||||
self.user_codes = user_codes
|
||||
super(VerifyForm, self).__init__(*args, **kwargs)
|
||||
|
||||
sms_code = StringField("Text message confirmation code",
|
||||
validators=[DataRequired(message='SMS code can not be empty'),
|
||||
Regexp(regex=verify_code, message='Code must be 5 digits')])
|
||||
Regexp(regex=verify_code, message='Code must be 5 digits'),
|
||||
ValidateUserCodes(code_type='sms')])
|
||||
email_code = StringField("Email confirmation code",
|
||||
validators=[DataRequired(message='Email code can not be empty'),
|
||||
Regexp(regex=verify_code, message='Code must be 5 digits')])
|
||||
|
||||
def validate_email_code(self, a):
|
||||
return validate_codes(self.email_code, 'email')
|
||||
|
||||
def validate_sms_code(self, a):
|
||||
return validate_codes(self.sms_code, 'sms')
|
||||
Regexp(regex=verify_code, message='Code must be 5 digits'),
|
||||
ValidateUserCodes(code_type='email')])
|
||||
|
||||
|
||||
class EmailNotReceivedForm(Form):
|
||||
@@ -110,19 +123,3 @@ class AddServiceForm(Form):
|
||||
def validate_service_name(self, a):
|
||||
if self.service_name.data in self.service_names:
|
||||
raise ValidationError('Service name already exists')
|
||||
|
||||
|
||||
def validate_codes(field, code_type):
|
||||
codes = verify_codes_dao.get_codes(user_id=session['user_id'], code_type=code_type)
|
||||
# TODO need to remove this manual logging.
|
||||
print('validate_codes for user_id: {} are {}'.format(session['user_id'], codes))
|
||||
if not [code for code in codes if validate_code(field, code)]:
|
||||
raise ValidationError('Code does not match')
|
||||
|
||||
|
||||
def validate_code(field, code):
|
||||
if field.data and check_hash(field.data, code.code):
|
||||
if code.expiry_datetime <= datetime.now():
|
||||
raise ValidationError('Code has expired')
|
||||
else:
|
||||
return code.code
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
from wtforms import ValidationError
|
||||
from datetime import datetime
|
||||
from app.main.encryption import check_hash
|
||||
|
||||
|
||||
class Blacklist(object):
|
||||
@@ -10,3 +12,29 @@ class Blacklist(object):
|
||||
def __call__(self, form, field):
|
||||
if field.data in ['password1234', 'passw0rd1234']:
|
||||
raise ValidationError(self.message)
|
||||
|
||||
|
||||
class ValidateUserCodes(object):
|
||||
def __init__(self,
|
||||
expiry_msg='Code has expired',
|
||||
invalid_msg='Code does not match',
|
||||
code_type=None):
|
||||
self.expiry_msg = expiry_msg
|
||||
self.invalid_msg = invalid_msg
|
||||
self.code_type = code_type
|
||||
|
||||
def __call__(self, form, field):
|
||||
# TODO would be great to do this sql query but
|
||||
# not couple those parts of the code.
|
||||
user_codes = getattr(form, 'user_codes', [])
|
||||
valid_code = False
|
||||
for code in user_codes:
|
||||
if check_hash(field.data, code.code) and self.code_type == code.code_type:
|
||||
if code.expiry_datetime <= datetime.now():
|
||||
raise ValidationError(self.expiry_msg)
|
||||
else:
|
||||
# Valid code
|
||||
valid_code = True
|
||||
break
|
||||
if not valid_code:
|
||||
raise ValidationError(self.invalid_msg)
|
||||
|
||||
@@ -10,7 +10,7 @@ from app.main.views import send_sms_code, send_email_code
|
||||
@main.route('/email-not-received', methods=['GET', 'POST'])
|
||||
def check_and_resend_email_code():
|
||||
# TODO there needs to be a way to regenerate a session id
|
||||
user = users_dao.get_user_by_id(session['user_id'])
|
||||
user = users_dao.get_user_by_email(session['user_email'])
|
||||
form = EmailNotReceivedForm(email_address=user.email_address)
|
||||
if form.validate_on_submit():
|
||||
users_dao.update_email_address(id=user.id, email_address=form.email_address.data)
|
||||
@@ -22,7 +22,7 @@ def check_and_resend_email_code():
|
||||
@main.route('/text-not-received', methods=['GET', 'POST'])
|
||||
def check_and_resend_text_code():
|
||||
# TODO there needs to be a way to regenerate a session id
|
||||
user = users_dao.get_user_by_id(session['user_id'])
|
||||
user = users_dao.get_user_by_email(session['user_email'])
|
||||
form = TextNotReceivedForm(mobile_number=user.mobile_number)
|
||||
if form.validate_on_submit():
|
||||
users_dao.update_mobile_number(id=user.id, mobile_number=form.mobile_number.data)
|
||||
@@ -39,6 +39,6 @@ def verification_code_not_received():
|
||||
@main.route('/send-new-code', methods=['GET'])
|
||||
def check_and_resend_verification_code():
|
||||
# TODO there needs to be a way to generate a new session id
|
||||
user = users_dao.get_user_by_id(session['user_id'])
|
||||
user = users_dao.get_user_by_email(session['user_email'])
|
||||
send_sms_code(user.id, user.mobile_number)
|
||||
return redirect(url_for('main.two_factor'))
|
||||
|
||||
@@ -42,7 +42,7 @@ def process_register():
|
||||
send_sms_code(user_id=user.id, mobile_number=form.mobile_number.data)
|
||||
send_email_code(user_id=user.id, email=form.email_address.data)
|
||||
session['expiry_date'] = str(datetime.now() + timedelta(hours=1))
|
||||
session['user_id'] = user.id
|
||||
session['user_email'] = user.email_address
|
||||
return redirect('/verify')
|
||||
|
||||
return render_template('views/register.html', form=form)
|
||||
|
||||
@@ -18,7 +18,7 @@ def sign_in():
|
||||
if user:
|
||||
if not user.is_locked() and user.is_active() and check_hash(form.password.data, user.password):
|
||||
send_sms_code(user.id, user.mobile_number)
|
||||
session['user_id'] = user.id
|
||||
session['user_email'] = user.email_address
|
||||
return redirect(url_for('.two_factor'))
|
||||
else:
|
||||
users_dao.increment_failed_login_count(user.id)
|
||||
|
||||
@@ -11,10 +11,12 @@ from app.main.forms import TwoFactorForm
|
||||
|
||||
@main.route('/two-factor', methods=['GET', 'POST'])
|
||||
def two_factor():
|
||||
form = TwoFactorForm()
|
||||
# TODO handle user_email not in session
|
||||
user = users_dao.get_user_by_email(session['user_email'])
|
||||
codes = verify_codes_dao.get_codes(user.id)
|
||||
form = TwoFactorForm(codes)
|
||||
|
||||
if form.validate_on_submit():
|
||||
user = users_dao.get_user_by_id(session['user_id'])
|
||||
verify_codes_dao.use_code_for_user_and_type(user_id=user.id, code_type='sms')
|
||||
login_user(user)
|
||||
return redirect(url_for('.dashboard'))
|
||||
|
||||
@@ -12,8 +12,9 @@ from app.main.forms import VerifyForm
|
||||
def verify():
|
||||
# TODO there needs to be a way to regenerate a session id
|
||||
# or handle gracefully.
|
||||
user = users_dao.get_user_by_id(session['user_id'])
|
||||
form = VerifyForm()
|
||||
user = users_dao.get_user_by_email(session['user_email'])
|
||||
codes = verify_codes_dao.get_codes(user.id)
|
||||
form = VerifyForm(codes)
|
||||
if form.validate_on_submit():
|
||||
verify_codes_dao.use_code_for_user_and_type(user_id=user.id, code_type='email')
|
||||
verify_codes_dao.use_code_for_user_and_type(user_id=user.id, code_type='sms')
|
||||
|
||||
Reference in New Issue
Block a user