# notifications-admin/app/notify_client/user_api_client.py

from notifications_python_client.errors import HTTPError

from app.notify_client import NotifyAdminAPIClient, cache
from app.utils.user_permissions import translate_permissions_from_ui_to_db

ALLOWED_ATTRIBUTES = {
    'name',
    'email_address',
    'mobile_number',
    'auth_type',
    'updated_by',
    'current_session_id'
}


class UserApiClient(NotifyAdminAPIClient):

    def init_app(self, app):
        super().init_app(app)
        self.admin_url = app.config['ADMIN_BASE_URL']

    def register_user(self, name, email_address, mobile_number, password, auth_type):
        data = {
            "name": name,
            "email_address": email_address,
            "mobile_number": mobile_number,
            "password": password,
            "auth_type": auth_type
        }
        user_data = self.post("/user", data)
        return user_data['data']

    # The `GET /user` response is cached in Redis, in the same way and for the
    # same reasons as the service object. Any change to the user's data must
    # invalidate the cache. If a user's phone number (for example) is changed
    # directly in the database, the cache has to be invalidated manually.
    def get_user(self, user_id):
        return self._get_user(user_id)['data']

    @cache.set('user-{user_id}')
    def _get_user(self, user_id):
        return self.get("/user/{}".format(user_id))

    def get_user_by_email(self, email_address):
        user_data = self.post('/user/email', data={'email': email_address})
        return user_data['data']

    def get_user_by_email_or_none(self, email_address):
        try:
            return self.get_user_by_email(email_address)
        except HTTPError as e:
            if e.status_code == 404:
                return None
            raise e

    @cache.delete('user-{user_id}')
    def update_user_attribute(self, user_id, **kwargs):
        data = dict(kwargs)
        disallowed_attributes = set(data.keys()) - ALLOWED_ATTRIBUTES
        if disallowed_attributes:
            raise TypeError('Not allowed to update user attributes: {}'.format(
                ", ".join(disallowed_attributes)
            ))

        url = "/user/{}".format(user_id)
        user_data = self.post(url, data=data)
        return user_data['data']

    @cache.delete('user-{user_id}')
    def archive_user(self, user_id):
        return self.post('/user/{}/archive'.format(user_id), data=None)

    @cache.delete('user-{user_id}')
    def reset_failed_login_count(self, user_id):
        url = "/user/{}/reset-failed-login-count".format(user_id)
        user_data = self.post(url, data={})
        return user_data['data']

    @cache.delete('user-{user_id}')
    def update_password(self, user_id, password, validated_email_access=False):
        data = {"_password": password}
        if validated_email_access:
            data["validated_email_access"] = validated_email_access
        url = "/user/{}/update-password".format(user_id)
        user_data = self.post(url, data=data)
        return user_data['data']

    @cache.delete('user-{user_id}')
    def verify_password(self, user_id, password):
        try:
            url = "/user/{}/verify/password".format(user_id)
            data = {"password": password}
            self.post(url, data=data)
            return True
        except HTTPError as e:
            if e.status_code == 400 or e.status_code == 404:
                return False
            # Any other status code is not re-raised here, so the method
            # falls through and returns None for it.

    def send_verify_code(self, user_id, code_type, to, next_string=None):
        data = {'to': to}
        if next_string:
            data['next'] = next_string
        if code_type == 'email':
            data['email_auth_link_host'] = self.admin_url
        endpoint = '/user/{0}/{1}-code'.format(user_id, code_type)
        self.post(endpoint, data=data)

    def send_verify_email(self, user_id, to):
        data = {'to': to}
        endpoint = '/user/{0}/email-verification'.format(user_id)
        self.post(endpoint, data=data)

    def send_already_registered_email(self, user_id, to):
        data = {'email': to}
        endpoint = '/user/{0}/email-already-registered'.format(user_id)
        self.post(endpoint, data=data)

    @cache.delete('user-{user_id}')
    def check_verify_code(self, user_id, code, code_type):
        data = {'code_type': code_type, 'code': code}
        endpoint = '/user/{}/verify/code'.format(user_id)
        try:
            self.post(endpoint, data=data)
            return True, ''
        except HTTPError as e:
            if e.status_code == 400 or e.status_code == 404:
                return False, e.message
            raise e

    @cache.delete('user-{user_id}')
    def complete_webauthn_login_attempt(self, user_id, is_successful):
        data = {'successful': is_successful}
        endpoint = f'/user/{user_id}/complete/webauthn-login'
        try:
            self.post(endpoint, data=data)
            return True, ''
        except HTTPError as e:
            if e.status_code == 403:
                return False, e.message
            raise e

    def get_users_for_service(self, service_id):
        endpoint = '/service/{}/users'.format(service_id)
        return self.get(endpoint)['data']

    def get_users_for_organisation(self, org_id):
        endpoint = '/organisations/{}/users'.format(org_id)
        return self.get(endpoint)['data']

    @cache.delete('service-{service_id}')
    @cache.delete('service-{service_id}-template-folders')
    @cache.delete('user-{user_id}')
    def add_user_to_service(self, service_id, user_id, permissions, folder_permissions):
        # permissions passed in are the combined admin roles, not db permissions
        endpoint = '/service/{}/users/{}'.format(service_id, user_id)
        data = {
            'permissions': [{'permission': x} for x in translate_permissions_from_ui_to_db(permissions)],
            'folder_permissions': folder_permissions,
        }

        self.post(endpoint, data=data)

    @cache.delete('user-{user_id}')
    def add_user_to_organisation(self, org_id, user_id):
        resp = self.post('/organisations/{}/users/{}'.format(org_id, user_id), data={})
        return resp['data']

    @cache.delete('service-{service_id}-template-folders')
    @cache.delete('user-{user_id}')
    def set_user_permissions(self, user_id, service_id, permissions, folder_permissions=None):
        # permissions passed in are the combined admin roles, not db permissions
        data = {
            'permissions': [{'permission': x} for x in translate_permissions_from_ui_to_db(permissions)],
        }

        if folder_permissions is not None:
            data['folder_permissions'] = folder_permissions

        endpoint = '/user/{}/service/{}/permission'.format(user_id, service_id)
        self.post(endpoint, data=data)

    def send_reset_password_url(self, email_address, next_string=None):
        endpoint = '/user/reset-password'
        data = {'email': email_address}
        if next_string:
            data['next'] = next_string
        self.post(endpoint, data=data)

    def find_users_by_full_or_partial_email(self, email_address):
        endpoint = '/user/find-users-by-email'
        data = {'email': email_address}
        users = self.post(endpoint, data=data)
        return users

    @cache.delete('user-{user_id}')
    def activate_user(self, user_id):
        return self.post("/user/{}/activate".format(user_id), data=None)

    def send_change_email_verification(self, user_id, new_email):
        endpoint = '/user/{}/change-email-verification'.format(user_id)
        data = {'email': new_email}
        self.post(endpoint, data)

    def get_organisations_and_services_for_user(self, user_id):
        endpoint = '/user/{}/organisations-and-services'.format(user_id)
        return self.get(endpoint)

    def get_webauthn_credentials_for_user(self, user_id):
        endpoint = f'/user/{user_id}/webauthn'
        return self.get(endpoint)['data']

    def create_webauthn_credential_for_user(self, user_id, credential):
        endpoint = f'/user/{user_id}/webauthn'
        return self.post(endpoint, data=credential.serialize())

    def update_webauthn_credential_name_for_user(self, *, user_id, credential_id, new_name_for_credential):
        endpoint = f'/user/{user_id}/webauthn/{credential_id}'
        return self.post(endpoint, data={"name": new_name_for_credential})

    def delete_webauthn_credential_for_user(self, *, user_id, credential_id):
        endpoint = f'/user/{user_id}/webauthn/{credential_id}'
        return self.delete(endpoint)


user_api_client = UserApiClient()