diff --git a/app/config.py b/app/config.py index 45762a33a..10c66a9f3 100644 --- a/app/config.py +++ b/app/config.py @@ -78,6 +78,7 @@ class Config(object): SMS_CHAR_COUNT_LIMIT = 495 BRANDING_PATH = '/images/email-template/crests/' TEST_MESSAGE_FILENAME = 'Test message' + MAX_VERIFY_CODE_COUNT = 10 NOTIFY_SERVICE_ID = 'd6aa2c68-a2d9-4437-ab19-3ae8eb202553' INVITATION_EMAIL_TEMPLATE_ID = '4f46df42-f795-4cc4-83bb-65ca312f49cc' diff --git a/app/dao/users_dao.py b/app/dao/users_dao.py index 491d05fea..020830b7c 100644 --- a/app/dao/users_dao.py +++ b/app/dao/users_dao.py @@ -1,8 +1,6 @@ import random from datetime import (datetime, timedelta) - from sqlalchemy import func - from app import db from app.models import (User, VerifyCode) @@ -35,7 +33,7 @@ def save_model_user(usr, update_dict={}, pwd=None): def create_user_code(user, code, code_type): verify_code = VerifyCode(code_type=code_type, - expiry_datetime=datetime.utcnow() + timedelta(hours=1), + expiry_datetime=datetime.utcnow() + timedelta(minutes=30), user=user) verify_code.code = code db.session.add(verify_code) @@ -48,7 +46,7 @@ def get_user_code(user, code, code_type): # time searching for the correct code. codes = VerifyCode.query.filter_by( user=user, code_type=code_type).order_by( - VerifyCode.created_at.desc()) + VerifyCode.created_at.desc()) retval = None for x in codes: if x.check_code(code): @@ -82,6 +80,15 @@ def delete_user_verify_codes(user): db.session.commit() +def count_user_verify_codes(user): + query = VerifyCode.query.filter( + VerifyCode.user == user, + VerifyCode.expiry_datetime > datetime.utcnow(), + VerifyCode.code_used.is_(False) + ) + return query.count() + + def get_user_by_id(user_id=None): if user_id: return User.query.filter_by(id=user_id).one() diff --git a/app/user/rest.py b/app/user/rest.py index 5f9dd91ed..ad1c0dee9 100644 --- a/app/user/rest.py +++ b/app/user/rest.py @@ -12,7 +12,8 @@ from app.dao.users_dao import ( get_user_by_email, create_secret_code, save_user_attribute, - update_user_password + update_user_password, + count_user_verify_codes ) from app.dao.permissions_dao import permission_dao from app.dao.services_dao import dao_fetch_service_by_id @@ -123,8 +124,10 @@ def verify_user_code(user_id): code = get_user_code(user_to_verify, txt_code, txt_type) if not code: + increment_failed_login_count(user_to_verify) raise InvalidRequest("Code not found", status_code=404) if datetime.utcnow() > code.expiry_datetime or code.code_used: + increment_failed_login_count(user_to_verify) raise InvalidRequest("Code has expired", status_code=400) use_user_code(code.id) return jsonify({}), 204 @@ -135,6 +138,11 @@ def send_user_sms_code(user_id): user_to_send_to = get_user_by_id(user_id=user_id) verify_code, errors = request_verify_code_schema.load(request.get_json()) + if count_user_verify_codes(user_to_send_to) >= current_app.config.get('MAX_VERIFY_CODE_COUNT'): + # Prevent more than `MAX_VERIFY_CODE_COUNT` active verify codes at a time + current_app.logger.warn('Max verify code has exceeded for user {}'.format(user_to_send_to.id)) + return jsonify({}), 204 + secret_code = create_secret_code() create_user_code(user_to_send_to, secret_code, SMS_TYPE) diff --git a/tests/app/dao/test_users_dao.py b/tests/app/dao/test_users_dao.py index e18ef6b79..c815eb66b 100644 --- a/tests/app/dao/test_users_dao.py +++ b/tests/app/dao/test_users_dao.py @@ -1,5 +1,6 @@ from datetime import datetime, timedelta +from freezegun import freeze_time from sqlalchemy.exc import DataError from sqlalchemy.orm.exc import NoResultFound import pytest @@ -14,8 +15,8 @@ from app.dao.users_dao import ( reset_failed_login_count, get_user_by_email, delete_codes_older_created_more_than_a_day_ago, - update_user_password -) + update_user_password, + count_user_verify_codes) from app.models import User, VerifyCode @@ -109,13 +110,14 @@ def test_should_not_delete_verification_codes_less_than_one_day_old(sample_user) assert VerifyCode.query.one()._code == "12345" -def make_verify_code(user, age=timedelta(hours=0), code="12335"): +def make_verify_code(user, age=timedelta(hours=0), expiry_age=timedelta(0), code="12335", code_used=False): verify_code = VerifyCode( code_type='sms', _code=code, created_at=datetime.utcnow() - age, - expiry_datetime=datetime.utcnow(), - user=user + expiry_datetime=datetime.utcnow() - expiry_age, + user=user, + code_used=code_used ) db.session.add(verify_code) db.session.commit() @@ -140,3 +142,12 @@ def test_update_user_password(notify_api, notify_db, notify_db_session, sample_u assert not sample_user.check_password(password) update_user_password(sample_user, password) assert sample_user.check_password(password) + + +def test_count_user_verify_codes(sample_user): + with freeze_time(datetime.utcnow() + timedelta(hours=1)): + make_verify_code(sample_user, code_used=True) + make_verify_code(sample_user, expiry_age=timedelta(hours=2)) + [make_verify_code(sample_user) for i in range(5)] + + assert count_user_verify_codes(sample_user) == 5 diff --git a/tests/app/user/test_rest_verify.py b/tests/app/user/test_rest_verify.py index 6f84d5154..1d9bba17e 100644 --- a/tests/app/user/test_rest_verify.py +++ b/tests/app/user/test_rest_verify.py @@ -1,5 +1,6 @@ import json -import moto +import uuid + import pytest from datetime import ( @@ -15,7 +16,7 @@ from app.models import ( Notification ) -from app import db, encryption +from app import db from tests import create_authorization_header from freezegun import freeze_time @@ -23,198 +24,141 @@ from freezegun import freeze_time import app.celery.tasks -def test_user_verify_code_sms(notify_api, - sample_sms_code): - """ - Tests POST endpoint '//verify/code' - """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - assert not VerifyCode.query.first().code_used - data = json.dumps({ - 'code_type': sample_sms_code.code_type, - 'code': sample_sms_code.txt_code}) - auth_header = create_authorization_header() - resp = client.post( - url_for('user.verify_user_code', user_id=sample_sms_code.user.id), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 204 - assert VerifyCode.query.first().code_used +def test_user_verify_code(client, + sample_sms_code): + assert not VerifyCode.query.first().code_used + data = json.dumps({ + 'code_type': sample_sms_code.code_type, + 'code': sample_sms_code.txt_code}) + auth_header = create_authorization_header() + resp = client.post( + url_for('user.verify_user_code', user_id=sample_sms_code.user.id), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 204 + assert VerifyCode.query.first().code_used -def test_user_verify_code_sms_missing_code(notify_api, - sample_sms_code): - """ - Tests POST endpoint '//verify/code' - """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - assert not VerifyCode.query.first().code_used - data = json.dumps({'code_type': sample_sms_code.code_type}) - auth_header = create_authorization_header() - resp = client.post( - url_for('user.verify_user_code', user_id=sample_sms_code.user.id), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 400 - assert not VerifyCode.query.first().code_used +def test_user_verify_code_missing_code(client, + sample_sms_code): + assert not VerifyCode.query.first().code_used + data = json.dumps({'code_type': sample_sms_code.code_type}) + auth_header = create_authorization_header() + resp = client.post( + url_for('user.verify_user_code', user_id=sample_sms_code.user.id), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 400 + assert not VerifyCode.query.first().code_used + assert User.query.get(sample_sms_code.user.id).failed_login_count == 0 -@moto.mock_sqs -def test_user_verify_code_email(notify_api, - sqs_client_conn, - sample_email_code): - """ - Tests POST endpoint '//verify/code' - """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - assert not VerifyCode.query.first().code_used - data = json.dumps({ - 'code_type': sample_email_code.code_type, - 'code': sample_email_code.txt_code}) - auth_header = create_authorization_header() - resp = client.post( - url_for('user.verify_user_code', user_id=sample_email_code.user.id), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 204 - assert VerifyCode.query.first().code_used +def test_user_verify_code_bad_code_and_increments_failed_login_count(client, + sample_sms_code): + assert not VerifyCode.query.first().code_used + data = json.dumps({ + 'code_type': sample_sms_code.code_type, + 'code': "blah"}) + auth_header = create_authorization_header() + resp = client.post( + url_for('user.verify_user_code', user_id=sample_sms_code.user.id), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 404 + assert not VerifyCode.query.first().code_used + assert User.query.get(sample_sms_code.user.id).failed_login_count == 1 -def test_user_verify_code_email_bad_code(notify_api, - sample_email_code): - """ - Tests POST endpoint '//verify/code' - """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - assert not VerifyCode.query.first().code_used - data = json.dumps({ - 'code_type': sample_email_code.code_type, - 'code': "blah"}) - auth_header = create_authorization_header() - resp = client.post( - url_for('user.verify_user_code', user_id=sample_email_code.user.id), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 404 - assert not VerifyCode.query.first().code_used - - -def test_user_verify_code_email_expired_code(notify_api, - sample_email_code): - """ - Tests POST endpoint '//verify/code' - """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - assert not VerifyCode.query.first().code_used - sample_email_code.expiry_datetime = ( - datetime.utcnow() - timedelta(hours=1)) - db.session.add(sample_email_code) - db.session.commit() - data = json.dumps({ - 'code_type': sample_email_code.code_type, - 'code': sample_email_code.txt_code}) - auth_header = create_authorization_header() - resp = client.post( - url_for('user.verify_user_code', user_id=sample_email_code.user.id), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 400 - assert not VerifyCode.query.first().code_used +def test_user_verify_code_expired_code_and_increments_failed_login_count( + client, + sample_sms_code): + assert not VerifyCode.query.first().code_used + sample_sms_code.expiry_datetime = ( + datetime.utcnow() - timedelta(hours=1)) + db.session.add(sample_sms_code) + db.session.commit() + data = json.dumps({ + 'code_type': sample_sms_code.code_type, + 'code': sample_sms_code.txt_code}) + auth_header = create_authorization_header() + resp = client.post( + url_for('user.verify_user_code', user_id=sample_sms_code.user.id), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 400 + assert not VerifyCode.query.first().code_used + assert User.query.get(sample_sms_code.user.id).failed_login_count == 1 @freeze_time("2016-01-01 10:00:00.000000") -def test_user_verify_password(notify_api, - notify_db, +def test_user_verify_password(client, notify_db_session, sample_user): - """ - Tests POST endpoint '//verify/password' - """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - data = json.dumps({'password': 'password'}) - auth_header = create_authorization_header() - resp = client.post( - url_for('user.verify_user_password', user_id=sample_user.id), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 204 - User.query.get(sample_user.id).logged_in_at == datetime.utcnow() + data = json.dumps({'password': 'password'}) + auth_header = create_authorization_header() + resp = client.post( + url_for('user.verify_user_password', user_id=sample_user.id), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 204 + assert User.query.get(sample_user.id).logged_in_at == datetime.utcnow() -def test_user_verify_password_invalid_password(notify_api, +def test_user_verify_password_invalid_password(client, sample_user): - """ - Tests POST endpoint '//verify/password' invalid endpoint. - """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - data = json.dumps({'password': 'bad password'}) - auth_header = create_authorization_header() + data = json.dumps({'password': 'bad password'}) + auth_header = create_authorization_header() - assert sample_user.failed_login_count == 0 + assert sample_user.failed_login_count == 0 - resp = client.post( - url_for('user.verify_user_password', user_id=sample_user.id), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 400 - json_resp = json.loads(resp.get_data(as_text=True)) - assert 'Incorrect password' in json_resp['message']['password'] - assert sample_user.failed_login_count == 1 + resp = client.post( + url_for('user.verify_user_password', user_id=sample_user.id), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 400 + json_resp = json.loads(resp.get_data(as_text=True)) + assert 'Incorrect password' in json_resp['message']['password'] + assert sample_user.failed_login_count == 1 -def test_user_verify_password_valid_password_resets_failed_logins(notify_api, +def test_user_verify_password_valid_password_resets_failed_logins(client, sample_user): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - data = json.dumps({'password': 'bad password'}) - auth_header = create_authorization_header() + data = json.dumps({'password': 'bad password'}) + auth_header = create_authorization_header() - assert sample_user.failed_login_count == 0 + assert sample_user.failed_login_count == 0 - resp = client.post( - url_for('user.verify_user_password', user_id=sample_user.id), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 400 - json_resp = json.loads(resp.get_data(as_text=True)) - assert 'Incorrect password' in json_resp['message']['password'] + resp = client.post( + url_for('user.verify_user_password', user_id=sample_user.id), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 400 + json_resp = json.loads(resp.get_data(as_text=True)) + assert 'Incorrect password' in json_resp['message']['password'] - assert sample_user.failed_login_count == 1 + assert sample_user.failed_login_count == 1 - data = json.dumps({'password': 'password'}) - auth_header = create_authorization_header() - resp = client.post( - url_for('user.verify_user_password', user_id=sample_user.id), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) + data = json.dumps({'password': 'password'}) + auth_header = create_authorization_header() + resp = client.post( + url_for('user.verify_user_password', user_id=sample_user.id), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 204 - assert sample_user.failed_login_count == 0 + assert resp.status_code == 204 + assert sample_user.failed_login_count == 0 -def test_user_verify_password_missing_password(notify_api, +def test_user_verify_password_missing_password(client, sample_user): - """ - Tests POST endpoint '//verify/password' missing password. - """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - data = json.dumps({'bingo': 'bongo'}) - auth_header = create_authorization_header() - resp = client.post( - url_for('user.verify_user_password', user_id=sample_user.id), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 400 - json_resp = json.loads(resp.get_data(as_text=True)) - assert 'Required field missing data' in json_resp['message']['password'] + auth_header = create_authorization_header() + resp = client.post( + url_for('user.verify_user_password', user_id=sample_user.id), + data=json.dumps({'bingo': 'bongo'}), + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 400 + json_resp = json.loads(resp.get_data(as_text=True)) + assert 'Required field missing data' in json_resp['message']['password'] @pytest.mark.parametrize('research_mode', [True, False]) @@ -235,14 +179,13 @@ def test_send_user_sms_code(notify_api, notify_service.research_mode = True dao_update_service(notify_service) - data = json.dumps({}) auth_header = create_authorization_header() mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111') mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') resp = client.post( url_for('user.send_user_sms_code', user_id=sample_user.id), - data=data, + data=json.dumps({}), headers=[('Content-Type', 'application/json'), auth_header]) assert resp.status_code == 204 @@ -275,12 +218,11 @@ def test_send_user_code_for_sms_with_optional_to_field(notify_api, to_number = '+441119876757' mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111') mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') - data = json.dumps({'to': to_number}) auth_header = create_authorization_header() resp = client.post( url_for('user.send_user_sms_code', user_id=sample_user.id), - data=data, + data=json.dumps({'to': to_number}), headers=[('Content-Type', 'application/json'), auth_header]) assert resp.status_code == 204 @@ -293,34 +235,47 @@ def test_send_user_code_for_sms_with_optional_to_field(notify_api, ) -def test_send_sms_code_returns_404_for_bad_input_data(notify_api, notify_db, notify_db_session): - """ - Tests POST endpoint /user//sms-code return 404 for bad input data - """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - data = json.dumps({}) - import uuid - uuid_ = uuid.uuid4() - auth_header = create_authorization_header() - resp = client.post( - url_for('user.send_user_sms_code', user_id=uuid_), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 404 - assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found' +def test_send_sms_code_returns_404_for_bad_input_data(client): + uuid_ = uuid.uuid4() + auth_header = create_authorization_header() + resp = client.post( + url_for('user.send_user_sms_code', user_id=uuid_), + data=json.dumps({}), + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 404 + assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found' + + +def test_send_sms_code_returns_204_when_too_many_codes_already_created(client, sample_user): + for i in range(10): + verify_code = VerifyCode( + code_type='sms', + _code=12345, + created_at=datetime.utcnow() - timedelta(minutes=10), + expiry_datetime=datetime.utcnow() + timedelta(minutes=40), + user=sample_user + ) + db.session.add(verify_code) + db.session.commit() + assert VerifyCode.query.count() == 10 + auth_header = create_authorization_header() + resp = client.post( + url_for('user.send_user_sms_code', user_id=sample_user.id), + data=json.dumps({}), + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 204 + assert VerifyCode.query.count() == 10 def test_send_user_email_verification(client, sample_user, mocker, email_verification_template): - data = json.dumps({}) mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') auth_header = create_authorization_header() resp = client.post( url_for('user.send_user_email_verification', user_id=str(sample_user.id)), - data=data, + data=json.dumps({}), headers=[('Content-Type', 'application/json'), auth_header]) assert resp.status_code == 204 notification = Notification.query.first() @@ -332,13 +287,12 @@ def test_send_email_verification_returns_404_for_bad_input_data(client, notify_d Tests POST endpoint /user//sms-code return 404 for bad input data """ mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') - data = json.dumps({}) import uuid uuid_ = uuid.uuid4() auth_header = create_authorization_header() resp = client.post( url_for('user.send_user_email_verification', user_id=uuid_), - data=data, + data=json.dumps({}), headers=[('Content-Type', 'application/json'), auth_header]) assert resp.status_code == 404 assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found'