mirror of
https://github.com/GSA/notifications-api.git
synced 2026-06-21 05:30:55 -04:00
@@ -178,6 +178,7 @@ def register_blueprint(application):
|
||||
)
|
||||
from app.upload.rest import upload_blueprint
|
||||
from app.user.rest import user_blueprint
|
||||
from app.webauthn.rest import webauthn_blueprint
|
||||
|
||||
service_blueprint.before_request(requires_admin_auth)
|
||||
application.register_blueprint(service_blueprint, url_prefix='/service')
|
||||
@@ -185,6 +186,9 @@ def register_blueprint(application):
|
||||
user_blueprint.before_request(requires_admin_auth)
|
||||
application.register_blueprint(user_blueprint, url_prefix='/user')
|
||||
|
||||
webauthn_blueprint.before_request(requires_admin_auth)
|
||||
application.register_blueprint(webauthn_blueprint)
|
||||
|
||||
template_blueprint.before_request(requires_admin_auth)
|
||||
application.register_blueprint(template_blueprint)
|
||||
|
||||
|
||||
40
app/dao/webauthn_credential_dao.py
Normal file
40
app/dao/webauthn_credential_dao.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from app import db
|
||||
from app.dao.dao_utils import autocommit
|
||||
from app.models import WebauthnCredential
|
||||
|
||||
|
||||
def dao_get_webauthn_credential_by_user_and_id(user_id, webauthn_credential_id):
|
||||
return WebauthnCredential.query.filter(
|
||||
WebauthnCredential.user_id == user_id,
|
||||
WebauthnCredential.id == webauthn_credential_id
|
||||
).one()
|
||||
|
||||
|
||||
@autocommit
|
||||
def dao_create_webauthn_credential(
|
||||
*,
|
||||
user_id,
|
||||
name,
|
||||
credential_data,
|
||||
registration_response,
|
||||
):
|
||||
webauthn_credential = WebauthnCredential(
|
||||
user_id=user_id,
|
||||
name=name,
|
||||
credential_data=credential_data,
|
||||
registration_response=registration_response
|
||||
)
|
||||
db.session.add(webauthn_credential)
|
||||
return webauthn_credential
|
||||
|
||||
|
||||
@autocommit
|
||||
def dao_update_webauthn_credential_name(webauthn_credential, new_name):
|
||||
webauthn_credential.name = new_name
|
||||
db.session.add(webauthn_credential)
|
||||
return webauthn_credential
|
||||
|
||||
|
||||
@autocommit
|
||||
def dao_delete_webauthn_credential(webauthn_credential):
|
||||
db.session.delete(webauthn_credential)
|
||||
@@ -2592,3 +2592,36 @@ class ServiceBroadcastProviderRestriction(db.Model):
|
||||
provider = db.Column(db.String, nullable=False)
|
||||
|
||||
created_at = db.Column(db.DateTime, nullable=False, default=datetime.datetime.utcnow)
|
||||
|
||||
|
||||
class WebauthnCredential(db.Model):
|
||||
"""
|
||||
A table that stores data for registered webauthn credentials.
|
||||
"""
|
||||
__tablename__ = "webauthn_credential"
|
||||
|
||||
id = db.Column(UUID(as_uuid=True), primary_key=True, nullable=False, default=uuid.uuid4)
|
||||
|
||||
user_id = db.Column(UUID(as_uuid=True), db.ForeignKey('users.id'), nullable=False)
|
||||
user = db.relationship(User, backref=db.backref("webauthn_credentials"))
|
||||
|
||||
name = db.Column(db.String, nullable=False)
|
||||
|
||||
# base64 encoded CBOR. used for logging in. https://w3c.github.io/webauthn/#sctn-attested-credential-data
|
||||
credential_data = db.Column(db.String, nullable=False)
|
||||
|
||||
# base64 encoded CBOR. used for auditing. https://www.w3.org/TR/webauthn-2/#authenticatorattestationresponse
|
||||
registration_response = db.Column(db.String, nullable=False)
|
||||
|
||||
created_at = db.Column(db.DateTime, nullable=False, default=datetime.datetime.utcnow)
|
||||
updated_at = db.Column(db.DateTime, nullable=True, onupdate=datetime.datetime.utcnow)
|
||||
|
||||
def serialize(self):
|
||||
return {
|
||||
'id': str(self.id),
|
||||
'user_id': str(self.user_id),
|
||||
'name': self.name,
|
||||
'credential_data': self.credential_data,
|
||||
'created_at': self.created_at.strftime(DATETIME_FORMAT),
|
||||
'updated_at': get_dt_string_or_none(self.updated_at),
|
||||
}
|
||||
|
||||
0
app/webauthn/__init__.py
Normal file
0
app/webauthn/__init__.py
Normal file
64
app/webauthn/rest.py
Normal file
64
app/webauthn/rest.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from flask import Blueprint, jsonify, request
|
||||
|
||||
from app.dao.users_dao import get_user_by_id
|
||||
from app.dao.webauthn_credential_dao import (
|
||||
dao_create_webauthn_credential,
|
||||
dao_delete_webauthn_credential,
|
||||
dao_get_webauthn_credential_by_user_and_id,
|
||||
dao_update_webauthn_credential_name,
|
||||
)
|
||||
from app.errors import InvalidRequest, register_errors
|
||||
from app.schema_validation import validate
|
||||
from app.webauthn.webauthn_schema import (
|
||||
post_create_webauthn_credential_schema,
|
||||
post_update_webauthn_credential_schema,
|
||||
)
|
||||
|
||||
webauthn_blueprint = Blueprint('webauthn', __name__, url_prefix='/user/<uuid:user_id>/webauthn')
|
||||
register_errors(webauthn_blueprint)
|
||||
|
||||
|
||||
@webauthn_blueprint.route('', methods=['GET'])
|
||||
def get_webauthn_credentials(user_id):
|
||||
user = get_user_by_id(user_id)
|
||||
return jsonify(data=[cred.serialize() for cred in user.webauthn_credentials]), 200
|
||||
|
||||
|
||||
@webauthn_blueprint.route('', methods=['POST'])
|
||||
def create_webauthn_credential(user_id):
|
||||
data = request.get_json()
|
||||
validate(data, post_create_webauthn_credential_schema)
|
||||
webauthn_credential = dao_create_webauthn_credential(
|
||||
user_id=user_id,
|
||||
name=data['name'],
|
||||
credential_data=data['credential_data'],
|
||||
registration_response=data['registration_response']
|
||||
)
|
||||
|
||||
return jsonify(data=webauthn_credential.serialize()), 201
|
||||
|
||||
|
||||
@webauthn_blueprint.route('/<uuid:webauthn_credential_id>', methods=['POST'])
|
||||
def update_webauthn_credential(user_id, webauthn_credential_id):
|
||||
data = request.get_json()
|
||||
validate(data, post_update_webauthn_credential_schema)
|
||||
|
||||
webauthn_credential = dao_get_webauthn_credential_by_user_and_id(user_id, webauthn_credential_id)
|
||||
|
||||
dao_update_webauthn_credential_name(webauthn_credential, data['name'])
|
||||
|
||||
return jsonify(data=webauthn_credential.serialize()), 200
|
||||
|
||||
|
||||
@webauthn_blueprint.route('/<uuid:webauthn_credential_id>', methods=['DELETE'])
|
||||
def delete_webauthn_credential(user_id, webauthn_credential_id):
|
||||
webauthn_credential = dao_get_webauthn_credential_by_user_and_id(user_id, webauthn_credential_id)
|
||||
user = get_user_by_id(user_id)
|
||||
|
||||
if len(user.webauthn_credentials) == 1:
|
||||
# TODO: Only raise an error if user has auth type webauthn_auth
|
||||
raise InvalidRequest('Cannot delete last remaining webauthn credential for user', status_code=400)
|
||||
|
||||
dao_delete_webauthn_credential(webauthn_credential)
|
||||
|
||||
return '', 204
|
||||
23
app/webauthn/webauthn_schema.py
Normal file
23
app/webauthn/webauthn_schema.py
Normal file
@@ -0,0 +1,23 @@
|
||||
post_create_webauthn_credential_schema = {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"description": "POST webauthn_credential schema",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {"type": "string", "minLength": 1},
|
||||
"credential_data": {"type": "string", "minLength": 1},
|
||||
"registration_response": {"type": "string", "minLength": 1},
|
||||
},
|
||||
"required": ["name", "credential_data", "registration_response"],
|
||||
"additionalProperties": False
|
||||
}
|
||||
|
||||
post_update_webauthn_credential_schema = {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"description": "POST update webauthn_credential schema",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {"type": "string", "minLength": 1},
|
||||
},
|
||||
"required": ["name"],
|
||||
"additionalProperties": False
|
||||
}
|
||||
35
migrations/versions/0355_add_webauthn_table.py
Normal file
35
migrations/versions/0355_add_webauthn_table.py
Normal file
@@ -0,0 +1,35 @@
|
||||
"""
|
||||
|
||||
Revision ID: 0355_add_webauthn_table
|
||||
Revises: 0354_government_channel
|
||||
Create Date: 2021-05-07 17:04:22.017137
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
revision = '0355_add_webauthn_table'
|
||||
down_revision = '0354_government_channel'
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'webauthn_credential',
|
||||
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
|
||||
sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=False),
|
||||
sa.Column('name', sa.String(), nullable=False),
|
||||
|
||||
sa.Column('credential_data', sa.String(), nullable=False),
|
||||
sa.Column('registration_response', sa.String(), nullable=False),
|
||||
|
||||
sa.Column('created_at', sa.DateTime(), nullable=False),
|
||||
sa.Column('updated_at', sa.DateTime(), nullable=True),
|
||||
|
||||
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('webauthn_credential')
|
||||
@@ -74,6 +74,7 @@ from app.models import (
|
||||
Template,
|
||||
TemplateFolder,
|
||||
User,
|
||||
WebauthnCredential,
|
||||
)
|
||||
|
||||
|
||||
@@ -1202,3 +1203,22 @@ def create_broadcast_provider_message(
|
||||
db.session.add(provider_message_number)
|
||||
db.session.commit()
|
||||
return provider_message
|
||||
|
||||
|
||||
def create_webauthn_credential(
|
||||
user,
|
||||
name='my key',
|
||||
*,
|
||||
credential_data='ABC123',
|
||||
registration_response='DEF456',
|
||||
):
|
||||
webauthn_credential = WebauthnCredential(
|
||||
user=user,
|
||||
name=name,
|
||||
credential_data=credential_data,
|
||||
registration_response=registration_response
|
||||
)
|
||||
|
||||
db.session.add(webauthn_credential)
|
||||
db.session.commit()
|
||||
return webauthn_credential
|
||||
|
||||
0
tests/app/webauthn/__init__.py
Normal file
0
tests/app/webauthn/__init__.py
Normal file
215
tests/app/webauthn/test_rest.py
Normal file
215
tests/app/webauthn/test_rest.py
Normal file
@@ -0,0 +1,215 @@
|
||||
import uuid
|
||||
from unittest.mock import ANY
|
||||
|
||||
import pytest
|
||||
|
||||
from tests.app.db import create_user, create_webauthn_credential
|
||||
|
||||
|
||||
def test_get_webauthn_credentials_returns_all_credentials_for_user(admin_request, notify_db_session):
|
||||
me = create_user(email='a')
|
||||
other = create_user(email='b')
|
||||
first = create_webauthn_credential(me, '1')
|
||||
create_webauthn_credential(me, '2')
|
||||
create_webauthn_credential(other, '3')
|
||||
|
||||
response = admin_request.get(
|
||||
'webauthn.get_webauthn_credentials',
|
||||
user_id=me.id,
|
||||
)
|
||||
|
||||
creds = sorted(response['data'], key=lambda x: x['name'])
|
||||
assert len(creds) == 2
|
||||
|
||||
assert creds[0] == {
|
||||
'id': str(first.id),
|
||||
'user_id': str(me.id),
|
||||
'name': '1',
|
||||
'credential_data': 'ABC123',
|
||||
'created_at': ANY,
|
||||
'updated_at': None
|
||||
}
|
||||
|
||||
assert creds[1]['name'] == '2'
|
||||
|
||||
|
||||
def test_get_webauthn_credentials_returns_empty_list_if_no_creds(admin_request, sample_user):
|
||||
response = admin_request.get('webauthn.get_webauthn_credentials', user_id=sample_user.id)
|
||||
assert response == {'data': []}
|
||||
|
||||
|
||||
def test_get_webauthn_credentials_errors_if_user_doesnt_exist(admin_request, sample_user):
|
||||
create_webauthn_credential(sample_user, '1')
|
||||
|
||||
admin_request.get(
|
||||
'webauthn.get_webauthn_credentials',
|
||||
user_id=uuid.uuid4(),
|
||||
_expected_status=404
|
||||
)
|
||||
|
||||
|
||||
def test_create_webauthn_credential_returns_201(admin_request, sample_user):
|
||||
response = admin_request.post(
|
||||
'webauthn.create_webauthn_credential',
|
||||
user_id=sample_user.id,
|
||||
_data={
|
||||
'name': 'my key',
|
||||
'credential_data': 'ABC123',
|
||||
'registration_response': 'DEF456',
|
||||
},
|
||||
_expected_status=201
|
||||
)
|
||||
assert len(sample_user.webauthn_credentials) == 1
|
||||
|
||||
new_cred = sample_user.webauthn_credentials[0]
|
||||
|
||||
assert new_cred.name == 'my key'
|
||||
assert new_cred.credential_data == 'ABC123'
|
||||
assert new_cred.registration_response == 'DEF456'
|
||||
assert response['data']['id'] == str(new_cred.id)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('data, err_msg', [
|
||||
# missing registration_response
|
||||
(
|
||||
{'name': 'my key', 'credential_data': 'ABC123'},
|
||||
'registration_response is a required property'
|
||||
),
|
||||
# name is null
|
||||
(
|
||||
{'name': None, 'credential_data': 'ABC123'},
|
||||
'name None is not of type string'
|
||||
),
|
||||
# name is empty
|
||||
(
|
||||
{'name': '', 'credential_data': 'ABC123'},
|
||||
'name is too short'
|
||||
),
|
||||
])
|
||||
def test_create_webauthn_credential_errors_if_schema_violation(admin_request, sample_user, data, err_msg):
|
||||
response = admin_request.post(
|
||||
'webauthn.create_webauthn_credential',
|
||||
user_id=sample_user.id,
|
||||
_data=data,
|
||||
_expected_status=400
|
||||
)
|
||||
assert response['errors'][0] == {
|
||||
'error': 'ValidationError',
|
||||
'message': err_msg
|
||||
}
|
||||
|
||||
|
||||
def test_update_webauthn_credential_returns_200(admin_request, sample_user):
|
||||
cred = create_webauthn_credential(sample_user)
|
||||
assert cred.name != 'new name'
|
||||
|
||||
response = admin_request.post(
|
||||
'webauthn.update_webauthn_credential',
|
||||
user_id=sample_user.id,
|
||||
webauthn_credential_id=cred.id,
|
||||
_data={
|
||||
'name': 'new name',
|
||||
},
|
||||
)
|
||||
assert response['data']['id'] == str(cred.id)
|
||||
assert response['data']['name'] == 'new name'
|
||||
|
||||
|
||||
@pytest.mark.parametrize('data, err_msg', [
|
||||
# you can't update credential_data
|
||||
(
|
||||
{'name': 'my key', 'credential_data': 'NAUGHTY123'},
|
||||
'Additional properties are not allowed (credential_data was unexpected)'
|
||||
),
|
||||
# name is null
|
||||
(
|
||||
{'name': None},
|
||||
'name None is not of type string'
|
||||
),
|
||||
# name is empty
|
||||
(
|
||||
{'name': ''},
|
||||
'name is too short'
|
||||
),
|
||||
])
|
||||
def test_update_webauthn_credential_errors_if_schema_violation(admin_request, sample_user, data, err_msg):
|
||||
cred = create_webauthn_credential(sample_user)
|
||||
response = admin_request.post(
|
||||
'webauthn.update_webauthn_credential',
|
||||
user_id=sample_user.id,
|
||||
webauthn_credential_id=cred.id,
|
||||
_data=data,
|
||||
_expected_status=400
|
||||
)
|
||||
assert response['errors'][0] == {
|
||||
'error': 'ValidationError',
|
||||
'message': err_msg
|
||||
}
|
||||
|
||||
|
||||
def test_update_webauthn_credential_errors_if_webauthn_credential_doesnt_exist(admin_request, sample_user):
|
||||
admin_request.post(
|
||||
'webauthn.update_webauthn_credential',
|
||||
user_id=sample_user.id,
|
||||
webauthn_credential_id=uuid.uuid4(),
|
||||
_data={
|
||||
'name': 'my key',
|
||||
},
|
||||
_expected_status=404
|
||||
)
|
||||
|
||||
|
||||
def test_update_webauthn_credential_errors_if_user_id_doesnt_match(admin_request, notify_db_session):
|
||||
user_1 = create_user(email='1')
|
||||
user_2 = create_user(email='2')
|
||||
cred_2 = create_webauthn_credential(user_2)
|
||||
|
||||
response = admin_request.post(
|
||||
'webauthn.update_webauthn_credential',
|
||||
user_id=user_1.id,
|
||||
webauthn_credential_id=cred_2.id,
|
||||
_data={
|
||||
'name': 'new key name',
|
||||
},
|
||||
_expected_status=404
|
||||
)
|
||||
|
||||
assert response['message'] == 'No result found'
|
||||
|
||||
|
||||
def test_delete_webauthn_credential_returns_204(admin_request, sample_user):
|
||||
cred1 = create_webauthn_credential(sample_user)
|
||||
cred2 = create_webauthn_credential(sample_user)
|
||||
admin_request.delete(
|
||||
'webauthn.update_webauthn_credential',
|
||||
user_id=sample_user.id,
|
||||
webauthn_credential_id=cred1.id,
|
||||
_expected_status=204
|
||||
)
|
||||
assert sample_user.webauthn_credentials == [cred2]
|
||||
|
||||
|
||||
def test_delete_webauthn_credential_errors_if_last_key(admin_request, sample_user):
|
||||
cred = create_webauthn_credential(sample_user)
|
||||
response = admin_request.delete(
|
||||
'webauthn.delete_webauthn_credential',
|
||||
user_id=sample_user.id,
|
||||
webauthn_credential_id=cred.id,
|
||||
_expected_status=400
|
||||
)
|
||||
assert response['message'] == 'Cannot delete last remaining webauthn credential for user'
|
||||
|
||||
|
||||
def test_delete_webauthn_credential_errors_if_user_id_doesnt_match(admin_request, notify_db_session):
|
||||
user_1 = create_user(email='1')
|
||||
user_2 = create_user(email='2')
|
||||
cred_2a = create_webauthn_credential(user_2)
|
||||
|
||||
response = admin_request.delete(
|
||||
'webauthn.delete_webauthn_credential',
|
||||
user_id=user_1.id,
|
||||
webauthn_credential_id=cred_2a.id,
|
||||
_expected_status=404
|
||||
)
|
||||
|
||||
assert response['message'] == 'No result found'
|
||||
Reference in New Issue
Block a user