Merge pull request #827 from alphagov/stronger-2fa-security

Set the expiry time on a verify code (2fa) to 10 minutes.
This commit is contained in:
Leo Hemsted
2017-02-16 13:11:12 +00:00
committed by GitHub
5 changed files with 182 additions and 201 deletions

View File

@@ -78,6 +78,7 @@ class Config(object):
SMS_CHAR_COUNT_LIMIT = 495
BRANDING_PATH = '/images/email-template/crests/'
TEST_MESSAGE_FILENAME = 'Test message'
MAX_VERIFY_CODE_COUNT = 10
NOTIFY_SERVICE_ID = 'd6aa2c68-a2d9-4437-ab19-3ae8eb202553'
INVITATION_EMAIL_TEMPLATE_ID = '4f46df42-f795-4cc4-83bb-65ca312f49cc'

View File

@@ -1,8 +1,6 @@
import random
from datetime import (datetime, timedelta)
from sqlalchemy import func
from app import db
from app.models import (User, VerifyCode)
@@ -35,7 +33,7 @@ def save_model_user(usr, update_dict={}, pwd=None):
def create_user_code(user, code, code_type):
verify_code = VerifyCode(code_type=code_type,
expiry_datetime=datetime.utcnow() + timedelta(hours=1),
expiry_datetime=datetime.utcnow() + timedelta(minutes=30),
user=user)
verify_code.code = code
db.session.add(verify_code)
@@ -48,7 +46,7 @@ def get_user_code(user, code, code_type):
# time searching for the correct code.
codes = VerifyCode.query.filter_by(
user=user, code_type=code_type).order_by(
VerifyCode.created_at.desc())
VerifyCode.created_at.desc())
retval = None
for x in codes:
if x.check_code(code):
@@ -82,6 +80,15 @@ def delete_user_verify_codes(user):
db.session.commit()
def count_user_verify_codes(user):
query = VerifyCode.query.filter(
VerifyCode.user == user,
VerifyCode.expiry_datetime > datetime.utcnow(),
VerifyCode.code_used.is_(False)
)
return query.count()
def get_user_by_id(user_id=None):
if user_id:
return User.query.filter_by(id=user_id).one()

View File

@@ -12,7 +12,8 @@ from app.dao.users_dao import (
get_user_by_email,
create_secret_code,
save_user_attribute,
update_user_password
update_user_password,
count_user_verify_codes
)
from app.dao.permissions_dao import permission_dao
from app.dao.services_dao import dao_fetch_service_by_id
@@ -123,8 +124,10 @@ def verify_user_code(user_id):
code = get_user_code(user_to_verify, txt_code, txt_type)
if not code:
increment_failed_login_count(user_to_verify)
raise InvalidRequest("Code not found", status_code=404)
if datetime.utcnow() > code.expiry_datetime or code.code_used:
increment_failed_login_count(user_to_verify)
raise InvalidRequest("Code has expired", status_code=400)
use_user_code(code.id)
return jsonify({}), 204
@@ -135,6 +138,11 @@ def send_user_sms_code(user_id):
user_to_send_to = get_user_by_id(user_id=user_id)
verify_code, errors = request_verify_code_schema.load(request.get_json())
if count_user_verify_codes(user_to_send_to) >= current_app.config.get('MAX_VERIFY_CODE_COUNT'):
# Prevent more than `MAX_VERIFY_CODE_COUNT` active verify codes at a time
current_app.logger.warn('Max verify code has exceeded for user {}'.format(user_to_send_to.id))
return jsonify({}), 204
secret_code = create_secret_code()
create_user_code(user_to_send_to, secret_code, SMS_TYPE)

View File

@@ -1,5 +1,6 @@
from datetime import datetime, timedelta
from freezegun import freeze_time
from sqlalchemy.exc import DataError
from sqlalchemy.orm.exc import NoResultFound
import pytest
@@ -14,8 +15,8 @@ from app.dao.users_dao import (
reset_failed_login_count,
get_user_by_email,
delete_codes_older_created_more_than_a_day_ago,
update_user_password
)
update_user_password,
count_user_verify_codes)
from app.models import User, VerifyCode
@@ -109,13 +110,14 @@ def test_should_not_delete_verification_codes_less_than_one_day_old(sample_user)
assert VerifyCode.query.one()._code == "12345"
def make_verify_code(user, age=timedelta(hours=0), code="12335"):
def make_verify_code(user, age=timedelta(hours=0), expiry_age=timedelta(0), code="12335", code_used=False):
verify_code = VerifyCode(
code_type='sms',
_code=code,
created_at=datetime.utcnow() - age,
expiry_datetime=datetime.utcnow(),
user=user
expiry_datetime=datetime.utcnow() - expiry_age,
user=user,
code_used=code_used
)
db.session.add(verify_code)
db.session.commit()
@@ -140,3 +142,12 @@ def test_update_user_password(notify_api, notify_db, notify_db_session, sample_u
assert not sample_user.check_password(password)
update_user_password(sample_user, password)
assert sample_user.check_password(password)
def test_count_user_verify_codes(sample_user):
with freeze_time(datetime.utcnow() + timedelta(hours=1)):
make_verify_code(sample_user, code_used=True)
make_verify_code(sample_user, expiry_age=timedelta(hours=2))
[make_verify_code(sample_user) for i in range(5)]
assert count_user_verify_codes(sample_user) == 5

View File

@@ -1,5 +1,6 @@
import json
import moto
import uuid
import pytest
from datetime import (
@@ -15,7 +16,7 @@ from app.models import (
Notification
)
from app import db, encryption
from app import db
from tests import create_authorization_header
from freezegun import freeze_time
@@ -23,198 +24,141 @@ from freezegun import freeze_time
import app.celery.tasks
def test_user_verify_code_sms(notify_api,
sample_sms_code):
"""
Tests POST endpoint '/<user_id>/verify/code'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert not VerifyCode.query.first().code_used
data = json.dumps({
'code_type': sample_sms_code.code_type,
'code': sample_sms_code.txt_code})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
assert VerifyCode.query.first().code_used
def test_user_verify_code(client,
sample_sms_code):
assert not VerifyCode.query.first().code_used
data = json.dumps({
'code_type': sample_sms_code.code_type,
'code': sample_sms_code.txt_code})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
assert VerifyCode.query.first().code_used
def test_user_verify_code_sms_missing_code(notify_api,
sample_sms_code):
"""
Tests POST endpoint '/<user_id>/verify/code'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert not VerifyCode.query.first().code_used
data = json.dumps({'code_type': sample_sms_code.code_type})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
assert not VerifyCode.query.first().code_used
def test_user_verify_code_missing_code(client,
sample_sms_code):
assert not VerifyCode.query.first().code_used
data = json.dumps({'code_type': sample_sms_code.code_type})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
assert not VerifyCode.query.first().code_used
assert User.query.get(sample_sms_code.user.id).failed_login_count == 0
@moto.mock_sqs
def test_user_verify_code_email(notify_api,
sqs_client_conn,
sample_email_code):
"""
Tests POST endpoint '/<user_id>/verify/code'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert not VerifyCode.query.first().code_used
data = json.dumps({
'code_type': sample_email_code.code_type,
'code': sample_email_code.txt_code})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_code', user_id=sample_email_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
assert VerifyCode.query.first().code_used
def test_user_verify_code_bad_code_and_increments_failed_login_count(client,
sample_sms_code):
assert not VerifyCode.query.first().code_used
data = json.dumps({
'code_type': sample_sms_code.code_type,
'code': "blah"})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 404
assert not VerifyCode.query.first().code_used
assert User.query.get(sample_sms_code.user.id).failed_login_count == 1
def test_user_verify_code_email_bad_code(notify_api,
sample_email_code):
"""
Tests POST endpoint '/<user_id>/verify/code'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert not VerifyCode.query.first().code_used
data = json.dumps({
'code_type': sample_email_code.code_type,
'code': "blah"})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_code', user_id=sample_email_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 404
assert not VerifyCode.query.first().code_used
def test_user_verify_code_email_expired_code(notify_api,
sample_email_code):
"""
Tests POST endpoint '/<user_id>/verify/code'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert not VerifyCode.query.first().code_used
sample_email_code.expiry_datetime = (
datetime.utcnow() - timedelta(hours=1))
db.session.add(sample_email_code)
db.session.commit()
data = json.dumps({
'code_type': sample_email_code.code_type,
'code': sample_email_code.txt_code})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_code', user_id=sample_email_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
assert not VerifyCode.query.first().code_used
def test_user_verify_code_expired_code_and_increments_failed_login_count(
client,
sample_sms_code):
assert not VerifyCode.query.first().code_used
sample_sms_code.expiry_datetime = (
datetime.utcnow() - timedelta(hours=1))
db.session.add(sample_sms_code)
db.session.commit()
data = json.dumps({
'code_type': sample_sms_code.code_type,
'code': sample_sms_code.txt_code})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
assert not VerifyCode.query.first().code_used
assert User.query.get(sample_sms_code.user.id).failed_login_count == 1
@freeze_time("2016-01-01 10:00:00.000000")
def test_user_verify_password(notify_api,
notify_db,
def test_user_verify_password(client,
notify_db_session,
sample_user):
"""
Tests POST endpoint '/<user_id>/verify/password'
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps({'password': 'password'})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
User.query.get(sample_user.id).logged_in_at == datetime.utcnow()
data = json.dumps({'password': 'password'})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
assert User.query.get(sample_user.id).logged_in_at == datetime.utcnow()
def test_user_verify_password_invalid_password(notify_api,
def test_user_verify_password_invalid_password(client,
sample_user):
"""
Tests POST endpoint '/<user_id>/verify/password' invalid endpoint.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps({'password': 'bad password'})
auth_header = create_authorization_header()
data = json.dumps({'password': 'bad password'})
auth_header = create_authorization_header()
assert sample_user.failed_login_count == 0
assert sample_user.failed_login_count == 0
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
json_resp = json.loads(resp.get_data(as_text=True))
assert 'Incorrect password' in json_resp['message']['password']
assert sample_user.failed_login_count == 1
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
json_resp = json.loads(resp.get_data(as_text=True))
assert 'Incorrect password' in json_resp['message']['password']
assert sample_user.failed_login_count == 1
def test_user_verify_password_valid_password_resets_failed_logins(notify_api,
def test_user_verify_password_valid_password_resets_failed_logins(client,
sample_user):
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps({'password': 'bad password'})
auth_header = create_authorization_header()
data = json.dumps({'password': 'bad password'})
auth_header = create_authorization_header()
assert sample_user.failed_login_count == 0
assert sample_user.failed_login_count == 0
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
json_resp = json.loads(resp.get_data(as_text=True))
assert 'Incorrect password' in json_resp['message']['password']
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
json_resp = json.loads(resp.get_data(as_text=True))
assert 'Incorrect password' in json_resp['message']['password']
assert sample_user.failed_login_count == 1
assert sample_user.failed_login_count == 1
data = json.dumps({'password': 'password'})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
data = json.dumps({'password': 'password'})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
assert sample_user.failed_login_count == 0
assert resp.status_code == 204
assert sample_user.failed_login_count == 0
def test_user_verify_password_missing_password(notify_api,
def test_user_verify_password_missing_password(client,
sample_user):
"""
Tests POST endpoint '/<user_id>/verify/password' missing password.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps({'bingo': 'bongo'})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
json_resp = json.loads(resp.get_data(as_text=True))
assert 'Required field missing data' in json_resp['message']['password']
auth_header = create_authorization_header()
resp = client.post(
url_for('user.verify_user_password', user_id=sample_user.id),
data=json.dumps({'bingo': 'bongo'}),
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
json_resp = json.loads(resp.get_data(as_text=True))
assert 'Required field missing data' in json_resp['message']['password']
@pytest.mark.parametrize('research_mode', [True, False])
@@ -235,14 +179,13 @@ def test_send_user_sms_code(notify_api,
notify_service.research_mode = True
dao_update_service(notify_service)
data = json.dumps({})
auth_header = create_authorization_header()
mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111')
mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async')
resp = client.post(
url_for('user.send_user_sms_code', user_id=sample_user.id),
data=data,
data=json.dumps({}),
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
@@ -275,12 +218,11 @@ def test_send_user_code_for_sms_with_optional_to_field(notify_api,
to_number = '+441119876757'
mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111')
mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async')
data = json.dumps({'to': to_number})
auth_header = create_authorization_header()
resp = client.post(
url_for('user.send_user_sms_code', user_id=sample_user.id),
data=data,
data=json.dumps({'to': to_number}),
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
@@ -293,34 +235,47 @@ def test_send_user_code_for_sms_with_optional_to_field(notify_api,
)
def test_send_sms_code_returns_404_for_bad_input_data(notify_api, notify_db, notify_db_session):
"""
Tests POST endpoint /user/<user_id>/sms-code return 404 for bad input data
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = json.dumps({})
import uuid
uuid_ = uuid.uuid4()
auth_header = create_authorization_header()
resp = client.post(
url_for('user.send_user_sms_code', user_id=uuid_),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 404
assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found'
def test_send_sms_code_returns_404_for_bad_input_data(client):
uuid_ = uuid.uuid4()
auth_header = create_authorization_header()
resp = client.post(
url_for('user.send_user_sms_code', user_id=uuid_),
data=json.dumps({}),
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 404
assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found'
def test_send_sms_code_returns_204_when_too_many_codes_already_created(client, sample_user):
for i in range(10):
verify_code = VerifyCode(
code_type='sms',
_code=12345,
created_at=datetime.utcnow() - timedelta(minutes=10),
expiry_datetime=datetime.utcnow() + timedelta(minutes=40),
user=sample_user
)
db.session.add(verify_code)
db.session.commit()
assert VerifyCode.query.count() == 10
auth_header = create_authorization_header()
resp = client.post(
url_for('user.send_user_sms_code', user_id=sample_user.id),
data=json.dumps({}),
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
assert VerifyCode.query.count() == 10
def test_send_user_email_verification(client,
sample_user,
mocker,
email_verification_template):
data = json.dumps({})
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
auth_header = create_authorization_header()
resp = client.post(
url_for('user.send_user_email_verification', user_id=str(sample_user.id)),
data=data,
data=json.dumps({}),
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
notification = Notification.query.first()
@@ -332,13 +287,12 @@ def test_send_email_verification_returns_404_for_bad_input_data(client, notify_d
Tests POST endpoint /user/<user_id>/sms-code return 404 for bad input data
"""
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
data = json.dumps({})
import uuid
uuid_ = uuid.uuid4()
auth_header = create_authorization_header()
resp = client.post(
url_for('user.send_user_email_verification', user_id=uuid_),
data=data,
data=json.dumps({}),
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 404
assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found'