mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-02-06 03:13:42 -05:00
Merge pull request #357 from alphagov/remove_user_dao
Merged with master.
This commit is contained in:
@@ -162,6 +162,11 @@ def valid_phone_number(phone_number):
|
||||
return False
|
||||
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(user_id):
|
||||
return user_api_client.get_user(user_id)
|
||||
|
||||
|
||||
# https://www.owasp.org/index.php/List_of_useful_HTTP_headers
|
||||
def useful_headers_after_request(response):
|
||||
response.headers.add('X-Frame-Options', 'deny')
|
||||
|
||||
@@ -1,58 +0,0 @@
|
||||
from notifications_python_client import HTTPError
|
||||
|
||||
from app import login_manager
|
||||
from app import user_api_client
|
||||
|
||||
#
|
||||
# TODO fix up this, do we really need this class why not just use the clients
|
||||
# directly??
|
||||
#
|
||||
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(user_id):
|
||||
return get_user_by_id(user_id)
|
||||
|
||||
|
||||
# TODO Would be better to have a generic get and update for user
|
||||
# something that replicates the sql functionality.
|
||||
def get_user_by_id(id):
|
||||
return user_api_client.get_user(id)
|
||||
|
||||
|
||||
def get_all_users():
|
||||
return user_api_client.get_users()
|
||||
|
||||
|
||||
def get_user_by_email(email_address):
|
||||
return user_api_client.get_user_by_email(email_address)
|
||||
|
||||
|
||||
def verify_password(user_id, password):
|
||||
return user_api_client.verify_password(user_id, password)
|
||||
|
||||
|
||||
def update_user(user):
|
||||
return user_api_client.update_user(user)
|
||||
|
||||
|
||||
def increment_failed_login_count(id):
|
||||
user = get_user_by_id(id)
|
||||
user.failed_login_count += 1
|
||||
return user_api_client.update_user(user)
|
||||
|
||||
|
||||
def activate_user(user):
|
||||
return user_api_client.activate_user(user)
|
||||
|
||||
|
||||
def is_email_unique(email_address):
|
||||
return user_api_client.is_email_unique(email_address)
|
||||
|
||||
|
||||
def send_verify_code(user_id, code_type, to):
|
||||
return user_api_client.send_verify_code(user_id, code_type, to)
|
||||
|
||||
|
||||
def check_verify_code(user_id, code, code_type):
|
||||
return user_api_client.check_verify_code(user_id, code, code_type)
|
||||
@@ -4,9 +4,9 @@ from flask import (render_template, url_for, redirect, flash, session, current_a
|
||||
from itsdangerous import SignatureExpired
|
||||
|
||||
from app.main import main
|
||||
from app.main.dao import users_dao
|
||||
from app.main.forms import NewPasswordForm
|
||||
from datetime import datetime
|
||||
from app import user_api_client
|
||||
|
||||
|
||||
@main.route('/new-password/<path:token>', methods=['GET', 'POST'])
|
||||
@@ -20,10 +20,7 @@ def new_password(token):
|
||||
return redirect(url_for('.forgot_password'))
|
||||
|
||||
email_address = json.loads(token_data)['email']
|
||||
user = users_dao.get_user_by_email(email_address=email_address)
|
||||
# TODO: what should this be??
|
||||
if not user:
|
||||
abort(404, 'user not found')
|
||||
user = user_api_client.get_user_by_email(email_address)
|
||||
if user.password_changed_at and datetime.strptime(user.password_changed_at, '%Y-%m-%d %H:%M:%S.%f') > \
|
||||
datetime.strptime(json.loads(token_data)['created_at'], '%Y-%m-%d %H:%M:%S.%f'):
|
||||
flash('The link in the email has already been used')
|
||||
@@ -32,7 +29,7 @@ def new_password(token):
|
||||
form = NewPasswordForm()
|
||||
|
||||
if form.validate_on_submit():
|
||||
users_dao.send_verify_code(user.id, 'sms', user.mobile_number)
|
||||
user_api_client.send_verify_code(user.id, 'sms', user.mobile_number)
|
||||
session['user_details'] = {
|
||||
'id': user.id,
|
||||
'email': user.email_address,
|
||||
|
||||
@@ -16,8 +16,8 @@ from notifications_python_client import HTTPError
|
||||
from app import service_api_client
|
||||
from app.main import main
|
||||
from app.utils import user_has_permissions
|
||||
from app.main.dao.users_dao import verify_password
|
||||
from app.main.forms import ConfirmPasswordForm, ServiceNameForm
|
||||
from app import user_api_client
|
||||
|
||||
|
||||
@main.route("/services/<service_id>/service-settings")
|
||||
@@ -60,7 +60,7 @@ def service_name_change_confirm(service_id):
|
||||
|
||||
# Validate password for form
|
||||
def _check_password(pwd):
|
||||
return verify_password(current_user.id, pwd)
|
||||
return user_api_client.verify_password(current_user.id, pwd)
|
||||
form = ConfirmPasswordForm(_check_password)
|
||||
|
||||
if form.validate_on_submit():
|
||||
@@ -133,7 +133,7 @@ def service_status_change_confirm(service_id):
|
||||
|
||||
# Validate password for form
|
||||
def _check_password(pwd):
|
||||
return verify_password(current_user.id, pwd)
|
||||
return user_api_client.verify_password(current_user.id, pwd)
|
||||
form = ConfirmPasswordForm(_check_password)
|
||||
|
||||
if form.validate_on_submit():
|
||||
@@ -178,7 +178,7 @@ def service_delete_confirm(service_id):
|
||||
|
||||
# Validate password for form
|
||||
def _check_password(pwd):
|
||||
return verify_password(current_user.id, pwd)
|
||||
return user_api_client.verify_password(current_user.id, pwd)
|
||||
form = ConfirmPasswordForm(_check_password)
|
||||
|
||||
if form.validate_on_submit():
|
||||
|
||||
@@ -9,9 +9,9 @@ from flask import (
|
||||
|
||||
from flask_login import login_user, current_user
|
||||
from app.main import main
|
||||
from app.main.dao import users_dao
|
||||
from app.main.forms import TwoFactorForm
|
||||
from app import service_api_client
|
||||
from app import user_api_client
|
||||
|
||||
|
||||
@main.route('/two-factor', methods=['GET', 'POST'])
|
||||
@@ -23,18 +23,18 @@ def two_factor():
|
||||
return redirect('main.sign_in')
|
||||
|
||||
def _check_code(code):
|
||||
return users_dao.check_verify_code(user_id, code, "sms")
|
||||
return user_api_client.check_verify_code(user_id, code, "sms")
|
||||
|
||||
form = TwoFactorForm(_check_code)
|
||||
|
||||
if form.validate_on_submit():
|
||||
try:
|
||||
user = users_dao.get_user_by_id(user_id)
|
||||
user = user_api_client.get_user(user_id)
|
||||
services = service_api_client.get_services({'user_id': str(user_id)}).get('data', [])
|
||||
# Check if coming from new password page
|
||||
if 'password' in session['user_details']:
|
||||
user.set_password(session['user_details']['password'])
|
||||
users_dao.update_user(user)
|
||||
user_api_client.update_user(user)
|
||||
login_user(user, remember=True)
|
||||
finally:
|
||||
del session['user_details']
|
||||
|
||||
@@ -10,14 +10,6 @@ from flask.ext.login import current_user
|
||||
from flask_login import login_required
|
||||
from app.main import main
|
||||
|
||||
from app.main.dao.users_dao import (
|
||||
verify_password,
|
||||
update_user,
|
||||
check_verify_code,
|
||||
is_email_unique,
|
||||
send_verify_code
|
||||
)
|
||||
|
||||
from app.main.forms import (
|
||||
ChangePasswordForm,
|
||||
ChangeNameForm,
|
||||
@@ -50,7 +42,7 @@ def user_profile_name():
|
||||
|
||||
if form.validate_on_submit():
|
||||
current_user.name = form.new_name.data
|
||||
update_user(current_user)
|
||||
user_api_client.update_user(current_user)
|
||||
return redirect(url_for('.user_profile'))
|
||||
|
||||
return render_template(
|
||||
@@ -65,7 +57,7 @@ def user_profile_name():
|
||||
def user_profile_email():
|
||||
|
||||
def _is_email_unique(email):
|
||||
return is_email_unique(email)
|
||||
return user_api_client.is_email_unique(email)
|
||||
form = ChangeEmailForm(_is_email_unique,
|
||||
email_address=current_user.email_address)
|
||||
|
||||
@@ -84,7 +76,7 @@ def user_profile_email():
|
||||
def user_profile_email_authenticate():
|
||||
# Validate password for form
|
||||
def _check_password(pwd):
|
||||
return verify_password(current_user.id, pwd)
|
||||
return user_api_client.verify_password(current_user.id, pwd)
|
||||
form = ConfirmPasswordForm(_check_password)
|
||||
|
||||
if NEW_EMAIL not in session:
|
||||
@@ -92,7 +84,7 @@ def user_profile_email_authenticate():
|
||||
|
||||
if form.validate_on_submit():
|
||||
session[NEW_EMAIL_PASSWORD_CONFIRMED] = True
|
||||
send_verify_code(current_user.id, 'email', session[NEW_EMAIL])
|
||||
user_api_client.send_verify_code(current_user.id, 'email', session[NEW_EMAIL])
|
||||
return redirect(url_for('.user_profile_email_confirm'))
|
||||
|
||||
return render_template(
|
||||
@@ -109,7 +101,7 @@ def user_profile_email_confirm():
|
||||
|
||||
# Validate verify code for form
|
||||
def _check_code(cde):
|
||||
return check_verify_code(current_user.id, cde, 'email')
|
||||
return user_api_client.check_verify_code(current_user.id, cde, 'email')
|
||||
form = ConfirmEmailForm(_check_code)
|
||||
|
||||
if NEW_EMAIL_PASSWORD_CONFIRMED not in session:
|
||||
@@ -119,7 +111,7 @@ def user_profile_email_confirm():
|
||||
current_user.email_address = session[NEW_EMAIL]
|
||||
del session[NEW_EMAIL]
|
||||
del session[NEW_EMAIL_PASSWORD_CONFIRMED]
|
||||
update_user(current_user)
|
||||
user_api_client.update_user(current_user)
|
||||
return redirect(url_for('.user_profile'))
|
||||
|
||||
return render_template(
|
||||
@@ -152,7 +144,7 @@ def user_profile_mobile_number_authenticate():
|
||||
|
||||
# Validate password for form
|
||||
def _check_password(pwd):
|
||||
return verify_password(current_user.id, pwd)
|
||||
return user_api_client.verify_password(current_user.id, pwd)
|
||||
form = ConfirmPasswordForm(_check_password)
|
||||
|
||||
if NEW_MOBILE not in session:
|
||||
@@ -160,7 +152,7 @@ def user_profile_mobile_number_authenticate():
|
||||
|
||||
if form.validate_on_submit():
|
||||
session[NEW_MOBILE_PASSWORD_CONFIRMED] = True
|
||||
send_verify_code(current_user.id, 'sms', session[NEW_MOBILE])
|
||||
user_api_client.send_verify_code(current_user.id, 'sms', session[NEW_MOBILE])
|
||||
return redirect(url_for('.user_profile_mobile_number_confirm'))
|
||||
|
||||
return render_template(
|
||||
@@ -177,7 +169,7 @@ def user_profile_mobile_number_confirm():
|
||||
|
||||
# Validate verify code for form
|
||||
def _check_code(cde):
|
||||
return check_verify_code(current_user.id, cde, 'sms')
|
||||
return user_api_client.check_verify_code(current_user.id, cde, 'sms')
|
||||
|
||||
if NEW_MOBILE_PASSWORD_CONFIRMED not in session:
|
||||
return redirect(url_for('.user_profile_mobile_number'))
|
||||
@@ -188,7 +180,7 @@ def user_profile_mobile_number_confirm():
|
||||
current_user.mobile_number = session[NEW_MOBILE]
|
||||
del session[NEW_MOBILE]
|
||||
del session[NEW_MOBILE_PASSWORD_CONFIRMED]
|
||||
update_user(current_user)
|
||||
user_api_client.update_user(current_user)
|
||||
return redirect(url_for('.user_profile'))
|
||||
|
||||
return render_template(
|
||||
@@ -204,12 +196,12 @@ def user_profile_password():
|
||||
|
||||
# Validate password for form
|
||||
def _check_password(pwd):
|
||||
return verify_password(current_user.id, pwd)
|
||||
return user_api_client.verify_password(current_user.id, pwd)
|
||||
form = ChangePasswordForm(_check_password)
|
||||
|
||||
if form.validate_on_submit():
|
||||
current_user.set_password(form.new_password.data)
|
||||
update_user(current_user)
|
||||
user_api_client.update_user(current_user)
|
||||
return redirect(url_for('.user_profile'))
|
||||
|
||||
return render_template(
|
||||
|
||||
@@ -62,13 +62,6 @@ def invite_json(id, from_user, service_id, email_address, permissions, created_a
|
||||
TEST_USER_EMAIL = 'test@user.gov.uk'
|
||||
|
||||
|
||||
def create_test_user(state):
|
||||
from app.main.dao import users_dao
|
||||
user = None
|
||||
users_dao.insert_user(user)
|
||||
return user
|
||||
|
||||
|
||||
def create_test_api_user(state, permissions={}):
|
||||
from app.notify_client.user_api_client import User
|
||||
user_data = {'id': 1,
|
||||
@@ -83,18 +76,6 @@ def create_test_api_user(state, permissions={}):
|
||||
return user
|
||||
|
||||
|
||||
def create_another_test_user(state):
|
||||
from app.main.dao import users_dao
|
||||
user = None
|
||||
users_dao.insert_user(user)
|
||||
return user
|
||||
|
||||
|
||||
def get_test_user():
|
||||
from app.main.dao import users_dao
|
||||
return users_dao.get_user_by_email(TEST_USER_EMAIL)
|
||||
|
||||
|
||||
def job_json():
|
||||
import uuid
|
||||
import datetime
|
||||
|
||||
@@ -1,15 +1,14 @@
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from app.main.forms import TwoFactorForm
|
||||
from app.main.dao import users_dao
|
||||
from tests import create_test_user
|
||||
from app import user_api_client
|
||||
|
||||
|
||||
def test_form_is_valid_returns_no_errors(app_, mock_check_verify_code):
|
||||
with app_.test_request_context(method='POST',
|
||||
data={'sms_code': '12345'}) as req:
|
||||
def _check_code(code):
|
||||
return users_dao.check_verify_code('1', code, "sms")
|
||||
return user_api_client.check_verify_code('1', code, "sms")
|
||||
form = TwoFactorForm(_check_code)
|
||||
assert form.validate() is True
|
||||
assert len(form.errors) == 0
|
||||
@@ -19,7 +18,7 @@ def test_returns_errors_when_code_is_too_short(app_, mock_check_verify_code):
|
||||
with app_.test_request_context(method='POST',
|
||||
data={'sms_code': '145'}) as req:
|
||||
def _check_code(code):
|
||||
return users_dao.check_verify_code('1', code, "sms")
|
||||
return user_api_client.check_verify_code('1', code, "sms")
|
||||
form = TwoFactorForm(_check_code)
|
||||
assert form.validate() is False
|
||||
assert len(form.errors) == 1
|
||||
@@ -30,7 +29,7 @@ def test_returns_errors_when_code_is_missing(app_, mock_check_verify_code):
|
||||
with app_.test_request_context(method='POST',
|
||||
data={}) as req:
|
||||
def _check_code(code):
|
||||
return users_dao.check_verify_code('1', code, "sms")
|
||||
return user_api_client.check_verify_code('1', code, "sms")
|
||||
form = TwoFactorForm(_check_code)
|
||||
assert form.validate() is False
|
||||
assert len(form.errors) == 1
|
||||
@@ -41,7 +40,7 @@ def test_returns_errors_when_code_contains_letters(app_, mock_check_verify_code)
|
||||
with app_.test_request_context(method='POST',
|
||||
data={'sms_code': 'asdfg'}) as req:
|
||||
def _check_code(code):
|
||||
return users_dao.check_verify_code('1', code, "sms")
|
||||
return user_api_client.check_verify_code('1', code, "sms")
|
||||
form = TwoFactorForm(_check_code)
|
||||
assert form.validate() is False
|
||||
assert len(form.errors) == 1
|
||||
@@ -53,7 +52,7 @@ def test_should_return_errors_when_code_is_expired(app_,
|
||||
with app_.test_request_context(method='POST',
|
||||
data={'sms_code': '23456'}) as req:
|
||||
def _check_code(code):
|
||||
return users_dao.check_verify_code('1', code, "sms")
|
||||
return user_api_client.check_verify_code('1', code, "sms")
|
||||
form = TwoFactorForm(_check_code)
|
||||
assert form.validate() is False
|
||||
errors = form.errors
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from app.main.dao import users_dao
|
||||
from app.main.forms import RegisterUserForm
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from tests import create_test_user
|
||||
from flask import url_for
|
||||
import pytest
|
||||
|
||||
|
||||
@@ -456,8 +456,15 @@ def mock_get_user_by_email_pending(mocker, api_user_pending):
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def mock_get_user_by_email_not_found(mocker):
|
||||
return mocker.patch('app.user_api_client.get_user_by_email', return_value=None)
|
||||
def mock_get_user_by_email_not_found(mocker, api_user_active):
|
||||
def _get_user(email):
|
||||
json_mock = Mock(return_value={'message': "Not found", 'result': 'error'})
|
||||
resp_mock = Mock(status_code=404, json=json_mock)
|
||||
http_error = HTTPError(response=resp_mock, message="Default message")
|
||||
raise http_error
|
||||
return mocker.patch(
|
||||
'app.user_api_client.get_user_by_email',
|
||||
side_effect=_get_user)
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
@@ -489,7 +496,7 @@ def mock_is_email_not_unique(mocker):
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def mock_get_all_users_from_api(mocker):
|
||||
return mocker.patch('app.main.dao.users_dao.user_api_client.get_users')
|
||||
return mocker.patch('app.user_api_client.get_users', return_value={'data': []})
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
|
||||
Reference in New Issue
Block a user