mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-06-24 01:11:15 -04:00
Merge pull request #3970 from alphagov/audit-perm-changes-178770155
Audit permissions when they change for a user
This commit is contained in:
@@ -1,13 +1,14 @@
|
||||
from flask import request
|
||||
|
||||
from app import events_api_client
|
||||
from app.notify_client.events_api_client import events_api_client
|
||||
|
||||
EVENT_SCHEMAS = {
|
||||
"sucessful_login": {"user_id"},
|
||||
"update_user_email": {"user_id", "updated_by_id", "original_email_address", "new_email_address"},
|
||||
"update_user_mobile_number": {"user_id", "updated_by_id", "original_mobile_number", "new_mobile_number"},
|
||||
"remove_user_from_service": {"user_id", "removed_by_id", "service_id"},
|
||||
"add_user_to_service": {"user_id", "invited_by_id", "service_id"},
|
||||
"add_user_to_service": {"user_id", "invited_by_id", "service_id", "ui_permissions"},
|
||||
"set_user_permissions": {"user_id", "service_id", "original_ui_permissions", "new_ui_permissions", "set_by_id"},
|
||||
"archive_user": {"user_id", "archived_by_id"},
|
||||
"change_broadcast_account_type": {"service_id", "changed_by_id", "service_mode", "broadcast_channel", "provider_restriction"}, # noqa: E501 (length)
|
||||
"archive_service": {"service_id", "archived_by_id"},
|
||||
@@ -36,6 +37,10 @@ def create_add_user_to_service_event(**kwargs):
|
||||
_send_event('add_user_to_service', **kwargs)
|
||||
|
||||
|
||||
def create_set_user_permissions_event(**kwargs):
|
||||
_send_event('set_user_permissions', **kwargs)
|
||||
|
||||
|
||||
def create_archive_user_event(**kwargs):
|
||||
_send_event('archive_user', **kwargs)
|
||||
|
||||
|
||||
@@ -141,6 +141,7 @@ def edit_user_permissions(service_id, user_id):
|
||||
service_id,
|
||||
permissions=form.permissions,
|
||||
folder_permissions=form.folder_permissions.data,
|
||||
set_by_id=current_user.id,
|
||||
)
|
||||
# Only change the auth type if this is supported for a service. If a user logs in with a
|
||||
# security key, we generally don't want them to be able to use something less secure.
|
||||
|
||||
@@ -4,6 +4,10 @@ from notifications_python_client.errors import HTTPError
|
||||
from notifications_utils.timezones import utc_string_to_aware_gmt_datetime
|
||||
from werkzeug.utils import cached_property
|
||||
|
||||
from app.event_handlers import (
|
||||
create_add_user_to_service_event,
|
||||
create_set_user_permissions_event,
|
||||
)
|
||||
from app.models import JSONModel, ModelList
|
||||
from app.models.organisation import Organisation
|
||||
from app.models.roles_and_permissions import (
|
||||
@@ -121,13 +125,20 @@ class User(JSONModel, UserMixin):
|
||||
datetime_string
|
||||
)
|
||||
|
||||
def set_permissions(self, service_id, permissions, folder_permissions):
|
||||
def set_permissions(self, service_id, permissions, folder_permissions, set_by_id):
|
||||
user_api_client.set_user_permissions(
|
||||
self.id,
|
||||
service_id,
|
||||
permissions=permissions,
|
||||
folder_permissions=folder_permissions,
|
||||
)
|
||||
create_set_user_permissions_event(
|
||||
user_id=self.id,
|
||||
service_id=service_id,
|
||||
original_ui_permissions=self.permissions_for_service(service_id),
|
||||
new_ui_permissions=permissions,
|
||||
set_by_id=set_by_id,
|
||||
)
|
||||
|
||||
def logged_in_elsewhere(self):
|
||||
return session.get('current_session_id') != self.current_session_id
|
||||
@@ -214,8 +225,7 @@ class User(JSONModel, UserMixin):
|
||||
return True
|
||||
|
||||
if any(
|
||||
self.has_permission_for_service(service_id, permission)
|
||||
for permission in permissions
|
||||
self.permissions_for_service(service_id) & set(permissions)
|
||||
):
|
||||
return True
|
||||
|
||||
@@ -225,6 +235,9 @@ class User(JSONModel, UserMixin):
|
||||
Service.from_id(service_id).organisation_id
|
||||
)
|
||||
|
||||
def permissions_for_service(self, service_id):
|
||||
return self._permissions.get(service_id, set())
|
||||
|
||||
def has_permission_for_service(self, service_id, permission):
|
||||
return permission in self._permissions.get(service_id, [])
|
||||
|
||||
@@ -401,8 +414,6 @@ class User(JSONModel, UserMixin):
|
||||
session['current_session_id'] = self.current_session_id
|
||||
|
||||
def add_to_service(self, service_id, permissions, folder_permissions, invited_by_id):
|
||||
from app.event_handlers import create_add_user_to_service_event
|
||||
|
||||
try:
|
||||
user_api_client.add_user_to_service(
|
||||
service_id,
|
||||
@@ -414,6 +425,7 @@ class User(JSONModel, UserMixin):
|
||||
user_id=self.id,
|
||||
invited_by_id=invited_by_id,
|
||||
service_id=service_id,
|
||||
ui_permissions=permissions,
|
||||
)
|
||||
except HTTPError as exception:
|
||||
if exception.status_code == 400 and 'already part of service' in exception.message:
|
||||
|
||||
@@ -46,7 +46,6 @@ def test_existing_user_accept_invite_calls_api_and_redirects_to_dashboard(
|
||||
):
|
||||
expected_service = service_one['id']
|
||||
expected_permissions = {'view_activity', 'send_messages', 'manage_service', 'manage_api_keys'}
|
||||
mock_audit_event = mocker.patch('app.event_handlers.create_add_user_to_service_event')
|
||||
|
||||
response = client.get(url_for('main.accept_invite', token='thisisnotarealtoken'))
|
||||
|
||||
@@ -62,11 +61,6 @@ def test_existing_user_accept_invite_calls_api_and_redirects_to_dashboard(
|
||||
|
||||
assert response.status_code == 302
|
||||
assert response.location == url_for('main.service_dashboard', service_id=expected_service, _external=True)
|
||||
mock_audit_event.assert_called_once_with(
|
||||
invited_by_id=service_one['users'][0],
|
||||
service_id=SERVICE_ONE_ID,
|
||||
user_id=api_user_active['id'],
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('trial_mode, expected_endpoint', (
|
||||
@@ -270,8 +264,6 @@ def test_accept_invite_redirects_if_api_raises_an_error_that_they_are_already_pa
|
||||
mock_no_users_for_service,
|
||||
mock_get_user,
|
||||
):
|
||||
mock_audit_event = mocker.patch('app.event_handlers.create_add_user_to_service_event')
|
||||
|
||||
mocker.patch('app.user_api_client.add_user_to_service', side_effect=HTTPError(
|
||||
response=Mock(
|
||||
status_code=400,
|
||||
@@ -285,7 +277,6 @@ def test_accept_invite_redirects_if_api_raises_an_error_that_they_are_already_pa
|
||||
|
||||
response = client.get(url_for('main.accept_invite', token='thisisnotarealtoken'), follow_redirects=False)
|
||||
assert response.location == url_for('main.service_dashboard', service_id=SERVICE_ONE_ID, _external=True)
|
||||
assert not mock_audit_event.called
|
||||
|
||||
|
||||
def test_existing_signed_out_user_accept_invite_redirects_to_sign_in(
|
||||
@@ -574,8 +565,6 @@ def test_new_invited_user_verifies_and_added_to_service(
|
||||
mock_create_event,
|
||||
mocker,
|
||||
):
|
||||
mock_audit_event = mocker.patch('app.event_handlers.create_add_user_to_service_event')
|
||||
|
||||
# visit accept token page
|
||||
response = client.get(url_for('main.accept_invite', token='thisisnotarealtoken'))
|
||||
assert response.status_code == 302
|
||||
@@ -611,10 +600,6 @@ def test_new_invited_user_verifies_and_added_to_service(
|
||||
mock_check_verify_code.assert_called_once_with(new_user_id, '12345', 'sms')
|
||||
assert service_one['id'] == session['service_id']
|
||||
|
||||
mock_audit_event.assert_called_once_with(invited_by_id=service_one['users'][0],
|
||||
service_id=service_one['id'],
|
||||
user_id=new_user_id)
|
||||
|
||||
raw_html = response.data.decode('utf-8')
|
||||
page = BeautifulSoup(raw_html, 'html.parser')
|
||||
assert page.find('h1').text == 'Dashboard'
|
||||
|
||||
@@ -334,8 +334,6 @@ def test_register_from_email_auth_invite(
|
||||
fake_uuid,
|
||||
mocker,
|
||||
):
|
||||
mock_audit_event = mocker.patch('app.event_handlers.create_add_user_to_service_event')
|
||||
|
||||
sample_invite['auth_type'] = 'email_auth'
|
||||
sample_invite['email_address'] = invite_email_address
|
||||
with client.session_transaction() as session:
|
||||
@@ -373,10 +371,6 @@ def test_register_from_email_auth_invite(
|
||||
assert current_user.is_authenticated
|
||||
assert mock_add_user_to_service.called
|
||||
|
||||
mock_audit_event.assert_called_once_with(invited_by_id=service_one['users'][0],
|
||||
service_id=sample_invite['service'],
|
||||
user_id=fake_uuid)
|
||||
|
||||
with client.session_transaction() as session:
|
||||
# invited user details are still there so they can get added to the service
|
||||
assert session['invited_user_id'] == sample_invite['id']
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import pytest
|
||||
|
||||
from app.models.user import AnonymousUser, InvitedOrgUser, InvitedUser, User
|
||||
from tests.conftest import USER_ONE_ID
|
||||
from tests.conftest import SERVICE_ONE_ID, USER_ONE_ID
|
||||
|
||||
|
||||
def test_anonymous_user(notify_admin):
|
||||
@@ -135,3 +135,44 @@ def test_invited_org_user_from_session_uses_id(client, mocker, mock_get_invited_
|
||||
def test_invited_org_user_from_session_returns_none_if_nothing_present(client, mocker):
|
||||
mocker.patch.dict('app.models.user.session', values={}, clear=True)
|
||||
assert InvitedOrgUser.from_session() is None
|
||||
|
||||
|
||||
def test_set_permissions(client, mocker, active_user_view_permissions, fake_uuid):
|
||||
mock_api = mocker.patch('app.models.user.user_api_client.set_user_permissions')
|
||||
mock_event = mocker.patch('app.models.user.create_set_user_permissions_event')
|
||||
|
||||
User(active_user_view_permissions).set_permissions(
|
||||
service_id=SERVICE_ONE_ID,
|
||||
permissions={'manage_templates'},
|
||||
folder_permissions=[],
|
||||
set_by_id=fake_uuid,
|
||||
)
|
||||
|
||||
mock_api.assert_called_once()
|
||||
mock_event.assert_called_once_with(
|
||||
service_id=SERVICE_ONE_ID,
|
||||
user_id=active_user_view_permissions['id'],
|
||||
original_ui_permissions={'view_activity'},
|
||||
new_ui_permissions={'manage_templates'},
|
||||
set_by_id=fake_uuid,
|
||||
)
|
||||
|
||||
|
||||
def test_add_to_service(client, mocker, api_user_active, fake_uuid):
|
||||
mock_api = mocker.patch('app.models.user.user_api_client.add_user_to_service')
|
||||
mock_event = mocker.patch('app.models.user.create_add_user_to_service_event')
|
||||
|
||||
User(api_user_active).add_to_service(
|
||||
service_id=SERVICE_ONE_ID,
|
||||
permissions={'manage_templates'},
|
||||
folder_permissions=[],
|
||||
invited_by_id=fake_uuid,
|
||||
)
|
||||
|
||||
mock_api.assert_called_once()
|
||||
mock_event.assert_called_once_with(
|
||||
service_id=SERVICE_ONE_ID,
|
||||
user_id=api_user_active['id'],
|
||||
invited_by_id=fake_uuid,
|
||||
ui_permissions={'manage_templates'},
|
||||
)
|
||||
|
||||
@@ -10,6 +10,7 @@ from app.event_handlers import (
|
||||
create_mobile_number_change_event,
|
||||
create_remove_user_from_service_event,
|
||||
create_resume_service_event,
|
||||
create_set_user_permissions_event,
|
||||
create_suspend_service_event,
|
||||
on_user_logged_in,
|
||||
)
|
||||
@@ -48,7 +49,8 @@ def test_create_add_user_to_service_event_calls_events_api(client, mock_events):
|
||||
kwargs = {
|
||||
"user_id": str(uuid.uuid4()),
|
||||
"invited_by_id": str(uuid.uuid4()),
|
||||
"service_id": str(uuid.uuid4())
|
||||
"service_id": str(uuid.uuid4()),
|
||||
"ui_permissions": {'manage_templates'},
|
||||
}
|
||||
|
||||
create_add_user_to_service_event(**kwargs)
|
||||
@@ -129,3 +131,16 @@ def test_resume_service(client, mock_events):
|
||||
|
||||
create_resume_service_event(**kwargs)
|
||||
mock_events.assert_called_with('resume_service', event_dict(**kwargs))
|
||||
|
||||
|
||||
def test_set_user_permissions(client, mock_events):
|
||||
kwargs = {
|
||||
"user_id": str(uuid.uuid4()),
|
||||
"service_id": str(uuid.uuid4()),
|
||||
"original_ui_permissions": set("manage_templates"),
|
||||
"new_ui_permissions": set("view_activity"),
|
||||
"set_by_id": str(uuid.uuid4()),
|
||||
}
|
||||
|
||||
create_set_user_permissions_event(**kwargs)
|
||||
mock_events.assert_called_with('set_user_permissions', event_dict(**kwargs))
|
||||
|
||||
Reference in New Issue
Block a user