Files
notifications-admin/app/models/user.py

565 lines
17 KiB
Python
Raw Normal View History

from collections.abc import Sequence
from flask import abort, current_app, request, session
from flask_login import AnonymousUserMixin, UserMixin, login_user
from notifications_python_client.errors import HTTPError
from werkzeug.utils import cached_property
from app.models import JSONModel
from app.models.organisation import Organisation
from app.models.roles_and_permissions import (
all_permissions,
translate_permissions_from_db_to_admin_roles,
)
from app.notify_client import InviteTokenError
from app.notify_client.invite_api_client import invite_api_client
from app.notify_client.org_invite_api_client import org_invite_api_client
from app.notify_client.organisations_api_client import organisations_client
from app.notify_client.user_api_client import user_api_client
from app.utils import is_gov_user
def _get_service_id_from_view_args():
    """Return the ``service_id`` view argument of the current request.

    The value is converted with ``str``; ``None`` is returned when the
    argument is missing or its string form is empty.
    """
    service_id = str(request.view_args.get('service_id', ''))
    if not service_id:
        return None
    return service_id
def _get_org_id_from_view_args():
    """Return the ``org_id`` view argument of the current request.

    The value is converted with ``str``; ``None`` is returned when the
    argument is missing or its string form is empty.
    """
    org_id = str(request.view_args.get('org_id', ''))
    if not org_id:
        return None
    return org_id
class User(JSONModel, UserMixin):
    """A user of the admin app, built from the dict returned by the user API.

    Besides the JSON-backed fields named in ``ALLOWED_PROPERTIES`` the
    instance carries a per-service permissions mapping (see the
    ``permissions`` property) and the configured maximum number of failed
    logins.
    """

    ALLOWED_PROPERTIES = {
        'id',
        'name',
        'email_address',
        'mobile_number',
        'password_changed_at',
        'auth_type',
        'failed_login_count',
        'state',
        'logged_in_at',
        'platform_admin',
        'current_session_id',
        'services',
        'organisations',
    }

    DEFAULTS = {
        'current_session_id': None,
        'services': [],
        'organisations': [],
        'logged_in_at': None,
        'platform_admin': False,
    }

    def __init__(self, _dict):
        """Build a user from ``_dict`` (the API's JSON representation)."""
        super().__init__(_dict)
        self.permissions = _dict.get('permissions', {})
        self.max_failed_login_count = current_app.config['MAX_FAILED_LOGIN_COUNT']

    @classmethod
    def from_id(cls, user_id):
        """Fetch the user with ``user_id`` from the API."""
        return cls(user_api_client.get_user(user_id))

    @classmethod
    def from_email_address(cls, email_address):
        """Fetch the user with ``email_address`` from the API."""
        return cls(user_api_client.get_user_by_email(email_address))

    @classmethod
    def from_email_address_or_none(cls, email_address):
        """Fetch the user with ``email_address``; ``None`` if the API returns nothing."""
        response = user_api_client.get_user_by_email_or_none(email_address)
        if response:
            return cls(response)
        return None

    @staticmethod
    def already_registered(email_address):
        """Return ``True`` when a user with ``email_address`` exists."""
        return bool(User.from_email_address_or_none(email_address))

    @classmethod
    def from_email_address_and_password_or_none(cls, email_address, password):
        """Return the user if the credentials check out, otherwise ``None``.

        ``None`` is returned when no such user exists, when the account is
        locked, or when the API rejects the password. The lock check comes
        first, so no password verification is attempted for a locked account.
        """
        user = cls.from_email_address_or_none(email_address)
        if not user:
            return None
        if user.locked:
            return None
        if not user_api_client.verify_password(user.id, password):
            return None
        return user

    @property
    def permissions(self):
        """Mapping of service id to the admin roles held for that service."""
        return self._permissions

    @permissions.setter
    def permissions(self, permissions_by_service):
        """
        Permissions is a dict {'service_id': ['permission a', 'permission b', 'permission c']}

        The api currently returns some granular permissions that we don't set or use separately (but may want
        to in the future):
        * send_texts, send_letters and send_emails become send_messages
        * manage_user and manage_settings become manage_service

        Users either have all of the granular permissions in a group or none of them, so they're not helpful to
        distinguish between on the front end. So lets collapse them into "send_messages" and "manage_service".
        If we want to split them out later, we'll need to rework this function.
        """
        self._permissions = {
            service: translate_permissions_from_db_to_admin_roles(permissions)
            for service, permissions
            in permissions_by_service.items()
        }

    def update(self, **kwargs):
        """Send attribute changes to the API and reload this instance from the response."""
        response = user_api_client.update_user_attribute(self.id, **kwargs)
        self.__init__(response)

    def update_password(self, password):
        """Change the password via the API and reload this instance from the response."""
        response = user_api_client.update_password(self.id, password)
        self.__init__(response)

    def set_permissions(self, service_id, permissions, folder_permissions):
        """Set this user's permissions and folder permissions for ``service_id`` via the API."""
        user_api_client.set_user_permissions(
            self.id,
            service_id,
            permissions=permissions,
            folder_permissions=folder_permissions,
        )

    def logged_in_elsewhere(self):
        """Return ``True`` when the stored session id differs from the browser session's."""
        # if the current user (ie: db object) has no session, they've never logged in before
        return self.current_session_id is not None and session.get('current_session_id') != self.current_session_id

    def activate(self):
        """Activate a pending user.

        Returns a new instance built from the API response when the state is
        ``'pending'``; otherwise returns ``self`` unchanged.
        """
        if self.state == 'pending':
            user_data = user_api_client.activate_user(self.id)
            return self.__class__(user_data['data'])
        else:
            return self

    def login(self):
        """Log this user in through flask-login."""
        login_user(self)

    def sign_in(self):
        """Start the sign-in flow: record the user in the session and send a verify code.

        Returns ``False`` (after the session has been written) when the user
        is not active, ``True`` otherwise.
        """
        session['user_details'] = {"email": self.email_address, "id": self.id}
        if not self.is_active:
            return False
        if self.email_auth:
            user_api_client.send_verify_code(self.id, 'email', None, request.args.get('next'))
        if self.sms_auth:
            user_api_client.send_verify_code(self.id, 'sms', self.mobile_number)
        return True

    @property
    def sms_auth(self):
        """``True`` when ``auth_type`` is ``'sms_auth'``."""
        return self.auth_type == 'sms_auth'

    @property
    def email_auth(self):
        """``True`` when ``auth_type`` is ``'email_auth'``."""
        return self.auth_type == 'email_auth'

    def reset_failed_login_count(self):
        """Ask the API to reset this user's failed login counter."""
        user_api_client.reset_failed_login_count(self.id)

    @property
    def is_active(self):
        """``True`` when ``state`` is ``'active'``."""
        return self.state == 'active'

    @property
    def is_gov_user(self):
        """Result of ``app.utils.is_gov_user`` for this user's email address."""
        return is_gov_user(self.email_address)

    @property
    def is_authenticated(self):
        """Authenticated per the parent classes and not logged in elsewhere."""
        return (
            not self.logged_in_elsewhere() and
            super().is_authenticated
        )

    def has_permissions(self, *permissions, restrict_admin_usage=False):
        """Check the user against the service or organisation of the current request.

        :param permissions: permission names; each must be in ``all_permissions``.
        :param restrict_admin_usage: when true, platform admins get no automatic pass.
        :raises TypeError: if any permission name is unknown.
        :raises NotImplementedError: if the request has neither a ``service_id``
            nor an ``org_id`` view argument.
        :returns: ``True`` or ``False``. For organisation views the result is
            membership of the organisation; for service views it is membership
            of the service when no permissions are named, otherwise whether the
            user holds at least one of the named permissions.
        """
        unknown_permissions = set(permissions) - all_permissions
        if unknown_permissions:
            raise TypeError('{} are not valid permissions'.format(list(unknown_permissions)))

        # Service id is always set on the request for service specific views.
        service_id = _get_service_id_from_view_args()
        org_id = _get_org_id_from_view_args()

        if not service_id and not org_id:
            # we shouldn't have any pages that require permissions, but don't specify a service or organisation.
            # use @user_is_platform_admin for platform admin only pages
            raise NotImplementedError

        # platform admins should be able to do most things (except eg send messages, or create api keys)
        if self.platform_admin and not restrict_admin_usage:
            return True

        if org_id:
            return org_id in self.organisations

        if not permissions:
            return service_id in self.services

        # org_id is falsy here, so the guard above guarantees service_id is set
        return any(x in self._permissions.get(service_id, []) for x in permissions)

    def has_permission_for_service(self, service_id, permission):
        """Return whether the user holds ``permission`` for ``service_id``."""
        return permission in self._permissions.get(service_id, [])

    def has_template_folder_permission(self, template_folder, service=None):
        """Return whether the user may see ``template_folder``.

        ``service`` is accepted but not used by this implementation.
        """
        if self.platform_admin:
            return True

        # Top-level templates are always visible
        if template_folder is None or template_folder['id'] is None:
            return True

        return self.id in template_folder.get("users_with_permission", [])

    def template_folders_for_service(self, service=None):
        """
        Returns list of template folders that a user can view for a given service

        Despite the ``None`` default, ``service`` must be supplied:
        ``service.all_template_folders`` is read unconditionally.
        """
        return [
            template_folder
            for template_folder in service.all_template_folders
            if self.id in template_folder.get("users_with_permission", [])
        ]

    def belongs_to_service(self, service_id):
        """Return whether ``str(service_id)`` is among the user's services."""
        return str(service_id) in self.services

    def belongs_to_service_or_403(self, service_id):
        """Abort the request with a 403 unless the user belongs to ``service_id``."""
        if not self.belongs_to_service(service_id):
            abort(403)

    @property
    def locked(self):
        """``True`` once the failed login count reaches the configured maximum."""
        return self.failed_login_count >= self.max_failed_login_count

    @property
    def email_domain(self):
        """The part of the email address after the last ``@``."""
        return self.email_address.split('@')[-1]

    @cached_property
    def default_organisation(self):
        """Organisation looked up by the user's email domain (cached per instance)."""
        return Organisation(
            organisations_client.get_organisation_by_domain(self.email_domain)
        )

    @property
    def default_organisation_type(self):
        """Organisation type of the default organisation.

        Falls back to ``'nhs'`` for NHS email addresses, and to ``None``
        otherwise, when the default organisation is falsy.
        """
        if self.default_organisation:
            return self.default_organisation.organisation_type
        if self.has_nhs_email_address:
            return 'nhs'
        return None

    @property
    def has_nhs_email_address(self):
        """``True`` when the lower-cased address ends with an nhs.uk / nhs.net suffix."""
        return self.email_address.lower().endswith((
            '@nhs.uk', '.nhs.uk', '@nhs.net', '.nhs.net',
        ))

    def serialize(self):
        """Return a dict representation of the user.

        ``permissions`` holds only the service ids (the keys of the
        permissions mapping). ``password`` is included only after
        ``set_password`` has been called on this instance.
        """
        dct = {
            "id": self.id,
            "name": self.name,
            "email_address": self.email_address,
            "mobile_number": self.mobile_number,
            "password_changed_at": self.password_changed_at,
            "state": self.state,
            "failed_login_count": self.failed_login_count,
            "permissions": list(self._permissions),
            "organisations": self.organisations,
            "current_session_id": self.current_session_id
        }
        if hasattr(self, '_password'):
            dct['password'] = self._password
        return dct

    @classmethod
    def register(
        cls,
        name,
        email_address,
        mobile_number,
        password,
        auth_type,
    ):
        """Register a new user via the API and return it.

        A falsy ``mobile_number`` is sent as ``None``.
        """
        return cls(user_api_client.register_user(
            name,
            email_address,
            mobile_number or None,
            password,
            auth_type,
        ))

    def set_password(self, pwd):
        """Remember ``pwd`` on the instance so that ``serialize`` includes it."""
        self._password = pwd

    def send_verify_email(self):
        """Ask the API to send a verification email to the user's address."""
        user_api_client.send_verify_email(self.id, self.email_address)

    def send_verify_code(self, to=None):
        """Ask the API to send an SMS verify code to ``to`` or the user's mobile number."""
        user_api_client.send_verify_code(self.id, 'sms', to or self.mobile_number)

    def send_already_registered_email(self):
        """Ask the API to send the "already registered" email to the user's address."""
        user_api_client.send_already_registered_email(self.id, self.email_address)

    def refresh_session_id(self):
        """Re-read ``current_session_id`` from the API and copy it into the browser session."""
        self.current_session_id = user_api_client.get_user(self.id).get('current_session_id')
        session['current_session_id'] = self.current_session_id
class InvitedUser(JSONModel):
    """An invitation for someone to join a service, built from the invite API's JSON."""

    ALLOWED_PROPERTIES = {
        'id',
        'service',
        'from_user',
        'email_address',
        'permissions',
        'status',
        'created_at',
        'auth_type',
        'folder_permissions',
    }

    def __init__(self, _dict):
        """Build an invite from ``_dict``; ``_dict['from_user']`` is required."""
        super().__init__(_dict)
        self.permissions = _dict.get('permissions') or []
        self._from_user = _dict['from_user']

    @classmethod
    def create(
        cls,
        invite_from_id,
        service_id,
        email_address,
        permissions,
        auth_type,
        folder_permissions,
    ):
        """Create an invite via the API and return it."""
        return cls(invite_api_client.create_invite(
            invite_from_id,
            service_id,
            email_address,
            permissions,
            auth_type,
            folder_permissions,
        ))

    def accept_invite(self):
        """Mark this invite as accepted via the API."""
        invite_api_client.accept_invite(self.service, self.id)

    def add_to_service(self):
        """Call the user API's ``add_user_to_service`` with this invite's service, id and permissions."""
        user_api_client.add_user_to_service(
            self.service,
            self.id,
            self.permissions,
            self.folder_permissions,
        )

    @property
    def permissions(self):
        """The invite's permissions, translated to admin roles."""
        return self._permissions

    @permissions.setter
    def permissions(self, permissions):
        """Accept a list or a comma-separated string and store the translated roles."""
        if isinstance(permissions, list):
            self._permissions = permissions
        else:
            self._permissions = permissions.split(',')
        self._permissions = translate_permissions_from_db_to_admin_roles(self.permissions)

    @property
    def from_user(self):
        """The inviting user, fetched from the API on every access."""
        return User.from_id(self._from_user)

    @property
    def sms_auth(self):
        """``True`` when ``auth_type`` is ``'sms_auth'``."""
        return self.auth_type == 'sms_auth'

    @property
    def email_auth(self):
        """``True`` when ``auth_type`` is ``'email_auth'``."""
        return self.auth_type == 'email_auth'

    @classmethod
    def from_token(cls, token):
        """Build an invite from an invitation token.

        :raises InviteTokenError: when the API answers 400 with an
            ``'invitation'`` entry in its message.
        :raises HTTPError: for any other API error (re-raised unchanged).
        """
        try:
            return cls(invite_api_client.check_token(token))
        except HTTPError as exception:
            if exception.status_code == 400 and 'invitation' in exception.message:
                raise InviteTokenError(exception.message['invitation'])
            else:
                raise

    @classmethod
    def from_session(cls):
        """Build an invite from ``session['invited_user']``; ``None`` if absent or falsy."""
        invited_user = session.get('invited_user')
        return cls(invited_user) if invited_user else None

    def has_permissions(self, *permissions):
        """Return whether the invite grants every one of ``permissions``.

        A cancelled invite grants nothing.
        """
        if self.status == 'cancelled':
            return False
        # superset-or-equal: an invite holding exactly the requested
        # permissions must count as having them (a strict `>` would not)
        return set(self.permissions).issuperset(permissions)

    def has_permission_for_service(self, service_id, permission):
        """Return whether the invite is for ``service_id`` and grants ``permission``.

        A cancelled invite grants nothing.
        """
        if self.status == 'cancelled':
            return False
        return self.service == service_id and permission in self.permissions

    def __eq__(self, other):
        """Compare on id, service, inviter, email address, auth type and status.

        Objects lacking any of those attributes (``None``, for example) are
        not comparable, so ``NotImplemented`` is returned and Python falls
        back to its default handling instead of raising ``AttributeError``.
        """
        try:
            other_key = (
                other.id,
                other.service,
                other._from_user,
                other.email_address,
                other.auth_type,
                other.status,
            )
        except AttributeError:
            return NotImplemented
        return (
            self.id,
            self.service,
            self._from_user,
            self.email_address,
            self.auth_type,
            self.status,
        ) == other_key

    def serialize(self, permissions_as_string=False):
        """Return a dict representation of the invite.

        :param permissions_as_string: when true ``permissions`` is a
            comma-joined string, otherwise a sorted list.
        """
        data = {
            'id': self.id,
            'service': self.service,
            'from_user': self._from_user,
            'email_address': self.email_address,
            'status': self.status,
            'created_at': str(self.created_at),
            'auth_type': self.auth_type,
            'folder_permissions': self.folder_permissions
        }
        if permissions_as_string:
            data['permissions'] = ','.join(self.permissions)
        else:
            data['permissions'] = sorted(self.permissions)
        return data

    def template_folders_for_service(self, service=None):
        """Return one ``{'id': ...}`` dict per folder permission; ``service`` is unused."""
        # only used on the manage users page to display the count, so okay to not be fully fledged for now
        return [{'id': x} for x in self.folder_permissions]
class InvitedOrgUser(JSONModel):
    """An invitation for someone to join an organisation, built from the invite API's JSON."""

    ALLOWED_PROPERTIES = {
        'id',
        'organisation',
        'email_address',
        'status',
        'created_at',
    }

    def __init__(self, _dict):
        """Build an invite from ``_dict``; ``_dict['invited_by']`` is required."""
        super().__init__(_dict)
        self._invited_by = _dict['invited_by']

    def __eq__(self, other):
        """Compare on id, organisation, inviter, email address and status.

        Objects lacking any of those attributes (``None``, for example) are
        not comparable, so ``NotImplemented`` is returned and Python falls
        back to its default handling instead of raising ``AttributeError``.
        """
        try:
            other_key = (
                other.id,
                other.organisation,
                other._invited_by,
                other.email_address,
                other.status,
            )
        except AttributeError:
            return NotImplemented
        return (
            self.id,
            self.organisation,
            self._invited_by,
            self.email_address,
            self.status,
        ) == other_key

    @classmethod
    def create(cls, invite_from_id, org_id, email_address):
        """Create an organisation invite via the API and return it."""
        return cls(org_invite_api_client.create_invite(
            invite_from_id, org_id, email_address
        ))

    @classmethod
    def from_session(cls):
        """Build an invite from ``session['invited_org_user']``; ``None`` if absent or falsy."""
        invited_org_user = session.get('invited_org_user')
        return cls(invited_org_user) if invited_org_user else None

    def serialize(self, permissions_as_string=False):
        """Return a dict representation of the invite.

        ``permissions_as_string`` is accepted but not used by this
        implementation.
        """
        data = {
            'id': self.id,
            'organisation': self.organisation,
            'invited_by': self._invited_by,
            'email_address': self.email_address,
            'status': self.status,
            'created_at': str(self.created_at)
        }
        return data

    @property
    def invited_by(self):
        """The inviting user, fetched from the API on every access."""
        return User.from_id(self._invited_by)

    @classmethod
    def from_token(cls, token):
        """Build an invite from an invitation token.

        :raises InviteTokenError: when the API answers 400 with an
            ``'invitation'`` entry in its message.
        :raises HTTPError: for any other API error (re-raised unchanged).
        """
        try:
            return cls(org_invite_api_client.check_token(token))
        except HTTPError as exception:
            if exception.status_code == 400 and 'invitation' in exception.message:
                raise InviteTokenError(exception.message['invitation'])
            else:
                raise

    def accept_invite(self):
        """Mark this invite as accepted via the API."""
        org_invite_api_client.accept_invite(self.organisation, self.id)

    def add_to_organisation(self):
        """Call the user API's ``add_user_to_organisation`` with this invite's organisation and id."""
        user_api_client.add_user_to_organisation(self.organisation, self.id)
class AnonymousUser(AnonymousUserMixin):
    """Stand-in for a visitor who is not signed in."""

    # set the anonymous user so that if a new browser hits us we don't error http://stackoverflow.com/a/19275188

    def logged_in_elsewhere(self):
        """An anonymous visitor is never logged in elsewhere."""
        return False

    @property
    def default_organisation(self):
        """An ``Organisation`` constructed from ``None``."""
        no_organisation = Organisation(None)
        return no_organisation
class Users(Sequence):
    """Sequence of ``model`` instances built lazily from the dicts returned by ``client``.

    Subclasses swap ``client`` and/or ``model`` to wrap other API calls.
    """

    client = user_api_client.get_users_for_service
    model = User

    def __init__(self, service_id):
        """Fetch and keep the raw user dicts for ``service_id``."""
        raw_users = self.client(service_id)
        self.users = raw_users

    def __getitem__(self, index):
        """Wrap the raw entry at ``index`` in ``model``."""
        raw_user = self.users[index]
        return self.model(raw_user)

    def __len__(self):
        """Number of raw entries held."""
        return len(self.users)

    def __add__(self, other):
        """Concatenate with another iterable, yielding a plain list."""
        return [*self, *other]
class OrganisationUsers(Users):
    """``Users`` variant whose entries come from ``get_users_for_organisation``."""

    client = user_api_client.get_users_for_organisation
class InvitedUsers(Users):
    """Sequence of ``InvitedUser`` for a service, leaving out accepted invites."""

    client = invite_api_client.get_invites_for_service
    model = InvitedUser

    def __init__(self, service_id):
        """Fetch the invites for ``service_id`` and keep those whose status is not ``'accepted'``."""
        not_accepted = []
        for invite in self.client(service_id):
            if invite['status'] != 'accepted':
                not_accepted.append(invite)
        self.users = not_accepted
class OrganisationInvitedUsers(InvitedUsers):
    """``InvitedUsers`` variant backed by ``get_invites_for_organisation`` and ``InvitedOrgUser``."""

    model = InvitedOrgUser
    client = org_invite_api_client.get_invites_for_organisation