mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-03 18:01:08 -05:00
Set the expiry time on a verify code (2fa) to 10 minutes.
When the verify code is wrong or expired increment the failed to login count for the user. When the verify code is successfully used reset the failed login count to 0.
This commit is contained in:
@@ -35,7 +35,7 @@ def save_model_user(usr, update_dict={}, pwd=None):
|
|||||||
|
|
||||||
def create_user_code(user, code, code_type):
|
def create_user_code(user, code, code_type):
|
||||||
verify_code = VerifyCode(code_type=code_type,
|
verify_code = VerifyCode(code_type=code_type,
|
||||||
expiry_datetime=datetime.utcnow() + timedelta(hours=1),
|
expiry_datetime=datetime.utcnow() + timedelta(minutes=30),
|
||||||
user=user)
|
user=user)
|
||||||
verify_code.code = code
|
verify_code.code = code
|
||||||
db.session.add(verify_code)
|
db.session.add(verify_code)
|
||||||
|
|||||||
@@ -123,10 +123,13 @@ def verify_user_code(user_id):
|
|||||||
|
|
||||||
code = get_user_code(user_to_verify, txt_code, txt_type)
|
code = get_user_code(user_to_verify, txt_code, txt_type)
|
||||||
if not code:
|
if not code:
|
||||||
|
increment_failed_login_count(user_to_verify)
|
||||||
raise InvalidRequest("Code not found", status_code=404)
|
raise InvalidRequest("Code not found", status_code=404)
|
||||||
if datetime.utcnow() > code.expiry_datetime or code.code_used:
|
if datetime.utcnow() > code.expiry_datetime or code.code_used:
|
||||||
|
increment_failed_login_count(user_to_verify)
|
||||||
raise InvalidRequest("Code has expired", status_code=400)
|
raise InvalidRequest("Code has expired", status_code=400)
|
||||||
use_user_code(code.id)
|
use_user_code(code.id)
|
||||||
|
reset_failed_login_count(user_to_verify)
|
||||||
return jsonify({}), 204
|
return jsonify({}), 204
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
import json
|
import json
|
||||||
import moto
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from datetime import (
|
from datetime import (
|
||||||
@@ -15,7 +14,7 @@ from app.models import (
|
|||||||
Notification
|
Notification
|
||||||
)
|
)
|
||||||
|
|
||||||
from app import db, encryption
|
from app import db
|
||||||
|
|
||||||
from tests import create_authorization_header
|
from tests import create_authorization_header
|
||||||
from freezegun import freeze_time
|
from freezegun import freeze_time
|
||||||
@@ -23,198 +22,167 @@ from freezegun import freeze_time
|
|||||||
import app.celery.tasks
|
import app.celery.tasks
|
||||||
|
|
||||||
|
|
||||||
def test_user_verify_code_sms(notify_api,
|
def test_user_verify_code(client,
|
||||||
sample_sms_code):
|
sample_sms_code):
|
||||||
"""
|
assert not VerifyCode.query.first().code_used
|
||||||
Tests POST endpoint '/<user_id>/verify/code'
|
data = json.dumps({
|
||||||
"""
|
'code_type': sample_sms_code.code_type,
|
||||||
with notify_api.test_request_context():
|
'code': sample_sms_code.txt_code})
|
||||||
with notify_api.test_client() as client:
|
auth_header = create_authorization_header()
|
||||||
assert not VerifyCode.query.first().code_used
|
resp = client.post(
|
||||||
data = json.dumps({
|
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
|
||||||
'code_type': sample_sms_code.code_type,
|
data=data,
|
||||||
'code': sample_sms_code.txt_code})
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
auth_header = create_authorization_header()
|
assert resp.status_code == 204
|
||||||
resp = client.post(
|
assert VerifyCode.query.first().code_used
|
||||||
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
|
|
||||||
data=data,
|
|
||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
|
||||||
assert resp.status_code == 204
|
|
||||||
assert VerifyCode.query.first().code_used
|
|
||||||
|
|
||||||
|
|
||||||
def test_user_verify_code_sms_missing_code(notify_api,
|
def test_user_verify_code_missing_code(client,
|
||||||
sample_sms_code):
|
sample_sms_code):
|
||||||
"""
|
assert not VerifyCode.query.first().code_used
|
||||||
Tests POST endpoint '/<user_id>/verify/code'
|
data = json.dumps({'code_type': sample_sms_code.code_type})
|
||||||
"""
|
auth_header = create_authorization_header()
|
||||||
with notify_api.test_request_context():
|
resp = client.post(
|
||||||
with notify_api.test_client() as client:
|
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
|
||||||
assert not VerifyCode.query.first().code_used
|
data=data,
|
||||||
data = json.dumps({'code_type': sample_sms_code.code_type})
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
auth_header = create_authorization_header()
|
assert resp.status_code == 400
|
||||||
resp = client.post(
|
assert not VerifyCode.query.first().code_used
|
||||||
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
|
assert User.query.get(sample_sms_code.user.id).failed_login_count == 0
|
||||||
data=data,
|
|
||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
|
||||||
assert resp.status_code == 400
|
|
||||||
assert not VerifyCode.query.first().code_used
|
|
||||||
|
|
||||||
|
|
||||||
@moto.mock_sqs
|
def test_user_verify_code_bad_code_and_increments_failed_login_count(client,
|
||||||
def test_user_verify_code_email(notify_api,
|
sample_sms_code):
|
||||||
sqs_client_conn,
|
assert not VerifyCode.query.first().code_used
|
||||||
sample_email_code):
|
data = json.dumps({
|
||||||
"""
|
'code_type': sample_sms_code.code_type,
|
||||||
Tests POST endpoint '/<user_id>/verify/code'
|
'code': "blah"})
|
||||||
"""
|
auth_header = create_authorization_header()
|
||||||
with notify_api.test_request_context():
|
resp = client.post(
|
||||||
with notify_api.test_client() as client:
|
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
|
||||||
assert not VerifyCode.query.first().code_used
|
data=data,
|
||||||
data = json.dumps({
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
'code_type': sample_email_code.code_type,
|
assert resp.status_code == 404
|
||||||
'code': sample_email_code.txt_code})
|
assert not VerifyCode.query.first().code_used
|
||||||
auth_header = create_authorization_header()
|
assert User.query.get(sample_sms_code.user.id).failed_login_count == 1
|
||||||
resp = client.post(
|
|
||||||
url_for('user.verify_user_code', user_id=sample_email_code.user.id),
|
|
||||||
data=data,
|
|
||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
|
||||||
assert resp.status_code == 204
|
|
||||||
assert VerifyCode.query.first().code_used
|
|
||||||
|
|
||||||
|
|
||||||
def test_user_verify_code_email_bad_code(notify_api,
|
def test_user_verify_code_resets_failed_login_count_when_successful(client, sample_sms_code):
|
||||||
sample_email_code):
|
assert not VerifyCode.query.first().code_used
|
||||||
"""
|
data = json.dumps({
|
||||||
Tests POST endpoint '/<user_id>/verify/code'
|
'code_type': sample_sms_code.code_type,
|
||||||
"""
|
'code': "blah"})
|
||||||
with notify_api.test_request_context():
|
auth_header = create_authorization_header()
|
||||||
with notify_api.test_client() as client:
|
resp = client.post(
|
||||||
assert not VerifyCode.query.first().code_used
|
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
|
||||||
data = json.dumps({
|
data=data,
|
||||||
'code_type': sample_email_code.code_type,
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
'code': "blah"})
|
assert resp.status_code == 404
|
||||||
auth_header = create_authorization_header()
|
assert not VerifyCode.query.first().code_used
|
||||||
resp = client.post(
|
assert User.query.get(sample_sms_code.user.id).failed_login_count == 1
|
||||||
url_for('user.verify_user_code', user_id=sample_email_code.user.id),
|
correct_data = json.dumps({
|
||||||
data=data,
|
'code_type': sample_sms_code.code_type,
|
||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
'code': sample_sms_code.txt_code})
|
||||||
assert resp.status_code == 404
|
success_response = client.post(
|
||||||
assert not VerifyCode.query.first().code_used
|
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
|
||||||
|
data=correct_data,
|
||||||
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
|
assert success_response.status_code == 204
|
||||||
|
assert VerifyCode.query.first().code_used
|
||||||
|
assert User.query.get(sample_sms_code.user.id).failed_login_count == 0
|
||||||
|
|
||||||
|
|
||||||
def test_user_verify_code_email_expired_code(notify_api,
|
def test_user_verify_code_expired_code_and_increments_failed_login_count(
|
||||||
sample_email_code):
|
client,
|
||||||
"""
|
sample_sms_code):
|
||||||
Tests POST endpoint '/<user_id>/verify/code'
|
assert not VerifyCode.query.first().code_used
|
||||||
"""
|
sample_sms_code.expiry_datetime = (
|
||||||
with notify_api.test_request_context():
|
datetime.utcnow() - timedelta(hours=1))
|
||||||
with notify_api.test_client() as client:
|
db.session.add(sample_sms_code)
|
||||||
assert not VerifyCode.query.first().code_used
|
db.session.commit()
|
||||||
sample_email_code.expiry_datetime = (
|
data = json.dumps({
|
||||||
datetime.utcnow() - timedelta(hours=1))
|
'code_type': sample_sms_code.code_type,
|
||||||
db.session.add(sample_email_code)
|
'code': sample_sms_code.txt_code})
|
||||||
db.session.commit()
|
auth_header = create_authorization_header()
|
||||||
data = json.dumps({
|
resp = client.post(
|
||||||
'code_type': sample_email_code.code_type,
|
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
|
||||||
'code': sample_email_code.txt_code})
|
data=data,
|
||||||
auth_header = create_authorization_header()
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
resp = client.post(
|
assert resp.status_code == 400
|
||||||
url_for('user.verify_user_code', user_id=sample_email_code.user.id),
|
assert not VerifyCode.query.first().code_used
|
||||||
data=data,
|
assert User.query.get(sample_sms_code.user.id).failed_login_count == 1
|
||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
|
||||||
assert resp.status_code == 400
|
|
||||||
assert not VerifyCode.query.first().code_used
|
|
||||||
|
|
||||||
|
|
||||||
@freeze_time("2016-01-01 10:00:00.000000")
|
@freeze_time("2016-01-01 10:00:00.000000")
|
||||||
def test_user_verify_password(notify_api,
|
def test_user_verify_password(client,
|
||||||
notify_db,
|
|
||||||
notify_db_session,
|
notify_db_session,
|
||||||
sample_user):
|
sample_user):
|
||||||
"""
|
data = json.dumps({'password': 'password'})
|
||||||
Tests POST endpoint '/<user_id>/verify/password'
|
auth_header = create_authorization_header()
|
||||||
"""
|
resp = client.post(
|
||||||
with notify_api.test_request_context():
|
url_for('user.verify_user_password', user_id=sample_user.id),
|
||||||
with notify_api.test_client() as client:
|
data=data,
|
||||||
data = json.dumps({'password': 'password'})
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
auth_header = create_authorization_header()
|
assert resp.status_code == 204
|
||||||
resp = client.post(
|
assert User.query.get(sample_user.id).logged_in_at == datetime.utcnow()
|
||||||
url_for('user.verify_user_password', user_id=sample_user.id),
|
|
||||||
data=data,
|
|
||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
|
||||||
assert resp.status_code == 204
|
|
||||||
User.query.get(sample_user.id).logged_in_at == datetime.utcnow()
|
|
||||||
|
|
||||||
|
|
||||||
def test_user_verify_password_invalid_password(notify_api,
|
def test_user_verify_password_invalid_password(client,
|
||||||
sample_user):
|
sample_user):
|
||||||
"""
|
data = json.dumps({'password': 'bad password'})
|
||||||
Tests POST endpoint '/<user_id>/verify/password' invalid endpoint.
|
auth_header = create_authorization_header()
|
||||||
"""
|
|
||||||
with notify_api.test_request_context():
|
|
||||||
with notify_api.test_client() as client:
|
|
||||||
data = json.dumps({'password': 'bad password'})
|
|
||||||
auth_header = create_authorization_header()
|
|
||||||
|
|
||||||
assert sample_user.failed_login_count == 0
|
assert sample_user.failed_login_count == 0
|
||||||
|
|
||||||
resp = client.post(
|
resp = client.post(
|
||||||
url_for('user.verify_user_password', user_id=sample_user.id),
|
url_for('user.verify_user_password', user_id=sample_user.id),
|
||||||
data=data,
|
data=data,
|
||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
assert resp.status_code == 400
|
assert resp.status_code == 400
|
||||||
json_resp = json.loads(resp.get_data(as_text=True))
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
assert 'Incorrect password' in json_resp['message']['password']
|
assert 'Incorrect password' in json_resp['message']['password']
|
||||||
assert sample_user.failed_login_count == 1
|
assert sample_user.failed_login_count == 1
|
||||||
|
|
||||||
|
|
||||||
def test_user_verify_password_valid_password_resets_failed_logins(notify_api,
|
def test_user_verify_password_valid_password_resets_failed_logins(client,
|
||||||
sample_user):
|
sample_user):
|
||||||
with notify_api.test_request_context():
|
data = json.dumps({'password': 'bad password'})
|
||||||
with notify_api.test_client() as client:
|
auth_header = create_authorization_header()
|
||||||
data = json.dumps({'password': 'bad password'})
|
|
||||||
auth_header = create_authorization_header()
|
|
||||||
|
|
||||||
assert sample_user.failed_login_count == 0
|
assert sample_user.failed_login_count == 0
|
||||||
|
|
||||||
resp = client.post(
|
resp = client.post(
|
||||||
url_for('user.verify_user_password', user_id=sample_user.id),
|
url_for('user.verify_user_password', user_id=sample_user.id),
|
||||||
data=data,
|
data=data,
|
||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
assert resp.status_code == 400
|
assert resp.status_code == 400
|
||||||
json_resp = json.loads(resp.get_data(as_text=True))
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
assert 'Incorrect password' in json_resp['message']['password']
|
assert 'Incorrect password' in json_resp['message']['password']
|
||||||
|
|
||||||
assert sample_user.failed_login_count == 1
|
assert sample_user.failed_login_count == 1
|
||||||
|
|
||||||
data = json.dumps({'password': 'password'})
|
data = json.dumps({'password': 'password'})
|
||||||
auth_header = create_authorization_header()
|
auth_header = create_authorization_header()
|
||||||
resp = client.post(
|
resp = client.post(
|
||||||
url_for('user.verify_user_password', user_id=sample_user.id),
|
url_for('user.verify_user_password', user_id=sample_user.id),
|
||||||
data=data,
|
data=data,
|
||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
|
|
||||||
assert resp.status_code == 204
|
assert resp.status_code == 204
|
||||||
assert sample_user.failed_login_count == 0
|
assert sample_user.failed_login_count == 0
|
||||||
|
|
||||||
|
|
||||||
def test_user_verify_password_missing_password(notify_api,
|
def test_user_verify_password_missing_password(client,
|
||||||
sample_user):
|
sample_user):
|
||||||
"""
|
data = json.dumps({'bingo': 'bongo'})
|
||||||
Tests POST endpoint '/<user_id>/verify/password' missing password.
|
auth_header = create_authorization_header()
|
||||||
"""
|
resp = client.post(
|
||||||
with notify_api.test_request_context():
|
url_for('user.verify_user_password', user_id=sample_user.id),
|
||||||
with notify_api.test_client() as client:
|
data=data,
|
||||||
data = json.dumps({'bingo': 'bongo'})
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
auth_header = create_authorization_header()
|
assert resp.status_code == 400
|
||||||
resp = client.post(
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
url_for('user.verify_user_password', user_id=sample_user.id),
|
assert 'Required field missing data' in json_resp['message']['password']
|
||||||
data=data,
|
|
||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
|
||||||
assert resp.status_code == 400
|
|
||||||
json_resp = json.loads(resp.get_data(as_text=True))
|
|
||||||
assert 'Required field missing data' in json_resp['message']['password']
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('research_mode', [True, False])
|
@pytest.mark.parametrize('research_mode', [True, False])
|
||||||
@@ -293,22 +261,17 @@ def test_send_user_code_for_sms_with_optional_to_field(notify_api,
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_send_sms_code_returns_404_for_bad_input_data(notify_api, notify_db, notify_db_session):
|
def test_send_sms_code_returns_404_for_bad_input_data(client):
|
||||||
"""
|
data = json.dumps({})
|
||||||
Tests POST endpoint /user/<user_id>/sms-code return 404 for bad input data
|
import uuid
|
||||||
"""
|
uuid_ = uuid.uuid4()
|
||||||
with notify_api.test_request_context():
|
auth_header = create_authorization_header()
|
||||||
with notify_api.test_client() as client:
|
resp = client.post(
|
||||||
data = json.dumps({})
|
url_for('user.send_user_sms_code', user_id=uuid_),
|
||||||
import uuid
|
data=data,
|
||||||
uuid_ = uuid.uuid4()
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
auth_header = create_authorization_header()
|
assert resp.status_code == 404
|
||||||
resp = client.post(
|
assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found'
|
||||||
url_for('user.send_user_sms_code', user_id=uuid_),
|
|
||||||
data=data,
|
|
||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
|
||||||
assert resp.status_code == 404
|
|
||||||
assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found'
|
|
||||||
|
|
||||||
|
|
||||||
def test_send_user_email_verification(client,
|
def test_send_user_email_verification(client,
|
||||||
|
|||||||
Reference in New Issue
Block a user