mirror of
https://github.com/GSA/notifications-api.git
synced 2026-05-06 00:59:41 -04:00
Merge pull request #834 from alphagov/reset-2fa
ensure we reset failed_login_count when appropriate
This commit is contained in:
@@ -113,6 +113,7 @@ def reset_failed_login_count(user):
|
||||
|
||||
|
||||
def update_user_password(user, password):
|
||||
# reset failed login count - they've just reset their password so should be fine
|
||||
user.password = password
|
||||
user.password_changed_at = datetime.utcnow()
|
||||
db.session.add(user)
|
||||
|
||||
@@ -59,12 +59,14 @@ def update_user(user_id):
|
||||
user_to_update = get_user_by_id(user_id=user_id)
|
||||
req_json = request.get_json()
|
||||
update_dct, errors = user_schema_load_json.load(req_json)
|
||||
# TODO don't let password be updated in this PUT method (currently used by the forgot password flow)
|
||||
pwd = req_json.get('password', None)
|
||||
# TODO password validation, it is already done on the admin app
|
||||
# but would be good to have the same validation here.
|
||||
if pwd is not None and not pwd:
|
||||
errors.update({'password': ['Invalid data for field']})
|
||||
raise InvalidRequest(errors, status_code=400)
|
||||
if pwd is not None:
|
||||
if not pwd:
|
||||
errors.update({'password': ['Invalid data for field']})
|
||||
raise InvalidRequest(errors, status_code=400)
|
||||
else:
|
||||
reset_failed_login_count(user_to_update)
|
||||
save_model_user(user_to_update, update_dict=update_dct, pwd=pwd)
|
||||
return jsonify(data=user_schema.dump(user_to_update).data), 200
|
||||
|
||||
@@ -130,6 +132,7 @@ def verify_user_code(user_id):
|
||||
increment_failed_login_count(user_to_verify)
|
||||
raise InvalidRequest("Code has expired", status_code=400)
|
||||
use_user_code(code.id)
|
||||
reset_failed_login_count(user_to_verify)
|
||||
return jsonify({}), 204
|
||||
|
||||
|
||||
@@ -323,6 +326,7 @@ def update_password(user_id):
|
||||
if errors:
|
||||
raise InvalidRequest(errors, status_code=400)
|
||||
|
||||
reset_failed_login_count(user)
|
||||
update_user_password(user, pwd)
|
||||
return jsonify(data=user_schema.dump(user).data), 200
|
||||
|
||||
|
||||
@@ -10,174 +10,165 @@ from app.dao.permissions_dao import default_service_permissions
|
||||
from tests import create_authorization_header
|
||||
|
||||
|
||||
def test_get_user_list(notify_api, notify_db, notify_db_session, sample_service):
|
||||
def test_get_user_list(client, sample_service):
|
||||
"""
|
||||
Tests GET endpoint '/' to retrieve entire user list.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
header = create_authorization_header()
|
||||
response = client.get(url_for('user.get_user'),
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(json_resp['data']) == 1
|
||||
sample_user = sample_service.users[0]
|
||||
expected_permissions = default_service_permissions
|
||||
fetched = json_resp['data'][0]
|
||||
header = create_authorization_header()
|
||||
response = client.get(url_for('user.get_user'),
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(json_resp['data']) == 1
|
||||
sample_user = sample_service.users[0]
|
||||
expected_permissions = default_service_permissions
|
||||
fetched = json_resp['data'][0]
|
||||
|
||||
assert str(sample_user.id) == fetched['id']
|
||||
assert sample_user.name == fetched['name']
|
||||
assert sample_user.mobile_number == fetched['mobile_number']
|
||||
assert sample_user.email_address == fetched['email_address']
|
||||
assert sample_user.state == fetched['state']
|
||||
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
|
||||
assert str(sample_user.id) == fetched['id']
|
||||
assert sample_user.name == fetched['name']
|
||||
assert sample_user.mobile_number == fetched['mobile_number']
|
||||
assert sample_user.email_address == fetched['email_address']
|
||||
assert sample_user.state == fetched['state']
|
||||
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
|
||||
|
||||
|
||||
def test_get_user(notify_api, notify_db, notify_db_session, sample_service):
|
||||
def test_get_user(client, sample_service):
|
||||
"""
|
||||
Tests GET endpoint '/<user_id>' to retrieve a single service.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
sample_user = sample_service.users[0]
|
||||
header = create_authorization_header()
|
||||
resp = client.get(url_for('user.get_user',
|
||||
user_id=sample_user.id),
|
||||
headers=[header])
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
sample_user = sample_service.users[0]
|
||||
header = create_authorization_header()
|
||||
resp = client.get(url_for('user.get_user',
|
||||
user_id=sample_user.id),
|
||||
headers=[header])
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
|
||||
expected_permissions = default_service_permissions
|
||||
fetched = json_resp['data']
|
||||
expected_permissions = default_service_permissions
|
||||
fetched = json_resp['data']
|
||||
|
||||
assert str(sample_user.id) == fetched['id']
|
||||
assert sample_user.name == fetched['name']
|
||||
assert sample_user.mobile_number == fetched['mobile_number']
|
||||
assert sample_user.email_address == fetched['email_address']
|
||||
assert sample_user.state == fetched['state']
|
||||
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
|
||||
assert str(sample_user.id) == fetched['id']
|
||||
assert sample_user.name == fetched['name']
|
||||
assert sample_user.mobile_number == fetched['mobile_number']
|
||||
assert sample_user.email_address == fetched['email_address']
|
||||
assert sample_user.state == fetched['state']
|
||||
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
|
||||
|
||||
|
||||
def test_post_user(notify_api, notify_db, notify_db_session):
|
||||
def test_post_user(client, notify_db, notify_db_session):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 0
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"email_address": "user@digital.cabinet-office.gov.uk",
|
||||
"password": "password",
|
||||
"mobile_number": "+447700900986",
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": {}
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.create_user'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 201
|
||||
user = User.query.filter_by(email_address='user@digital.cabinet-office.gov.uk').first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['email_address'] == user.email_address
|
||||
assert json_resp['data']['id'] == str(user.id)
|
||||
assert User.query.count() == 0
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"email_address": "user@digital.cabinet-office.gov.uk",
|
||||
"password": "password",
|
||||
"mobile_number": "+447700900986",
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": {}
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.create_user'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 201
|
||||
user = User.query.filter_by(email_address='user@digital.cabinet-office.gov.uk').first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['email_address'] == user.email_address
|
||||
assert json_resp['data']['id'] == str(user.id)
|
||||
|
||||
|
||||
def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_session):
|
||||
def test_post_user_missing_attribute_email(client, notify_db, notify_db_session):
|
||||
"""
|
||||
Tests POST endpoint '/' missing attribute email.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 0
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"password": "password",
|
||||
"mobile_number": "+447700900986",
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": {}
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.create_user'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 400
|
||||
assert User.query.count() == 0
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert {'email_address': ['Missing data for required field.']} == json_resp['message']
|
||||
assert User.query.count() == 0
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"password": "password",
|
||||
"mobile_number": "+447700900986",
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": {}
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.create_user'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 400
|
||||
assert User.query.count() == 0
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert {'email_address': ['Missing data for required field.']} == json_resp['message']
|
||||
|
||||
|
||||
def test_create_user_missing_attribute_password(notify_api, notify_db, notify_db_session):
|
||||
def test_create_user_missing_attribute_password(client, notify_db, notify_db_session):
|
||||
"""
|
||||
Tests POST endpoint '/' missing attribute password.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 0
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"email_address": "user@digital.cabinet-office.gov.uk",
|
||||
"mobile_number": "+447700900986",
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": {}
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.create_user'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 400
|
||||
assert User.query.count() == 0
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert {'password': ['Missing data for required field.']} == json_resp['message']
|
||||
assert User.query.count() == 0
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"email_address": "user@digital.cabinet-office.gov.uk",
|
||||
"mobile_number": "+447700900986",
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": {}
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.create_user'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 400
|
||||
assert User.query.count() == 0
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert {'password': ['Missing data for required field.']} == json_resp['message']
|
||||
|
||||
|
||||
def test_put_user(notify_api, notify_db, notify_db_session, sample_service):
|
||||
def test_put_user(client, sample_service):
|
||||
"""
|
||||
Tests PUT endpoint '/' to update a user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 1
|
||||
sample_user = sample_service.users[0]
|
||||
new_email = 'new@digital.cabinet-office.gov.uk'
|
||||
data = {
|
||||
'name': sample_user.name,
|
||||
'email_address': new_email,
|
||||
'mobile_number': sample_user.mobile_number
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id=sample_user.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 200
|
||||
assert User.query.count() == 1
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['email_address'] == new_email
|
||||
expected_permissions = default_service_permissions
|
||||
fetched = json_resp['data']
|
||||
assert User.query.count() == 1
|
||||
sample_user = sample_service.users[0]
|
||||
sample_user.failed_login_count = 1
|
||||
new_email = 'new@digital.cabinet-office.gov.uk'
|
||||
data = {
|
||||
'name': sample_user.name,
|
||||
'email_address': new_email,
|
||||
'mobile_number': sample_user.mobile_number
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id=sample_user.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 200
|
||||
assert User.query.count() == 1
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['email_address'] == new_email
|
||||
expected_permissions = default_service_permissions
|
||||
fetched = json_resp['data']
|
||||
|
||||
assert str(sample_user.id) == fetched['id']
|
||||
assert sample_user.name == fetched['name']
|
||||
assert sample_user.mobile_number == fetched['mobile_number']
|
||||
assert new_email == fetched['email_address']
|
||||
assert sample_user.state == fetched['state']
|
||||
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
|
||||
assert str(sample_user.id) == fetched['id']
|
||||
assert sample_user.name == fetched['name']
|
||||
assert sample_user.mobile_number == fetched['mobile_number']
|
||||
assert new_email == fetched['email_address']
|
||||
assert sample_user.state == fetched['state']
|
||||
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
|
||||
# password wasn't updated, so failed_login_count stays the same
|
||||
assert sample_user.failed_login_count == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize('user_attribute, user_value', [
|
||||
@@ -203,214 +194,169 @@ def test_post_user_attribute(client, sample_user, user_attribute, user_value):
|
||||
assert json_resp['data'][user_attribute] == user_value
|
||||
|
||||
|
||||
def test_put_user_update_password(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service):
|
||||
def test_put_user_update_password(client, sample_service):
|
||||
"""
|
||||
Tests PUT endpoint '/' to update a user including their password.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 1
|
||||
sample_user = sample_service.users[0]
|
||||
new_password = '1234567890'
|
||||
data = {
|
||||
'name': sample_user.name,
|
||||
'email_address': sample_user.email_address,
|
||||
'mobile_number': sample_user.mobile_number,
|
||||
'password': new_password
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id=sample_user.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 200
|
||||
assert User.query.count() == 1
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['password_changed_at'] is not None
|
||||
data = {'password': new_password}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.verify_user_password', user_id=str(sample_user.id)),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 204
|
||||
assert User.query.count() == 1
|
||||
sample_user = sample_service.users[0]
|
||||
new_password = '1234567890'
|
||||
data = {
|
||||
'name': sample_user.name,
|
||||
'email_address': sample_user.email_address,
|
||||
'mobile_number': sample_user.mobile_number,
|
||||
'password': new_password
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id=sample_user.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 200
|
||||
assert User.query.count() == 1
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['password_changed_at'] is not None
|
||||
data = {'password': new_password}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.verify_user_password', user_id=str(sample_user.id)),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 204
|
||||
|
||||
|
||||
def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_user, fake_uuid):
|
||||
def test_put_user_not_exists(client, sample_user, fake_uuid):
|
||||
"""
|
||||
Tests PUT endpoint '/' to update a user doesn't exist.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 1
|
||||
new_email = 'new@digital.cabinet-office.gov.uk'
|
||||
data = {'email_address': new_email}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id=fake_uuid),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 404
|
||||
assert User.query.count() == 1
|
||||
user = User.query.filter_by(id=str(sample_user.id)).first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['result'] == "error"
|
||||
assert json_resp['message'] == 'No result found'
|
||||
assert User.query.count() == 1
|
||||
new_email = 'new@digital.cabinet-office.gov.uk'
|
||||
data = {'email_address': new_email}
|
||||
auth_header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id=fake_uuid),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 404
|
||||
assert User.query.count() == 1
|
||||
user = User.query.filter_by(id=str(sample_user.id)).first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['result'] == "error"
|
||||
assert json_resp['message'] == 'No result found'
|
||||
|
||||
assert user == sample_user
|
||||
assert user.email_address != new_email
|
||||
assert user == sample_user
|
||||
assert user.email_address != new_email
|
||||
|
||||
|
||||
def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_service):
|
||||
def test_get_user_by_email(client, sample_service):
|
||||
sample_user = sample_service.users[0]
|
||||
header = create_authorization_header()
|
||||
url = url_for('user.get_by_email', email=sample_user.email_address)
|
||||
resp = client.get(url, headers=[header])
|
||||
assert resp.status_code == 200
|
||||
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
sample_user = sample_service.users[0]
|
||||
header = create_authorization_header()
|
||||
url = url_for('user.get_by_email', email=sample_user.email_address)
|
||||
resp = client.get(url, headers=[header])
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
expected_permissions = default_service_permissions
|
||||
fetched = json_resp['data']
|
||||
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
expected_permissions = default_service_permissions
|
||||
fetched = json_resp['data']
|
||||
|
||||
assert str(sample_user.id) == fetched['id']
|
||||
assert sample_user.name == fetched['name']
|
||||
assert sample_user.mobile_number == fetched['mobile_number']
|
||||
assert sample_user.email_address == fetched['email_address']
|
||||
assert sample_user.state == fetched['state']
|
||||
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
|
||||
assert str(sample_user.id) == fetched['id']
|
||||
assert sample_user.name == fetched['name']
|
||||
assert sample_user.mobile_number == fetched['mobile_number']
|
||||
assert sample_user.email_address == fetched['email_address']
|
||||
assert sample_user.state == fetched['state']
|
||||
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
|
||||
|
||||
|
||||
def test_get_user_by_email_not_found_returns_404(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_user):
|
||||
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
header = create_authorization_header()
|
||||
url = url_for('user.get_by_email', email='no_user@digital.gov.uk')
|
||||
resp = client.get(url, headers=[header])
|
||||
assert resp.status_code == 404
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['result'] == 'error'
|
||||
assert json_resp['message'] == 'No result found'
|
||||
def test_get_user_by_email_not_found_returns_404(client, sample_user):
|
||||
header = create_authorization_header()
|
||||
url = url_for('user.get_by_email', email='no_user@digital.gov.uk')
|
||||
resp = client.get(url, headers=[header])
|
||||
assert resp.status_code == 404
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['result'] == 'error'
|
||||
assert json_resp['message'] == 'No result found'
|
||||
|
||||
|
||||
def test_get_user_by_email_bad_url_returns_404(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_user):
|
||||
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
header = create_authorization_header()
|
||||
url = '/user/email'
|
||||
resp = client.get(url, headers=[header])
|
||||
assert resp.status_code == 400
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['result'] == 'error'
|
||||
assert json_resp['message'] == 'Invalid request. Email query string param required'
|
||||
def test_get_user_by_email_bad_url_returns_404(client, sample_user):
|
||||
header = create_authorization_header()
|
||||
url = '/user/email'
|
||||
resp = client.get(url, headers=[header])
|
||||
assert resp.status_code == 400
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['result'] == 'error'
|
||||
assert json_resp['message'] == 'Invalid request. Email query string param required'
|
||||
|
||||
|
||||
def test_get_user_with_permissions(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service_permission):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
header = create_authorization_header()
|
||||
response = client.get(url_for('user.get_user', user_id=str(sample_service_permission.user.id)),
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
permissions = json.loads(response.get_data(as_text=True))['data']['permissions']
|
||||
assert sample_service_permission.permission in permissions[str(sample_service_permission.service.id)]
|
||||
def test_get_user_with_permissions(client, sample_service_permission):
|
||||
header = create_authorization_header()
|
||||
response = client.get(url_for('user.get_user', user_id=str(sample_service_permission.user.id)),
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
permissions = json.loads(response.get_data(as_text=True))['data']['permissions']
|
||||
assert sample_service_permission.permission in permissions[str(sample_service_permission.service.id)]
|
||||
|
||||
|
||||
def test_set_user_permissions(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_user,
|
||||
sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps([{'permission': MANAGE_SETTINGS}])
|
||||
header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
def test_set_user_permissions(client, sample_user, sample_service):
|
||||
data = json.dumps([{'permission': MANAGE_SETTINGS}])
|
||||
header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
|
||||
assert response.status_code == 204
|
||||
permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first()
|
||||
assert permission.user == sample_user
|
||||
assert permission.service == sample_service
|
||||
assert permission.permission == MANAGE_SETTINGS
|
||||
assert response.status_code == 204
|
||||
permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first()
|
||||
assert permission.user == sample_user
|
||||
assert permission.service == sample_service
|
||||
assert permission.permission == MANAGE_SETTINGS
|
||||
|
||||
|
||||
def test_set_user_permissions_multiple(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_user,
|
||||
sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps([{'permission': MANAGE_SETTINGS}, {'permission': MANAGE_TEMPLATES}])
|
||||
header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
def test_set_user_permissions_multiple(client, sample_user, sample_service):
|
||||
data = json.dumps([{'permission': MANAGE_SETTINGS}, {'permission': MANAGE_TEMPLATES}])
|
||||
header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
|
||||
assert response.status_code == 204
|
||||
permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first()
|
||||
assert permission.user == sample_user
|
||||
assert permission.service == sample_service
|
||||
assert permission.permission == MANAGE_SETTINGS
|
||||
permission = Permission.query.filter_by(permission=MANAGE_TEMPLATES).first()
|
||||
assert permission.user == sample_user
|
||||
assert permission.service == sample_service
|
||||
assert permission.permission == MANAGE_TEMPLATES
|
||||
assert response.status_code == 204
|
||||
permission = Permission.query.filter_by(permission=MANAGE_SETTINGS).first()
|
||||
assert permission.user == sample_user
|
||||
assert permission.service == sample_service
|
||||
assert permission.permission == MANAGE_SETTINGS
|
||||
permission = Permission.query.filter_by(permission=MANAGE_TEMPLATES).first()
|
||||
assert permission.user == sample_user
|
||||
assert permission.service == sample_service
|
||||
assert permission.permission == MANAGE_TEMPLATES
|
||||
|
||||
|
||||
def test_set_user_permissions_remove_old(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_user,
|
||||
sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps([{'permission': MANAGE_SETTINGS}])
|
||||
header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
def test_set_user_permissions_remove_old(client, sample_user, sample_service):
|
||||
data = json.dumps([{'permission': MANAGE_SETTINGS}])
|
||||
header = create_authorization_header()
|
||||
headers = [('Content-Type', 'application/json'), header]
|
||||
response = client.post(
|
||||
url_for(
|
||||
'user.set_permissions',
|
||||
user_id=str(sample_user.id),
|
||||
service_id=str(sample_service.id)),
|
||||
headers=headers,
|
||||
data=data)
|
||||
|
||||
assert response.status_code == 204
|
||||
query = Permission.query.filter_by(user=sample_user)
|
||||
assert query.count() == 1
|
||||
assert query.first().permission == MANAGE_SETTINGS
|
||||
assert response.status_code == 204
|
||||
query = Permission.query.filter_by(user=sample_user)
|
||||
assert query.count() == 1
|
||||
assert query.first().permission == MANAGE_SETTINGS
|
||||
|
||||
|
||||
@freeze_time("2016-01-01 11:09:00.061258")
|
||||
@@ -446,8 +392,7 @@ def test_send_user_reset_password_should_return_400_when_email_is_missing(client
|
||||
assert mocked.call_count == 0
|
||||
|
||||
|
||||
def test_send_user_reset_password_should_return_400_when_user_doesnot_exist(client,
|
||||
mocker):
|
||||
def test_send_user_reset_password_should_return_400_when_user_doesnot_exist(client, mocker):
|
||||
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
bad_email_address = 'bad@email.gov.uk'
|
||||
data = json.dumps({'email': bad_email_address})
|
||||
@@ -495,15 +440,15 @@ def test_send_already_registered_email(client, sample_user, already_registered_t
|
||||
|
||||
|
||||
def test_send_already_registered_email_returns_400_when_data_is_missing(client, sample_user):
|
||||
data = json.dumps({})
|
||||
auth_header = create_authorization_header()
|
||||
data = json.dumps({})
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.send_already_registered_email', user_id=str(sample_user.id)),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 400
|
||||
assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']}
|
||||
resp = client.post(
|
||||
url_for('user.send_already_registered_email', user_id=str(sample_user.id)),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 400
|
||||
assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Missing data for required field.']}
|
||||
|
||||
|
||||
def test_send_user_confirm_new_email_returns_204(client, sample_user, change_email_confirmation_template, mocker):
|
||||
@@ -535,7 +480,6 @@ def test_send_user_confirm_new_email_returns_400_when_email_missing(client, samp
|
||||
|
||||
|
||||
def test_update_user_password_saves_correctly(client, sample_service):
|
||||
assert User.query.count() == 1
|
||||
sample_user = sample_service.users[0]
|
||||
new_password = '1234567890'
|
||||
data = {
|
||||
@@ -548,7 +492,7 @@ def test_update_user_password_saves_correctly(client, sample_service):
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 200
|
||||
assert User.query.count() == 1
|
||||
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['password_changed_at'] is not None
|
||||
data = {'password': new_password}
|
||||
@@ -559,3 +503,36 @@ def test_update_user_password_saves_correctly(client, sample_service):
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 204
|
||||
|
||||
|
||||
def test_update_user_password_resets_failed_login_count(client, sample_service):
|
||||
user = sample_service.users[0]
|
||||
user.failed_login_count = 1
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.update_password', user_id=user.id),
|
||||
data=json.dumps({'_password': 'foo'}),
|
||||
headers=[('Content-Type', 'application/json'), create_authorization_header()]
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
assert user.failed_login_count == 0
|
||||
|
||||
|
||||
def test_update_user_resets_failed_login_count_if_updating_password(client, sample_service):
|
||||
user = sample_service.users[0]
|
||||
user.failed_login_count = 1
|
||||
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id=user.id),
|
||||
data=json.dumps({
|
||||
'name': user.name,
|
||||
'email_address': user.email_address,
|
||||
'mobile_number': user.mobile_number,
|
||||
'password': 'foo'
|
||||
}),
|
||||
headers=[('Content-Type', 'application/json'), create_authorization_header()]
|
||||
)
|
||||
|
||||
assert resp.status_code == 200
|
||||
assert user.failed_login_count == 0
|
||||
|
||||
@@ -1,27 +1,24 @@
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from datetime import (
|
||||
datetime,
|
||||
timedelta
|
||||
)
|
||||
|
||||
import pytest
|
||||
from flask import url_for, current_app
|
||||
from freezegun import freeze_time
|
||||
|
||||
from app.dao.services_dao import dao_update_service, dao_fetch_service_by_id
|
||||
from app.models import (
|
||||
VerifyCode,
|
||||
User,
|
||||
Notification
|
||||
)
|
||||
|
||||
from app import db
|
||||
import app.celery.tasks
|
||||
|
||||
from tests import create_authorization_header
|
||||
from freezegun import freeze_time
|
||||
|
||||
import app.celery.tasks
|
||||
|
||||
|
||||
def test_user_verify_code(client,
|
||||
@@ -163,7 +160,7 @@ def test_user_verify_password_missing_password(client,
|
||||
|
||||
@pytest.mark.parametrize('research_mode', [True, False])
|
||||
@freeze_time("2016-01-01 11:09:00.061258")
|
||||
def test_send_user_sms_code(notify_api,
|
||||
def test_send_user_sms_code(client,
|
||||
sample_user,
|
||||
sms_code_template,
|
||||
mocker,
|
||||
@@ -171,68 +168,63 @@ def test_send_user_sms_code(notify_api,
|
||||
"""
|
||||
Tests POST endpoint /user/<user_id>/sms-code
|
||||
"""
|
||||
if research_mode:
|
||||
notify_service = dao_fetch_service_by_id(current_app.config['NOTIFY_SERVICE_ID'])
|
||||
notify_service.research_mode = True
|
||||
dao_update_service(notify_service)
|
||||
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
if research_mode:
|
||||
notify_service = dao_fetch_service_by_id(current_app.config['NOTIFY_SERVICE_ID'])
|
||||
notify_service.research_mode = True
|
||||
dao_update_service(notify_service)
|
||||
auth_header = create_authorization_header()
|
||||
mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111')
|
||||
mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async')
|
||||
|
||||
auth_header = create_authorization_header()
|
||||
mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111')
|
||||
mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async')
|
||||
resp = client.post(
|
||||
url_for('user.send_user_sms_code', user_id=sample_user.id),
|
||||
data=json.dumps({}),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 204
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.send_user_sms_code', user_id=sample_user.id),
|
||||
data=json.dumps({}),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 204
|
||||
assert mocked.call_count == 1
|
||||
assert VerifyCode.query.count() == 1
|
||||
assert VerifyCode.query.first().check_code('11111')
|
||||
|
||||
assert mocked.call_count == 1
|
||||
assert VerifyCode.query.count() == 1
|
||||
assert VerifyCode.query.first().check_code('11111')
|
||||
assert Notification.query.count() == 1
|
||||
notification = Notification.query.first()
|
||||
assert notification.personalisation == {'verify_code': '11111'}
|
||||
assert notification.to == sample_user.mobile_number
|
||||
assert str(notification.service_id) == current_app.config['NOTIFY_SERVICE_ID']
|
||||
|
||||
assert Notification.query.count() == 1
|
||||
notification = Notification.query.first()
|
||||
assert notification.personalisation == {'verify_code': '11111'}
|
||||
assert notification.to == sample_user.mobile_number
|
||||
assert str(notification.service_id) == current_app.config['NOTIFY_SERVICE_ID']
|
||||
|
||||
app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with(
|
||||
([str(notification.id)]),
|
||||
queue="notify"
|
||||
)
|
||||
app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with(
|
||||
([str(notification.id)]),
|
||||
queue="notify"
|
||||
)
|
||||
|
||||
|
||||
@freeze_time("2016-01-01 11:09:00.061258")
|
||||
def test_send_user_code_for_sms_with_optional_to_field(notify_api,
|
||||
def test_send_user_code_for_sms_with_optional_to_field(client,
|
||||
sample_user,
|
||||
sms_code_template,
|
||||
mocker):
|
||||
"""
|
||||
Tests POST endpoint /user/<user_id>/sms-code with optional to field
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
to_number = '+441119876757'
|
||||
mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111')
|
||||
mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async')
|
||||
auth_header = create_authorization_header()
|
||||
to_number = '+441119876757'
|
||||
mocked = mocker.patch('app.user.rest.create_secret_code', return_value='11111')
|
||||
mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async')
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.send_user_sms_code', user_id=sample_user.id),
|
||||
data=json.dumps({'to': to_number}),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
resp = client.post(
|
||||
url_for('user.send_user_sms_code', user_id=sample_user.id),
|
||||
data=json.dumps({'to': to_number}),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
assert resp.status_code == 204
|
||||
assert mocked.call_count == 1
|
||||
notification = Notification.query.first()
|
||||
assert notification.to == to_number
|
||||
app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with(
|
||||
([str(notification.id)]),
|
||||
queue="notify"
|
||||
)
|
||||
assert resp.status_code == 204
|
||||
assert mocked.call_count == 1
|
||||
notification = Notification.query.first()
|
||||
assert notification.to == to_number
|
||||
app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with(
|
||||
([str(notification.id)]),
|
||||
queue="notify"
|
||||
)
|
||||
|
||||
|
||||
def test_send_sms_code_returns_404_for_bad_input_data(client):
|
||||
@@ -282,12 +274,11 @@ def test_send_user_email_verification(client,
|
||||
mocked.assert_called_once_with(([str(notification.id)]), queue="notify")
|
||||
|
||||
|
||||
def test_send_email_verification_returns_404_for_bad_input_data(client, notify_db, notify_db_session, mocker):
|
||||
def test_send_email_verification_returns_404_for_bad_input_data(client, notify_db_session, mocker):
|
||||
"""
|
||||
Tests POST endpoint /user/<user_id>/sms-code return 404 for bad input data
|
||||
"""
|
||||
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
import uuid
|
||||
uuid_ = uuid.uuid4()
|
||||
auth_header = create_authorization_header()
|
||||
resp = client.post(
|
||||
@@ -297,3 +288,17 @@ def test_send_email_verification_returns_404_for_bad_input_data(client, notify_d
|
||||
assert resp.status_code == 404
|
||||
assert json.loads(resp.get_data(as_text=True))['message'] == 'No result found'
|
||||
assert mocked.call_count == 0
|
||||
|
||||
|
||||
def test_user_verify_user_code_valid_code_resets_failed_login_count(client, sample_sms_code):
|
||||
sample_sms_code.user.failed_login_count = 1
|
||||
data = json.dumps({
|
||||
'code_type': sample_sms_code.code_type,
|
||||
'code': sample_sms_code.txt_code})
|
||||
resp = client.post(
|
||||
url_for('user.verify_user_code', user_id=sample_sms_code.user.id),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), create_authorization_header()])
|
||||
assert resp.status_code == 204
|
||||
assert sample_sms_code.user.failed_login_count == 0
|
||||
assert sample_sms_code.code_used
|
||||
|
||||
Reference in New Issue
Block a user