From ac9739f8a24ccae8f01800a5c11ad8f06f4d6861 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Thu, 16 Feb 2017 15:20:30 +0000 Subject: [PATCH 1/3] ensure we reset failed_login_count when appropriate in verify_user_password, if succesful we reset the failed_login_count. now we use failed_login_count for 2FA attempts, we need to make sure we reset it in other places too, so that people don't get blocked, especially in the reset-password user journey. * verify_user_code - if it's succesful, reset the failed_login_count * update_password - reset failed_login_count because either * you're logged in and so it's 0 anyway * you're resetting your password via pword reset link, and the old count isn't relevant anymore --- app/dao/users_dao.py | 2 + app/user/rest.py | 1 + tests/app/user/test_rest.py | 17 ++++- tests/app/user/test_rest_verify.py | 117 +++++++++++++++-------------- 4 files changed, 79 insertions(+), 58 deletions(-) diff --git a/app/dao/users_dao.py b/app/dao/users_dao.py index 020830b7c..904231105 100644 --- a/app/dao/users_dao.py +++ b/app/dao/users_dao.py @@ -113,6 +113,8 @@ def reset_failed_login_count(user): def update_user_password(user, password): + # reset failed login count - they've just reset their password so should be fine + user.failed_login_count = 0 user.password = password user.password_changed_at = datetime.utcnow() db.session.add(user) diff --git a/app/user/rest.py b/app/user/rest.py index ad1c0dee9..c252c2aec 100644 --- a/app/user/rest.py +++ b/app/user/rest.py @@ -130,6 +130,7 @@ def verify_user_code(user_id): increment_failed_login_count(user_to_verify) raise InvalidRequest("Code has expired", status_code=400) use_user_code(code.id) + reset_failed_login_count(user_to_verify) return jsonify({}), 204 diff --git a/tests/app/user/test_rest.py b/tests/app/user/test_rest.py index 0f4e4295b..22fb1ef71 100644 --- a/tests/app/user/test_rest.py +++ b/tests/app/user/test_rest.py @@ -535,7 +535,6 @@ def test_send_user_confirm_new_email_returns_400_when_email_missing(client, samp def test_update_user_password_saves_correctly(client, sample_service): - assert User.query.count() == 1 sample_user = sample_service.users[0] new_password = '1234567890' data = { @@ -548,7 +547,7 @@ def test_update_user_password_saves_correctly(client, sample_service): data=json.dumps(data), headers=headers) assert resp.status_code == 200 - assert User.query.count() == 1 + json_resp = json.loads(resp.get_data(as_text=True)) assert json_resp['data']['password_changed_at'] is not None data = {'password': new_password} @@ -559,3 +558,17 @@ def test_update_user_password_saves_correctly(client, sample_service): data=json.dumps(data), headers=headers) assert resp.status_code == 204 + + +def test_update_user_password_resets_failed_login_count(client, sample_service): + user = sample_service.users[0] + user.failed_login_count = 1 + + resp = client.post( + url_for('user.update_password', user_id=user.id), + data=json.dumps({'_password': 'foo'}), + headers=[('Content-Type', 'application/json'), create_authorization_header()] + ) + + assert resp.status_code == 200 + assert user.failed_login_count == 0 diff --git a/tests/app/user/test_rest_verify.py b/tests/app/user/test_rest_verify.py index 1d9bba17e..63d4b61d3 100644 --- a/tests/app/user/test_rest_verify.py +++ b/tests/app/user/test_rest_verify.py @@ -1,27 +1,24 @@ import json import uuid - -import pytest - from datetime import ( datetime, timedelta ) +import pytest from flask import url_for, current_app +from freezegun import freeze_time + from app.dao.services_dao import dao_update_service, dao_fetch_service_by_id from app.models import ( VerifyCode, User, Notification ) - from app import db +import app.celery.tasks from tests import create_authorization_header -from freezegun import freeze_time - -import app.celery.tasks def test_user_verify_code(client, @@ -163,7 +160,7 @@ def test_user_verify_password_missing_password(client, @pytest.mark.parametrize('research_mode', [True, False]) @freeze_time("2016-01-01 11:09:00.061258") -def test_send_user_sms_code(notify_api, +def test_send_user_sms_code(client, sample_user, sms_code_template, mocker, @@ -171,68 +168,63 @@ def test_send_user_sms_code(notify_api, """ Tests POST endpoint /user//sms-code """ + if research_mode: + notify_service = dao_fetch_service_by_id(current_app.config['NOTIFY_SERVICE_ID']) + notify_service.research_mode = True + dao_update_service(notify_service) - with notify_api.test_request_context(): - with notify_api.test_client() as client: - if research_mode: - notify_service = dao_fetch_service_by_id(current_app.config['NOTIFY_SERVICE_ID']) - notify_service.research_mode = True - dao_update_service(notify_service) + auth_header = create_authorization_header() + mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111') + mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') - auth_header = create_authorization_header() - mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111') - mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') + resp = client.post( + url_for('user.send_user_sms_code', user_id=sample_user.id), + data=json.dumps({}), + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 204 - resp = client.post( - url_for('user.send_user_sms_code', user_id=sample_user.id), - data=json.dumps({}), - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 204 + assert mocked.call_count == 1 + assert VerifyCode.query.count() == 1 + assert VerifyCode.query.first().check_code('11111') - assert mocked.call_count == 1 - assert VerifyCode.query.count() == 1 - assert VerifyCode.query.first().check_code('11111') + assert Notification.query.count() == 1 + notification = Notification.query.first() + assert notification.personalisation == {'verify_code': '11111'} + assert notification.to == sample_user.mobile_number + assert str(notification.service_id) == current_app.config['NOTIFY_SERVICE_ID'] - assert Notification.query.count() == 1 - notification = Notification.query.first() - assert notification.personalisation == {'verify_code': '11111'} - assert notification.to == sample_user.mobile_number - assert str(notification.service_id) == current_app.config['NOTIFY_SERVICE_ID'] - - app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with( - ([str(notification.id)]), - queue="notify" - ) + app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with( + ([str(notification.id)]), + queue="notify" + ) @freeze_time("2016-01-01 11:09:00.061258") -def test_send_user_code_for_sms_with_optional_to_field(notify_api, +def test_send_user_code_for_sms_with_optional_to_field(client, sample_user, sms_code_template, mocker): """ Tests POST endpoint /user//sms-code with optional to field """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - to_number = '+441119876757' - mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111') - mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') - auth_header = create_authorization_header() + to_number = '+441119876757' + mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111') + mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') + auth_header = create_authorization_header() - resp = client.post( - url_for('user.send_user_sms_code', user_id=sample_user.id), - data=json.dumps({'to': to_number}), - headers=[('Content-Type', 'application/json'), auth_header]) + resp = client.post( + url_for('user.send_user_sms_code', user_id=sample_user.id), + data=json.dumps({'to': to_number}), + headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 204 - assert mocked.call_count == 1 - notification = Notification.query.first() - assert notification.to == to_number - app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with( - ([str(notification.id)]), - queue="notify" - ) + assert resp.status_code == 204 + assert mocked.call_count == 1 + notification = Notification.query.first() + assert notification.to == to_number + app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with( + ([str(notification.id)]), + queue="notify" + ) def test_send_sms_code_returns_404_for_bad_input_data(client): @@ -282,12 +274,11 @@ def test_send_user_email_verification(client, mocked.assert_called_once_with(([str(notification.id)]), queue="notify") -def test_send_email_verification_returns_404_for_bad_input_data(client, notify_db, notify_db_session, mocker): +def test_send_email_verification_returns_404_for_bad_input_data(client, notify_db_session, mocker): """ Tests POST endpoint /user//sms-code return 404 for bad input data """ mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') - import uuid uuid_ = uuid.uuid4() auth_header = create_authorization_header() resp = client.post( @@ -297,3 +288,17 @@ def test_send_email_verification_returns_404_for_bad_input_data(client, notify_d assert resp.status_code == 404 assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found' assert mocked.call_count == 0 + + +def test_user_verify_user_code_valid_code_resets_failed_login_count(client, sample_sms_code): + sample_sms_code.failed_login_count = 1 + data = json.dumps({ + 'code_type': sample_sms_code.code_type, + 'code': sample_sms_code.txt_code}) + resp = client.post( + url_for('user.verify_user_code', user_id=sample_sms_code.user.id), + data=data, + headers=[('Content-Type', 'application/json'), create_authorization_header()]) + assert resp.status_code == 204 + assert sample_sms_code.user.failed_login_count == 0 + assert sample_sms_code.code_used From 521872ce63d824b9a338f0e3eac732768797b89c Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Thu, 16 Feb 2017 17:37:21 +0000 Subject: [PATCH 2/3] update_user now resets failed_login_count if password is changed until work is done to stop using PUT /user/{id} on the admin app, this function also needs to reset failed logins, cos it's used during the forgotten password flow --- app/dao/users_dao.py | 1 - app/user/rest.py | 13 ++++++++----- tests/app/user/test_rest.py | 22 ++++++++++++++++++++++ 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/app/dao/users_dao.py b/app/dao/users_dao.py index 904231105..fd1723fc7 100644 --- a/app/dao/users_dao.py +++ b/app/dao/users_dao.py @@ -114,7 +114,6 @@ def reset_failed_login_count(user): def update_user_password(user, password): # reset failed login count - they've just reset their password so should be fine - user.failed_login_count = 0 user.password = password user.password_changed_at = datetime.utcnow() db.session.add(user) diff --git a/app/user/rest.py b/app/user/rest.py index c252c2aec..0a0455591 100644 --- a/app/user/rest.py +++ b/app/user/rest.py @@ -59,12 +59,14 @@ def update_user(user_id): user_to_update = get_user_by_id(user_id=user_id) req_json = request.get_json() update_dct, errors = user_schema_load_json.load(req_json) + # TODO don't let password be updated in this PUT method (currently used by the forgot password flow) pwd = req_json.get('password', None) - # TODO password validation, it is already done on the admin app - # but would be good to have the same validation here. - if pwd is not None and not pwd: - errors.update({'password': ['Invalid data for field']}) - raise InvalidRequest(errors, status_code=400) + if pwd is not None: + if not pwd: + errors.update({'password': ['Invalid data for field']}) + raise InvalidRequest(errors, status_code=400) + else: + reset_failed_login_count(user_to_update) save_model_user(user_to_update, update_dict=update_dct, pwd=pwd) return jsonify(data=user_schema.dump(user_to_update).data), 200 @@ -324,6 +326,7 @@ def update_password(user_id): if errors: raise InvalidRequest(errors, status_code=400) + reset_failed_login_count(user) update_user_password(user, pwd) return jsonify(data=user_schema.dump(user).data), 200 diff --git a/tests/app/user/test_rest.py b/tests/app/user/test_rest.py index 22fb1ef71..e66ab816f 100644 --- a/tests/app/user/test_rest.py +++ b/tests/app/user/test_rest.py @@ -153,6 +153,7 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_service): with notify_api.test_client() as client: assert User.query.count() == 1 sample_user = sample_service.users[0] + sample_user.failed_login_count = 1 new_email = 'new@digital.cabinet-office.gov.uk' data = { 'name': sample_user.name, @@ -178,6 +179,8 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_service): assert new_email == fetched['email_address'] assert sample_user.state == fetched['state'] assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) + # password wasn't updated, so failed_login_count stays the same + assert sample_user.failed_login_count == 1 @pytest.mark.parametrize('user_attribute, user_value', [ @@ -572,3 +575,22 @@ def test_update_user_password_resets_failed_login_count(client, sample_service): assert resp.status_code == 200 assert user.failed_login_count == 0 + + +def test_update_user_resets_failed_login_count_if_updating_password(client, sample_service): + user = sample_service.users[0] + user.failed_login_count = 1 + + resp = client.put( + url_for('user.update_user', user_id=user.id), + data=json.dumps({ + 'name': user.name, + 'email_address': user.email_address, + 'mobile_number': user.mobile_number, + 'password': 'foo' + }), + headers=[('Content-Type', 'application/json'), create_authorization_header()] + ) + + assert resp.status_code == 200 + assert user.failed_login_count == 0 From 0515c5147557d0a66a4ed11b61b2148410c31957 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Thu, 16 Feb 2017 17:44:04 +0000 Subject: [PATCH 3/3] replace notify_api with client fixture in user/test_rest.py --- tests/app/user/test_rest.py | 600 +++++++++++++---------------- tests/app/user/test_rest_verify.py | 2 +- 2 files changed, 272 insertions(+), 330 deletions(-) diff --git a/tests/app/user/test_rest.py b/tests/app/user/test_rest.py index e66ab816f..a1070fcff 100644 --- a/tests/app/user/test_rest.py +++ b/tests/app/user/test_rest.py @@ -10,177 +10,165 @@ from app.dao.permissions_dao import default_service_permissions from tests import create_authorization_header -def test_get_user_list(notify_api, notify_db, notify_db_session, sample_service): +def test_get_user_list(client, sample_service): """ Tests GET endpoint '/' to retrieve entire user list. """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - header = create_authorization_header() - response = client.get(url_for('user.get_user'), - headers=[header]) - assert response.status_code == 200 - json_resp = json.loads(response.get_data(as_text=True)) - assert len(json_resp['data']) == 1 - sample_user = sample_service.users[0] - expected_permissions = default_service_permissions - fetched = json_resp['data'][0] + header = create_authorization_header() + response = client.get(url_for('user.get_user'), + headers=[header]) + assert response.status_code == 200 + json_resp = json.loads(response.get_data(as_text=True)) + assert len(json_resp['data']) == 1 + sample_user = sample_service.users[0] + expected_permissions = default_service_permissions + fetched = json_resp['data'][0] - assert str(sample_user.id) == fetched['id'] - assert sample_user.name == fetched['name'] - assert sample_user.mobile_number == fetched['mobile_number'] - assert sample_user.email_address == fetched['email_address'] - assert sample_user.state == fetched['state'] - assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) + assert str(sample_user.id) == fetched['id'] + assert sample_user.name == fetched['name'] + assert sample_user.mobile_number == fetched['mobile_number'] + assert sample_user.email_address == fetched['email_address'] + assert sample_user.state == fetched['state'] + assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) -def test_get_user(notify_api, notify_db, notify_db_session, sample_service): +def test_get_user(client, sample_service): """ Tests GET endpoint '/' to retrieve a single service. """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - sample_user = sample_service.users[0] - header = create_authorization_header() - resp = client.get(url_for('user.get_user', - user_id=sample_user.id), - headers=[header]) - assert resp.status_code == 200 - json_resp = json.loads(resp.get_data(as_text=True)) + sample_user = sample_service.users[0] + header = create_authorization_header() + resp = client.get(url_for('user.get_user', + user_id=sample_user.id), + headers=[header]) + assert resp.status_code == 200 + json_resp = json.loads(resp.get_data(as_text=True)) - expected_permissions = default_service_permissions - fetched = json_resp['data'] + expected_permissions = default_service_permissions + fetched = json_resp['data'] - assert str(sample_user.id) == fetched['id'] - assert sample_user.name == fetched['name'] - assert sample_user.mobile_number == fetched['mobile_number'] - assert sample_user.email_address == fetched['email_address'] - assert sample_user.state == fetched['state'] - assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) + assert str(sample_user.id) == fetched['id'] + assert sample_user.name == fetched['name'] + assert sample_user.mobile_number == fetched['mobile_number'] + assert sample_user.email_address == fetched['email_address'] + assert sample_user.state == fetched['state'] + assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) -def test_post_user(notify_api, notify_db, notify_db_session): +def test_post_user(client, notify_db, notify_db_session): """ Tests POST endpoint '/' to create a user. """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - assert User.query.count() == 0 - data = { - "name": "Test User", - "email_address": "user@digital.cabinet-office.gov.uk", - "password": "password", - "mobile_number": "+447700900986", - "logged_in_at": None, - "state": "active", - "failed_login_count": 0, - "permissions": {} - } - auth_header = create_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.post( - url_for('user.create_user'), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 201 - user = User.query.filter_by(email_address='user@digital.cabinet-office.gov.uk').first() - json_resp = json.loads(resp.get_data(as_text=True)) - assert json_resp['data']['email_address'] == user.email_address - assert json_resp['data']['id'] == str(user.id) + assert User.query.count() == 0 + data = { + "name": "Test User", + "email_address": "user@digital.cabinet-office.gov.uk", + "password": "password", + "mobile_number": "+447700900986", + "logged_in_at": None, + "state": "active", + "failed_login_count": 0, + "permissions": {} + } + auth_header = create_authorization_header() + headers = [('Content-Type', 'application/json'), auth_header] + resp = client.post( + url_for('user.create_user'), + data=json.dumps(data), + headers=headers) + assert resp.status_code == 201 + user = User.query.filter_by(email_address='user@digital.cabinet-office.gov.uk').first() + json_resp = json.loads(resp.get_data(as_text=True)) + assert json_resp['data']['email_address'] == user.email_address + assert json_resp['data']['id'] == str(user.id) -def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_session): +def test_post_user_missing_attribute_email(client, notify_db, notify_db_session): """ Tests POST endpoint '/' missing attribute email. """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - assert User.query.count() == 0 - data = { - "name": "Test User", - "password": "password", - "mobile_number": "+447700900986", - "logged_in_at": None, - "state": "active", - "failed_login_count": 0, - "permissions": {} - } - auth_header = create_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.post( - url_for('user.create_user'), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 400 - assert User.query.count() == 0 - json_resp = json.loads(resp.get_data(as_text=True)) - assert {'email_address': ['Missing data for required field.']} == json_resp['message'] + assert User.query.count() == 0 + data = { + "name": "Test User", + "password": "password", + "mobile_number": "+447700900986", + "logged_in_at": None, + "state": "active", + "failed_login_count": 0, + "permissions": {} + } + auth_header = create_authorization_header() + headers = [('Content-Type', 'application/json'), auth_header] + resp = client.post( + url_for('user.create_user'), + data=json.dumps(data), + headers=headers) + assert resp.status_code == 400 + assert User.query.count() == 0 + json_resp = json.loads(resp.get_data(as_text=True)) + assert {'email_address': ['Missing data for required field.']} == json_resp['message'] -def test_create_user_missing_attribute_password(notify_api, notify_db, notify_db_session): +def test_create_user_missing_attribute_password(client, notify_db, notify_db_session): """ Tests POST endpoint '/' missing attribute password. """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - assert User.query.count() == 0 - data = { - "name": "Test User", - "email_address": "user@digital.cabinet-office.gov.uk", - "mobile_number": "+447700900986", - "logged_in_at": None, - "state": "active", - "failed_login_count": 0, - "permissions": {} - } - auth_header = create_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.post( - url_for('user.create_user'), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 400 - assert User.query.count() == 0 - json_resp = json.loads(resp.get_data(as_text=True)) - assert {'password': ['Missing data for required field.']} == json_resp['message'] + assert User.query.count() == 0 + data = { + "name": "Test User", + "email_address": "user@digital.cabinet-office.gov.uk", + "mobile_number": "+447700900986", + "logged_in_at": None, + "state": "active", + "failed_login_count": 0, + "permissions": {} + } + auth_header = create_authorization_header() + headers = [('Content-Type', 'application/json'), auth_header] + resp = client.post( + url_for('user.create_user'), + data=json.dumps(data), + headers=headers) + assert resp.status_code == 400 + assert User.query.count() == 0 + json_resp = json.loads(resp.get_data(as_text=True)) + assert {'password': ['Missing data for required field.']} == json_resp['message'] -def test_put_user(notify_api, notify_db, notify_db_session, sample_service): +def test_put_user(client, sample_service): """ Tests PUT endpoint '/' to update a user. """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - assert User.query.count() == 1 - sample_user = sample_service.users[0] - sample_user.failed_login_count = 1 - new_email = 'new@digital.cabinet-office.gov.uk' - data = { - 'name': sample_user.name, - 'email_address': new_email, - 'mobile_number': sample_user.mobile_number - } - auth_header = create_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.put( - url_for('user.update_user', user_id=sample_user.id), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 200 - assert User.query.count() == 1 - json_resp = json.loads(resp.get_data(as_text=True)) - assert json_resp['data']['email_address'] == new_email - expected_permissions = default_service_permissions - fetched = json_resp['data'] + assert User.query.count() == 1 + sample_user = sample_service.users[0] + sample_user.failed_login_count = 1 + new_email = 'new@digital.cabinet-office.gov.uk' + data = { + 'name': sample_user.name, + 'email_address': new_email, + 'mobile_number': sample_user.mobile_number + } + auth_header = create_authorization_header() + headers = [('Content-Type', 'application/json'), auth_header] + resp = client.put( + url_for('user.update_user', user_id=sample_user.id), + data=json.dumps(data), + headers=headers) + assert resp.status_code == 200 + assert User.query.count() == 1 + json_resp = json.loads(resp.get_data(as_text=True)) + assert json_resp['data']['email_address'] == new_email + expected_permissions = default_service_permissions + fetched = json_resp['data'] - assert str(sample_user.id) == fetched['id'] - assert sample_user.name == fetched['name'] - assert sample_user.mobile_number == fetched['mobile_number'] - assert new_email == fetched['email_address'] - assert sample_user.state == fetched['state'] - assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) - # password wasn't updated, so failed_login_count stays the same - assert sample_user.failed_login_count == 1 + assert str(sample_user.id) == fetched['id'] + assert sample_user.name == fetched['name'] + assert sample_user.mobile_number == fetched['mobile_number'] + assert new_email == fetched['email_address'] + assert sample_user.state == fetched['state'] + assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) + # password wasn't updated, so failed_login_count stays the same + assert sample_user.failed_login_count == 1 @pytest.mark.parametrize('user_attribute, user_value', [ @@ -206,214 +194,169 @@ def test_post_user_attribute(client, sample_user, user_attribute, user_value): assert json_resp['data'][user_attribute] == user_value -def test_put_user_update_password(notify_api, - notify_db, - notify_db_session, - sample_service): +def test_put_user_update_password(client, sample_service): """ Tests PUT endpoint '/' to update a user including their password. """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - assert User.query.count() == 1 - sample_user = sample_service.users[0] - new_password = '1234567890' - data = { - 'name': sample_user.name, - 'email_address': sample_user.email_address, - 'mobile_number': sample_user.mobile_number, - 'password': new_password - } - auth_header = create_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.put( - url_for('user.update_user', user_id=sample_user.id), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 200 - assert User.query.count() == 1 - json_resp = json.loads(resp.get_data(as_text=True)) - assert json_resp['data']['password_changed_at'] is not None - data = {'password': new_password} - auth_header = create_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.post( - url_for('user.verify_user_password', user_id=str(sample_user.id)), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 204 + assert User.query.count() == 1 + sample_user = sample_service.users[0] + new_password = '1234567890' + data = { + 'name': sample_user.name, + 'email_address': sample_user.email_address, + 'mobile_number': sample_user.mobile_number, + 'password': new_password + } + auth_header = create_authorization_header() + headers = [('Content-Type', 'application/json'), auth_header] + resp = client.put( + url_for('user.update_user', user_id=sample_user.id), + data=json.dumps(data), + headers=headers) + assert resp.status_code == 200 + assert User.query.count() == 1 + json_resp = json.loads(resp.get_data(as_text=True)) + assert json_resp['data']['password_changed_at'] is not None + data = {'password': new_password} + auth_header = create_authorization_header() + headers = [('Content-Type', 'application/json'), auth_header] + resp = client.post( + url_for('user.verify_user_password', user_id=str(sample_user.id)), + data=json.dumps(data), + headers=headers) + assert resp.status_code == 204 -def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_user, fake_uuid): +def test_put_user_not_exists(client, sample_user, fake_uuid): """ Tests PUT endpoint '/' to update a user doesn't exist. """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - assert User.query.count() == 1 - new_email = 'new@digital.cabinet-office.gov.uk' - data = {'email_address': new_email} - auth_header = create_authorization_header() - headers = [('Content-Type', 'application/json'), auth_header] - resp = client.put( - url_for('user.update_user', user_id=fake_uuid), - data=json.dumps(data), - headers=headers) - assert resp.status_code == 404 - assert User.query.count() == 1 - user = User.query.filter_by(id=str(sample_user.id)).first() - json_resp = json.loads(resp.get_data(as_text=True)) - assert json_resp['result'] == "error" - assert json_resp['message'] == 'No result found' + assert User.query.count() == 1 + new_email = 'new@digital.cabinet-office.gov.uk' + data = {'email_address': new_email} + auth_header = create_authorization_header() + headers = [('Content-Type', 'application/json'), auth_header] + resp = client.put( + url_for('user.update_user', user_id=fake_uuid), + data=json.dumps(data), + headers=headers) + assert resp.status_code == 404 + assert User.query.count() == 1 + user = User.query.filter_by(id=str(sample_user.id)).first() + json_resp = json.loads(resp.get_data(as_text=True)) + assert json_resp['result'] == "error" + assert json_resp['message'] == 'No result found' - assert user == sample_user - assert user.email_address != new_email + assert user == sample_user + assert user.email_address != new_email -def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_service): +def test_get_user_by_email(client, sample_service): + sample_user = sample_service.users[0] + header = create_authorization_header() + url = url_for('user.get_by_email', email=sample_user.email_address) + resp = client.get(url, headers=[header]) + assert resp.status_code == 200 - with notify_api.test_request_context(): - with notify_api.test_client() as client: - sample_user = sample_service.users[0] - header = create_authorization_header() - url = url_for('user.get_by_email', email=sample_user.email_address) - resp = client.get(url, headers=[header]) - assert resp.status_code == 200 + json_resp = json.loads(resp.get_data(as_text=True)) + expected_permissions = default_service_permissions + fetched = json_resp['data'] - json_resp = json.loads(resp.get_data(as_text=True)) - expected_permissions = default_service_permissions - fetched = json_resp['data'] - - assert str(sample_user.id) == fetched['id'] - assert sample_user.name == fetched['name'] - assert sample_user.mobile_number == fetched['mobile_number'] - assert sample_user.email_address == fetched['email_address'] - assert sample_user.state == fetched['state'] - assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) + assert str(sample_user.id) == fetched['id'] + assert sample_user.name == fetched['name'] + assert sample_user.mobile_number == fetched['mobile_number'] + assert sample_user.email_address == fetched['email_address'] + assert sample_user.state == fetched['state'] + assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)]) -def test_get_user_by_email_not_found_returns_404(notify_api, - notify_db, - notify_db_session, - sample_user): - - with notify_api.test_request_context(): - with notify_api.test_client() as client: - header = create_authorization_header() - url = url_for('user.get_by_email', email='no_user@digital.gov.uk') - resp = client.get(url, headers=[header]) - assert resp.status_code == 404 - json_resp = json.loads(resp.get_data(as_text=True)) - assert json_resp['result'] == 'error' - assert json_resp['message'] == 'No result found' +def test_get_user_by_email_not_found_returns_404(client, sample_user): + header = create_authorization_header() + url = url_for('user.get_by_email', email='no_user@digital.gov.uk') + resp = client.get(url, headers=[header]) + assert resp.status_code == 404 + json_resp = json.loads(resp.get_data(as_text=True)) + assert json_resp['result'] == 'error' + assert json_resp['message'] == 'No result found' -def test_get_user_by_email_bad_url_returns_404(notify_api, - notify_db, - notify_db_session, - sample_user): - - with notify_api.test_request_context(): - with notify_api.test_client() as client: - header = create_authorization_header() - url = '/user/email' - resp = client.get(url, headers=[header]) - assert resp.status_code == 400 - json_resp = json.loads(resp.get_data(as_text=True)) - assert json_resp['result'] == 'error' - assert json_resp['message'] == 'Invalid request. Email query string param required' +def test_get_user_by_email_bad_url_returns_404(client, sample_user): + header = create_authorization_header() + url = '/user/email' + resp = client.get(url, headers=[header]) + assert resp.status_code == 400 + json_resp = json.loads(resp.get_data(as_text=True)) + assert json_resp['result'] == 'error' + assert json_resp['message'] == 'Invalid request. Email query string param required' -def test_get_user_with_permissions(notify_api, - notify_db, - notify_db_session, - sample_service_permission): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - header = create_authorization_header() - response = client.get(url_for('user.get_user', user_id=str(sample_service_permission.user.id)), - headers=[header]) - assert response.status_code == 200 - permissions = json.loads(response.get_data(as_text=True))['data']['permissions'] - assert sample_service_permission.permission in permissions[str(sample_service_permission.service.id)] +def test_get_user_with_permissions(client, sample_service_permission): + header = create_authorization_header() + response = client.get(url_for('user.get_user', user_id=str(sample_service_permission.user.id)), + headers=[header]) + assert response.status_code == 200 + permissions = json.loads(response.get_data(as_text=True))['data']['permissions'] + assert sample_service_permission.permission in permissions[str(sample_service_permission.service.id)] -def test_set_user_permissions(notify_api, - notify_db, - notify_db_session, - sample_user, - sample_service): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - data = json.dumps([{'permission': MANAGE_SETTINGS}]) - header = create_authorization_header() - headers = [('Content-Type', 'application/json'), header] - response = client.post( - url_for( - 'user.set_permissions', - user_id=str(sample_user.id), - service_id=str(sample_service.id)), - headers=headers, - data=data) +def test_set_user_permissions(client, sample_user, sample_service): + data = json.dumps([{'permission': MANAGE_SETTINGS}]) + header = create_authorization_header() + headers = [('Content-Type', 'application/json'), header] + response = client.post( + url_for( + 'user.set_permissions', + user_id=str(sample_user.id), + service_id=str(sample_service.id)), + headers=headers, + data=data) - assert response.status_code == 204 - permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first() - assert permission.user == sample_user - assert permission.service == sample_service - assert permission.permission == MANAGE_SETTINGS + assert response.status_code == 204 + permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first() + assert permission.user == sample_user + assert permission.service == sample_service + assert permission.permission == MANAGE_SETTINGS -def test_set_user_permissions_multiple(notify_api, - notify_db, - notify_db_session, - sample_user, - sample_service): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - data = json.dumps([{'permission': MANAGE_SETTINGS}, {'permission': MANAGE_TEMPLATES}]) - header = create_authorization_header() - headers = [('Content-Type', 'application/json'), header] - response = client.post( - url_for( - 'user.set_permissions', - user_id=str(sample_user.id), - service_id=str(sample_service.id)), - headers=headers, - data=data) +def test_set_user_permissions_multiple(client, sample_user, sample_service): + data = json.dumps([{'permission': MANAGE_SETTINGS}, {'permission': MANAGE_TEMPLATES}]) + header = create_authorization_header() + headers = [('Content-Type', 'application/json'), header] + response = client.post( + url_for( + 'user.set_permissions', + user_id=str(sample_user.id), + service_id=str(sample_service.id)), + headers=headers, + data=data) - assert response.status_code == 204 - permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first() - assert permission.user == sample_user - assert permission.service == sample_service - assert permission.permission == MANAGE_SETTINGS - permission = Permission.query.filter_by(permission=MANAGE_TEMPLATES).first() - assert permission.user == sample_user - assert permission.service == sample_service - assert permission.permission == MANAGE_TEMPLATES + assert response.status_code == 204 + permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first() + assert permission.user == sample_user + assert permission.service == sample_service + assert permission.permission == MANAGE_SETTINGS + permission = Permission.query.filter_by(permission=MANAGE_TEMPLATES).first() + assert permission.user == sample_user + assert permission.service == sample_service + assert permission.permission == MANAGE_TEMPLATES -def test_set_user_permissions_remove_old(notify_api, - notify_db, - notify_db_session, - sample_user, - sample_service): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - data = json.dumps([{'permission': MANAGE_SETTINGS}]) - header = create_authorization_header() - headers = [('Content-Type', 'application/json'), header] - response = client.post( - url_for( - 'user.set_permissions', - user_id=str(sample_user.id), - service_id=str(sample_service.id)), - headers=headers, - data=data) +def test_set_user_permissions_remove_old(client, sample_user, sample_service): + data = json.dumps([{'permission': MANAGE_SETTINGS}]) + header = create_authorization_header() + headers = [('Content-Type', 'application/json'), header] + response = client.post( + url_for( + 'user.set_permissions', + user_id=str(sample_user.id), + service_id=str(sample_service.id)), + headers=headers, + data=data) - assert response.status_code == 204 - query = Permission.query.filter_by(user=sample_user) - assert query.count() == 1 - assert query.first().permission == MANAGE_SETTINGS + assert response.status_code == 204 + query = Permission.query.filter_by(user=sample_user) + assert query.count() == 1 + assert query.first().permission == MANAGE_SETTINGS @freeze_time("2016-01-01 11:09:00.061258") @@ -449,8 +392,7 @@ def test_send_user_reset_password_should_return_400_when_email_is_missing(client assert mocked.call_count == 0 -def test_send_user_reset_password_should_return_400_when_user_doesnot_exist(client, - mocker): +def test_send_user_reset_password_should_return_400_when_user_doesnot_exist(client, mocker): mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') bad_email_address = 'bad@email.gov.uk' data = json.dumps({'email': bad_email_address}) @@ -498,15 +440,15 @@ def test_send_already_registered_email(client, sample_user, already_registered_t def test_send_already_registered_email_returns_400_when_data_is_missing(client, sample_user): - data = json.dumps({}) - auth_header = create_authorization_header() + data = json.dumps({}) + auth_header = create_authorization_header() - resp = client.post( - url_for('user.send_already_registered_email', user_id=str(sample_user.id)), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - assert resp.status_code == 400 - assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']} + resp = client.post( + url_for('user.send_already_registered_email', user_id=str(sample_user.id)), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 400 + assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']} def test_send_user_confirm_new_email_returns_204(client, sample_user, change_email_confirmation_template, mocker): diff --git a/tests/app/user/test_rest_verify.py b/tests/app/user/test_rest_verify.py index 63d4b61d3..043321d03 100644 --- a/tests/app/user/test_rest_verify.py +++ b/tests/app/user/test_rest_verify.py @@ -291,7 +291,7 @@ def test_send_email_verification_returns_404_for_bad_input_data(client, notify_d def test_user_verify_user_code_valid_code_resets_failed_login_count(client, sample_sms_code): - sample_sms_code.failed_login_count = 1 + sample_sms_code.user.failed_login_count = 1 data = json.dumps({ 'code_type': sample_sms_code.code_type, 'code': sample_sms_code.txt_code})