Add logging around auth{entication,orization} calls

This commit is contained in:
Ryan Ahearn
2023-04-26 11:14:25 -04:00
parent 99c94851c0
commit a42617f776
5 changed files with 42 additions and 11 deletions

View File

@@ -15,7 +15,10 @@ from notifications_python_client.errors import HTTPError
from notifications_utils.url_safe_token import check_token
from app import user_api_client
from app.event_handlers import create_email_change_event, create_mobile_number_change_event
from app.event_handlers import (
create_email_change_event,
create_mobile_number_change_event,
)
from app.main import main
from app.main.forms import (
ChangeEmailForm,
@@ -95,7 +98,7 @@ def user_profile_email_authenticate():
create_email_change_event(
user_id=current_user.id,
updated_by_id=current_user.id,
original_email_address=current_user.email,
original_email_address=current_user.email_address,
new_email_address=session[NEW_EMAIL],
)
return render_template('views/change-email-continue.html',

View File

@@ -1,6 +1,6 @@
from datetime import datetime
from flask import abort, request, session
from flask import abort, current_app, request, session
from flask_login import AnonymousUserMixin, UserMixin, login_user, logout_user
from notifications_python_client.errors import HTTPError
from notifications_utils.timezones import convert_utc_to_local_timezone
@@ -175,6 +175,7 @@ class User(JSONModel, UserMixin):
# Update the db so the server also knows the user is logged out.
self.update(current_session_id=None)
logout_user()
current_app.logger.info(f"Logged out {self.id}")
@property
def sms_auth(self):
@@ -201,7 +202,9 @@ class User(JSONModel, UserMixin):
@property
def is_gov_user(self):
return is_gov_user(self.email_address)
is_gov = is_gov_user(self.email_address)
current_app.logger.info(f"User {self.id} is_gov_user: {is_gov}")
return is_gov
@property
def is_authenticated(self):
@@ -212,6 +215,7 @@ class User(JSONModel, UserMixin):
@property
def platform_admin(self):
current_app.logger.warn(f"Checking User {self.id} for platform admin: {self._platform_admin}")
return self._platform_admin and not session.get('disable_platform_admin_view', False)
def has_permissions(self, *permissions, restrict_admin_usage=False, allow_org_user=False):
@@ -228,32 +232,45 @@ class User(JSONModel, UserMixin):
# use @user_is_platform_admin for platform admin only pages
raise NotImplementedError
log_msg = f"has_permissions user: {self.id} service: {service_id}"
# platform admins should be able to do most things (except eg send messages, or create api keys)
if self.platform_admin and not restrict_admin_usage:
current_app.logger.warn(f"{log_msg} true because user is platform_admin")
return True
if org_id:
return self.belongs_to_organisation(org_id)
value = self.belongs_to_organisation(org_id)
current_app.logger.warn(f"{log_msg} org: {org_id} returning {value}")
return value
if not permissions and self.belongs_to_service(service_id):
current_app.logger.warn(f"{log_msg} True because belongs_to_service")
return True
if any(
self.permissions_for_service(service_id) & set(permissions)
):
current_app.logger.warn(f"{log_msg} permissions valid")
return True
from app.models.service import Service
return allow_org_user and self.belongs_to_organisation(
org_value = allow_org_user and self.belongs_to_organisation(
Service.from_id(service_id).organisation_id
)
current_app.logger.warn(f"{log_msg} returning {org_value}")
return org_value
def permissions_for_service(self, service_id):
return self._permissions.get(service_id, set())
def has_permission_for_service(self, service_id, permission):
return permission in self._permissions.get(service_id, [])
has_permission = permission in self.permissions_for_service(service_id)
current_app.logger.warn(
f"has_permission_for_service user: {self.id} service: {service_id} "
f"permission: {permission} retuning {has_permission}"
)
return has_permission
def has_template_folder_permission(self, template_folder, service=None):
if self.platform_admin:
@@ -547,11 +564,13 @@ class InvitedUser(JSONModel):
return cls.by_id(invited_user_id) if invited_user_id else None
def has_permissions(self, *permissions):
current_app.logger.warn(f"Checking invited user {self.id} for permissions: {permissions}")
if self.status == 'cancelled':
return False
return set(self.permissions) > set(permissions)
def has_permission_for_service(self, service_id, permission):
current_app.logger.warn(f"Checking invited user {self.id} for permission: {permission} on service {service_id}")
if self.status == 'cancelled':
return False
return self.service == service_id and permission in self.permissions

View File

@@ -1,3 +1,4 @@
from flask import current_app
from notifications_python_client.errors import HTTPError
from app.notify_client import NotifyAdminAPIClient, cache
@@ -83,13 +84,16 @@ class UserApiClient(NotifyAdminAPIClient):
@cache.delete('user-{user_id}')
def verify_password(self, user_id, password):
try:
current_app.logger.warn(f"Checking password for {user_id}")
url = "/user/{}/verify/password".format(user_id)
data = {"password": password}
self.post(url, data=data)
return True
except HTTPError as e:
if e.status_code == 400 or e.status_code == 404:
current_app.logger.error(f"Password for {user_id} was invalid")
return False
raise
def send_verify_code(self, user_id, code_type, to, next_string=None):
data = {'to': to}
@@ -98,6 +102,7 @@ class UserApiClient(NotifyAdminAPIClient):
if code_type == 'email':
data['email_auth_link_host'] = self.admin_url
endpoint = '/user/{0}/{1}-code'.format(user_id, code_type)
current_app.logger.warn(f"Sending verify_code {code_type} to {user_id}")
self.post(endpoint, data=data)
def send_verify_email(self, user_id, to):
@@ -118,24 +123,28 @@ class UserApiClient(NotifyAdminAPIClient):
data = {'code_type': code_type, 'code': code}
endpoint = '/user/{}/verify/code'.format(user_id)
try:
current_app.logger.warn(f"Checking verify code for {user_id}")
self.post(endpoint, data=data)
return True, ''
except HTTPError as e:
if e.status_code == 400 or e.status_code == 404:
current_app.logger.error(f"Verify code for {user_id} was invalid")
return False, e.message
raise e
raise
@cache.delete('user-{user_id}')
def complete_webauthn_login_attempt(self, user_id, is_successful):
data = {'successful': is_successful}
endpoint = f'/user/{user_id}/complete/webauthn-login'
try:
current_app.logger.warn(f"Sending webauthn-login for {user_id}")
self.post(endpoint, data=data)
return True, ''
except HTTPError as e:
if e.status_code == 403:
current_app.logger.error(f"Webauthn-login attempt for {user_id} was invalid")
return False, e.message
raise e
raise
def get_users_for_service(self, service_id):
endpoint = '/service/{}/users'.format(service_id)

View File

@@ -71,7 +71,7 @@ def test_platform_admin_flag_set_in_session(
mocker.patch.dict('app.models.user.session', values=session_dict, clear=True)
assert User({'platform_admin': is_platform_admin}).platform_admin == expected_result
assert User({'id': 1, 'platform_admin': is_platform_admin}).platform_admin == expected_result
def test_has_live_services(

View File

@@ -1000,7 +1000,7 @@ def active_user_no_settings_permission():
def api_user_locked(fake_uuid):
return create_user(
id=fake_uuid,
failed_login_count=5,
failed_login_count=10,
password_changed_at=None,
)