Files
notifications-admin/app/models/user.py
Chris Hill-Scott f34a252e72 Remove defaults from User model
the api always returns exactly:
```
id
name
email_address
auth_type
current_session_id
failed_login_count
logged_in_at
mobile_number
organisations
password_changed_at
permissions
platform_admin
services
state
```

it does this through `models.py::User.serialize` – there is an old
Marshmallow `user_schema` in `schemas.py` but this isn’t used for
dumping return data, only parsing the json in the create user rest
endpoint.

This means we can rely on these keys always being in the dictionary.
2019-06-05 14:55:43 +01:00

557 lines
17 KiB
Python

from collections.abc import Sequence
from flask import abort, current_app, request, session
from flask_login import AnonymousUserMixin, UserMixin, login_user
from notifications_python_client.errors import HTTPError
from werkzeug.utils import cached_property
from app.models import JSONModel
from app.models.organisation import Organisation
from app.models.roles_and_permissions import (
all_permissions,
translate_permissions_from_db_to_admin_roles,
)
from app.notify_client import InviteTokenError
from app.notify_client.invite_api_client import invite_api_client
from app.notify_client.org_invite_api_client import org_invite_api_client
from app.notify_client.organisations_api_client import organisations_client
from app.notify_client.user_api_client import user_api_client
from app.utils import is_gov_user
def _get_service_id_from_view_args():
return str(request.view_args.get('service_id', '')) or None
def _get_org_id_from_view_args():
return str(request.view_args.get('org_id', '')) or None
class User(JSONModel, UserMixin):
ALLOWED_PROPERTIES = {
'id',
'name',
'email_address',
'auth_type',
'current_session_id',
'failed_login_count',
'logged_in_at',
'mobile_number',
'organisations',
'password_changed_at',
'permissions',
'platform_admin',
'services',
'state',
}
def __init__(self, _dict):
super().__init__(_dict)
self.permissions = _dict.get('permissions', {})
self.max_failed_login_count = current_app.config['MAX_FAILED_LOGIN_COUNT']
@classmethod
def from_id(cls, user_id):
return cls(user_api_client.get_user(user_id))
@classmethod
def from_email_address(cls, email_address):
return cls(user_api_client.get_user_by_email(email_address))
@classmethod
def from_email_address_or_none(cls, email_address):
response = user_api_client.get_user_by_email_or_none(email_address)
if response:
return cls(response)
return None
@staticmethod
def already_registered(email_address):
return bool(User.from_email_address_or_none(email_address))
@classmethod
def from_email_address_and_password_or_none(cls, email_address, password):
user = cls.from_email_address_or_none(email_address)
if not user:
return None
if user.locked:
return None
if not user_api_client.verify_password(user.id, password):
return None
return user
@property
def permissions(self):
return self._permissions
@permissions.setter
def permissions(self, permissions_by_service):
"""
Permissions is a dict {'service_id': ['permission a', 'permission b', 'permission c']}
The api currently returns some granular permissions that we don't set or use separately (but may want
to in the future):
* send_texts, send_letters and send_emails become send_messages
* manage_user and manage_settings become
users either have all three permissions for a service or none of them, they're not helpful to distinguish
between on the front end. So lets collapse them into "send_messages" and "manage_service". If we want to split
them out later, we'll need to rework this function.
"""
self._permissions = {
service: translate_permissions_from_db_to_admin_roles(permissions)
for service, permissions
in permissions_by_service.items()
}
def update(self, **kwargs):
response = user_api_client.update_user_attribute(self.id, **kwargs)
self.__init__(response)
def update_password(self, password):
response = user_api_client.update_password(self.id, password)
self.__init__(response)
def set_permissions(self, service_id, permissions, folder_permissions):
user_api_client.set_user_permissions(
self.id,
service_id,
permissions=permissions,
folder_permissions=folder_permissions,
)
def logged_in_elsewhere(self):
# if the current user (ie: db object) has no session, they've never logged in before
return self.current_session_id is not None and session.get('current_session_id') != self.current_session_id
def activate(self):
if self.state == 'pending':
user_data = user_api_client.activate_user(self.id)
return self.__class__(user_data['data'])
else:
return self
def login(self):
login_user(self)
def sign_in(self):
session['user_details'] = {"email": self.email_address, "id": self.id}
if not self.is_active:
return False
if self.email_auth:
user_api_client.send_verify_code(self.id, 'email', None, request.args.get('next'))
if self.sms_auth:
user_api_client.send_verify_code(self.id, 'sms', self.mobile_number)
return True
@property
def sms_auth(self):
return self.auth_type == 'sms_auth'
@property
def email_auth(self):
return self.auth_type == 'email_auth'
def reset_failed_login_count(self):
user_api_client.reset_failed_login_count(self.id)
@property
def is_active(self):
return self.state == 'active'
@property
def is_gov_user(self):
return is_gov_user(self.email_address)
@property
def is_authenticated(self):
return (
not self.logged_in_elsewhere() and
super(User, self).is_authenticated
)
def has_permissions(self, *permissions, restrict_admin_usage=False):
unknown_permissions = set(permissions) - all_permissions
if unknown_permissions:
raise TypeError('{} are not valid permissions'.format(list(unknown_permissions)))
# Service id is always set on the request for service specific views.
service_id = _get_service_id_from_view_args()
org_id = _get_org_id_from_view_args()
if not service_id and not org_id:
# we shouldn't have any pages that require permissions, but don't specify a service or organisation.
# use @user_is_platform_admin for platform admin only pages
raise NotImplementedError
# platform admins should be able to do most things (except eg send messages, or create api keys)
if self.platform_admin and not restrict_admin_usage:
return True
if org_id:
return org_id in self.organisations
if not permissions:
return service_id in self.services
if service_id:
return any(x in self._permissions.get(service_id, []) for x in permissions)
def has_permission_for_service(self, service_id, permission):
return permission in self._permissions.get(service_id, [])
def has_template_folder_permission(self, template_folder, service=None):
if self.platform_admin:
return True
# Top-level templates are always visible
if template_folder is None or template_folder['id'] is None:
return True
return self.id in template_folder.get("users_with_permission", [])
def template_folders_for_service(self, service=None):
"""
Returns list of template folders that a user can view for a given service
"""
return [
template_folder
for template_folder in service.all_template_folders
if self.id in template_folder.get("users_with_permission", [])
]
def belongs_to_service(self, service_id):
return str(service_id) in self.services
def belongs_to_service_or_403(self, service_id):
if not self.belongs_to_service(service_id):
abort(403)
@property
def locked(self):
return self.failed_login_count >= self.max_failed_login_count
@property
def email_domain(self):
return self.email_address.split('@')[-1]
@cached_property
def default_organisation(self):
return Organisation(
organisations_client.get_organisation_by_domain(self.email_domain)
)
@property
def default_organisation_type(self):
if self.default_organisation:
return self.default_organisation.organisation_type
if self.has_nhs_email_address:
return 'nhs'
return None
@property
def has_nhs_email_address(self):
return self.email_address.lower().endswith((
'@nhs.uk', '.nhs.uk', '@nhs.net', '.nhs.net',
))
def serialize(self):
dct = {
"id": self.id,
"name": self.name,
"email_address": self.email_address,
"mobile_number": self.mobile_number,
"password_changed_at": self.password_changed_at,
"state": self.state,
"failed_login_count": self.failed_login_count,
"permissions": [x for x in self._permissions],
"organisations": self.organisations,
"current_session_id": self.current_session_id
}
if hasattr(self, '_password'):
dct['password'] = self._password
return dct
@classmethod
def register(
cls,
name,
email_address,
mobile_number,
password,
auth_type,
):
return cls(user_api_client.register_user(
name,
email_address,
mobile_number or None,
password,
auth_type,
))
def set_password(self, pwd):
self._password = pwd
def send_verify_email(self):
user_api_client.send_verify_email(self.id, self.email_address)
def send_verify_code(self, to=None):
user_api_client.send_verify_code(self.id, 'sms', to or self.mobile_number)
def send_already_registered_email(self):
user_api_client.send_already_registered_email(self.id, self.email_address)
def refresh_session_id(self):
self.current_session_id = user_api_client.get_user(self.id).get('current_session_id')
session['current_session_id'] = self.current_session_id
class InvitedUser(JSONModel):
ALLOWED_PROPERTIES = {
'id',
'service',
'email_address',
'permissions',
'status',
'created_at',
'auth_type',
'folder_permissions',
}
def __init__(self, _dict):
super().__init__(_dict)
self.permissions = _dict.get('permissions') or []
self._from_user = _dict['from_user']
@classmethod
def create(
cls,
invite_from_id,
service_id,
email_address,
permissions,
auth_type,
folder_permissions,
):
return cls(invite_api_client.create_invite(
invite_from_id,
service_id,
email_address,
permissions,
auth_type,
folder_permissions,
))
def accept_invite(self):
invite_api_client.accept_invite(self.service, self.id)
def add_to_service(self):
user_api_client.add_user_to_service(
self.service,
self.id,
self.permissions,
self.folder_permissions,
)
@property
def permissions(self):
return self._permissions
@permissions.setter
def permissions(self, permissions):
if isinstance(permissions, list):
self._permissions = permissions
else:
self._permissions = permissions.split(',')
self._permissions = translate_permissions_from_db_to_admin_roles(self.permissions)
@property
def from_user(self):
return User.from_id(self._from_user)
@property
def sms_auth(self):
return self.auth_type == 'sms_auth'
@property
def email_auth(self):
return self.auth_type == 'email_auth'
@classmethod
def from_token(cls, token):
try:
return cls(invite_api_client.check_token(token))
except HTTPError as exception:
if exception.status_code == 400 and 'invitation' in exception.message:
raise InviteTokenError(exception.message['invitation'])
else:
raise exception
@classmethod
def from_session(cls):
invited_user = session.get('invited_user')
return cls(invited_user) if invited_user else None
def has_permissions(self, *permissions):
if self.status == 'cancelled':
return False
return set(self.permissions) > set(permissions)
def has_permission_for_service(self, service_id, permission):
if self.status == 'cancelled':
return False
return self.service == service_id and permission in self.permissions
def __eq__(self, other):
return ((self.id,
self.service,
self._from_user,
self.email_address,
self.auth_type,
self.status) == (other.id,
other.service,
other._from_user,
other.email_address,
other.auth_type,
other.status))
def serialize(self, permissions_as_string=False):
data = {'id': self.id,
'service': self.service,
'from_user': self._from_user,
'email_address': self.email_address,
'status': self.status,
'created_at': str(self.created_at),
'auth_type': self.auth_type,
'folder_permissions': self.folder_permissions
}
if permissions_as_string:
data['permissions'] = ','.join(self.permissions)
else:
data['permissions'] = sorted(self.permissions)
return data
def template_folders_for_service(self, service=None):
# only used on the manage users page to display the count, so okay to not be fully fledged for now
return [{'id': x} for x in self.folder_permissions]
class InvitedOrgUser(JSONModel):
ALLOWED_PROPERTIES = {
'id',
'organisation',
'email_address',
'status',
'created_at',
}
def __init__(self, _dict):
super().__init__(_dict)
self._invited_by = _dict['invited_by']
def __eq__(self, other):
return ((self.id,
self.organisation,
self._invited_by,
self.email_address,
self.status) == (other.id,
other.organisation,
other._invited_by,
other.email_address,
other.status))
@classmethod
def create(cls, invite_from_id, org_id, email_address):
return cls(org_invite_api_client.create_invite(
invite_from_id, org_id, email_address
))
@classmethod
def from_session(cls):
invited_org_user = session.get('invited_org_user')
return cls(invited_org_user) if invited_org_user else None
def serialize(self, permissions_as_string=False):
data = {'id': self.id,
'organisation': self.organisation,
'invited_by': self._invited_by,
'email_address': self.email_address,
'status': self.status,
'created_at': str(self.created_at)
}
return data
@property
def invited_by(self):
return User.from_id(self._invited_by)
@classmethod
def from_token(cls, token):
try:
return cls(org_invite_api_client.check_token(token))
except HTTPError as exception:
if exception.status_code == 400 and 'invitation' in exception.message:
raise InviteTokenError(exception.message['invitation'])
else:
raise exception
def accept_invite(self):
org_invite_api_client.accept_invite(self.organisation, self.id)
def add_to_organisation(self):
user_api_client.add_user_to_organisation(self.organisation, self.id)
class AnonymousUser(AnonymousUserMixin):
# set the anonymous user so that if a new browser hits us we don't error http://stackoverflow.com/a/19275188
def logged_in_elsewhere(self):
return False
@property
def default_organisation(self):
return Organisation(None)
class Users(Sequence):
client = user_api_client.get_users_for_service
model = User
def __init__(self, service_id):
self.users = self.client(service_id)
def __getitem__(self, index):
return self.model(self.users[index])
def __len__(self):
return len(self.users)
def __add__(self, other):
return list(self) + list(other)
class OrganisationUsers(Users):
client = user_api_client.get_users_for_organisation
class InvitedUsers(Users):
client = invite_api_client.get_invites_for_service
model = InvitedUser
def __init__(self, service_id):
self.users = [
user for user in self.client(service_id)
if user['status'] != 'accepted'
]
class OrganisationInvitedUsers(InvitedUsers):
client = org_invite_api_client.get_invites_for_organisation
model = InvitedOrgUser