mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-21 07:51:13 -05:00
refactor authentication code
moved api_key secret manipulation (generating and getting) into authentiation/utils, and added a property on the model, to facilitate easier matching of authenticated requests and the api keys they used
This commit is contained in:
@@ -1,10 +1,8 @@
|
|||||||
from flask import request, jsonify, _request_ctx_stack, current_app
|
from flask import request, jsonify, _request_ctx_stack, current_app
|
||||||
from notifications_python_client.authentication import decode_jwt_token, get_token_issuer
|
from notifications_python_client.authentication import decode_jwt_token, get_token_issuer
|
||||||
from notifications_python_client.errors import TokenDecodeError, TokenExpiredError
|
from notifications_python_client.errors import TokenDecodeError, TokenExpiredError
|
||||||
from werkzeug.exceptions import abort
|
|
||||||
from app.dao.api_key_dao import get_unsigned_secrets
|
from app.dao.api_key_dao import get_unsigned_secrets
|
||||||
from app import api_user
|
|
||||||
from functools import wraps
|
|
||||||
|
|
||||||
|
|
||||||
def authentication_response(message, code):
|
def authentication_response(message, code):
|
||||||
|
|||||||
12
app/authentication/utils.py
Normal file
12
app/authentication/utils.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
from flask import current_app
|
||||||
|
from itsdangerous import URLSafeSerializer
|
||||||
|
|
||||||
|
|
||||||
|
def get_secret(secret):
|
||||||
|
serializer = URLSafeSerializer(current_app.config.get('SECRET_KEY'))
|
||||||
|
return serializer.loads(secret, salt=current_app.config.get('DANGEROUS_SALT'))
|
||||||
|
|
||||||
|
|
||||||
|
def generate_secret(token):
|
||||||
|
serializer = URLSafeSerializer(current_app.config.get('SECRET_KEY'))
|
||||||
|
return serializer.dumps(str(token), current_app.config.get('DANGEROUS_SALT'))
|
||||||
@@ -1,9 +1,6 @@
|
|||||||
import uuid
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
from flask import current_app
|
|
||||||
from itsdangerous import URLSafeSerializer
|
|
||||||
|
|
||||||
from app import db
|
from app import db
|
||||||
from app.models import ApiKey
|
from app.models import ApiKey
|
||||||
|
|
||||||
@@ -11,6 +8,7 @@ from app.dao.dao_utils import (
|
|||||||
transactional,
|
transactional,
|
||||||
version_class
|
version_class
|
||||||
)
|
)
|
||||||
|
from app.authentication.utils import generate_secret
|
||||||
|
|
||||||
|
|
||||||
@transactional
|
@transactional
|
||||||
@@ -18,7 +16,7 @@ from app.dao.dao_utils import (
|
|||||||
def save_model_api_key(api_key):
|
def save_model_api_key(api_key):
|
||||||
if not api_key.id:
|
if not api_key.id:
|
||||||
api_key.id = uuid.uuid4() # must be set now so version history model can use same id
|
api_key.id = uuid.uuid4() # must be set now so version history model can use same id
|
||||||
api_key.secret = _generate_secret()
|
api_key.secret = generate_secret(uuid.uuid4())
|
||||||
db.session.add(api_key)
|
db.session.add(api_key)
|
||||||
|
|
||||||
|
|
||||||
@@ -41,7 +39,7 @@ def get_unsigned_secrets(service_id):
|
|||||||
This method can only be exposed to the Authentication of the api calls.
|
This method can only be exposed to the Authentication of the api calls.
|
||||||
"""
|
"""
|
||||||
api_keys = ApiKey.query.filter_by(service_id=service_id, expiry_date=None).all()
|
api_keys = ApiKey.query.filter_by(service_id=service_id, expiry_date=None).all()
|
||||||
keys = [_get_secret(x.secret) for x in api_keys]
|
keys = [x.unsigned_secret for x in api_keys]
|
||||||
return keys
|
return keys
|
||||||
|
|
||||||
|
|
||||||
@@ -50,15 +48,4 @@ def get_unsigned_secret(key_id):
|
|||||||
This method can only be exposed to the Authentication of the api calls.
|
This method can only be exposed to the Authentication of the api calls.
|
||||||
"""
|
"""
|
||||||
api_key = ApiKey.query.filter_by(id=key_id, expiry_date=None).one()
|
api_key = ApiKey.query.filter_by(id=key_id, expiry_date=None).one()
|
||||||
return _get_secret(api_key.secret)
|
return api_key.unsigned_secret
|
||||||
|
|
||||||
|
|
||||||
def _generate_secret():
|
|
||||||
token = uuid.uuid4()
|
|
||||||
serializer = URLSafeSerializer(current_app.config.get('SECRET_KEY'))
|
|
||||||
return serializer.dumps(str(token), current_app.config.get('DANGEROUS_SALT'))
|
|
||||||
|
|
||||||
|
|
||||||
def _get_secret(signed_secret):
|
|
||||||
serializer = URLSafeSerializer(current_app.config.get('SECRET_KEY'))
|
|
||||||
return serializer.loads(signed_secret, salt=current_app.config.get('DANGEROUS_SALT'))
|
|
||||||
|
|||||||
@@ -5,14 +5,13 @@ from sqlalchemy.dialects.postgresql import (
|
|||||||
UUID,
|
UUID,
|
||||||
JSON
|
JSON
|
||||||
)
|
)
|
||||||
|
|
||||||
from sqlalchemy import UniqueConstraint
|
from sqlalchemy import UniqueConstraint
|
||||||
|
|
||||||
from app.encryption import (
|
from app.encryption import (
|
||||||
hashpw,
|
hashpw,
|
||||||
check_hash
|
check_hash
|
||||||
)
|
)
|
||||||
|
from app.authentication.utils import get_secret
|
||||||
from app import (
|
from app import (
|
||||||
db,
|
db,
|
||||||
encryption
|
encryption
|
||||||
@@ -135,6 +134,10 @@ class ApiKey(db.Model, Versioned):
|
|||||||
UniqueConstraint('service_id', 'name', name='uix_service_to_key_name'),
|
UniqueConstraint('service_id', 'name', name='uix_service_to_key_name'),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def unsigned_secret(self):
|
||||||
|
return get_secret(self.secret)
|
||||||
|
|
||||||
|
|
||||||
KEY_TYPE_NORMAL = 'normal'
|
KEY_TYPE_NORMAL = 'normal'
|
||||||
KEY_TYPE_TEAM = 'team'
|
KEY_TYPE_TEAM = 'team'
|
||||||
|
|||||||
@@ -1,9 +1,8 @@
|
|||||||
import uuid
|
from datetime import datetime
|
||||||
from datetime import datetime, timedelta
|
|
||||||
from notifications_python_client.authentication import create_jwt_token
|
from notifications_python_client.authentication import create_jwt_token
|
||||||
from flask import json, current_app
|
from flask import json, current_app
|
||||||
from app.dao.api_key_dao import get_unsigned_secrets, save_model_api_key, get_unsigned_secret, expire_api_key
|
from app.dao.api_key_dao import get_unsigned_secrets, save_model_api_key, get_unsigned_secret, expire_api_key
|
||||||
from app.models import ApiKey, KEY_TYPE_NORMAL
|
from app.models import ApiKey, KEY_TYPE_NORMAL, KEY_TYPE_TEAM
|
||||||
|
|
||||||
|
|
||||||
def test_should_not_allow_request_with_no_token(notify_api):
|
def test_should_not_allow_request_with_no_token(notify_api):
|
||||||
@@ -90,13 +89,6 @@ def test_should_allow_valid_token_when_service_has_multiple_keys(notify_api, sam
|
|||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
JSON_BODY = json.dumps({
|
|
||||||
"key1": "value1",
|
|
||||||
"key2": "value2",
|
|
||||||
"key3": "value3"
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
def test_authentication_passes_admin_client_token(notify_api,
|
def test_authentication_passes_admin_client_token(notify_api,
|
||||||
notify_db,
|
notify_db,
|
||||||
notify_db_session,
|
notify_db_session,
|
||||||
|
|||||||
8
tests/app/authentication/test_utils.py
Normal file
8
tests/app/authentication/test_utils.py
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
from app.authentication.utils import generate_secret, get_secret
|
||||||
|
|
||||||
|
|
||||||
|
def test_secret_is_signed_and_can_be_read_again(notify_api):
|
||||||
|
with notify_api.test_request_context():
|
||||||
|
signed_secret = generate_secret('some_uuid')
|
||||||
|
assert signed_secret != 'some_uuid'
|
||||||
|
assert 'some_uuid' == get_secret(signed_secret)
|
||||||
@@ -4,23 +4,15 @@ import pytest
|
|||||||
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.exc import IntegrityError
|
||||||
from sqlalchemy.orm.exc import NoResultFound
|
from sqlalchemy.orm.exc import NoResultFound
|
||||||
|
|
||||||
|
from app.authentication.utils import get_secret
|
||||||
from app.dao.api_key_dao import (save_model_api_key,
|
from app.dao.api_key_dao import (save_model_api_key,
|
||||||
get_model_api_keys,
|
get_model_api_keys,
|
||||||
get_unsigned_secrets,
|
get_unsigned_secrets,
|
||||||
get_unsigned_secret,
|
get_unsigned_secret,
|
||||||
_generate_secret,
|
expire_api_key)
|
||||||
_get_secret, expire_api_key)
|
|
||||||
from app.models import ApiKey, KEY_TYPE_NORMAL
|
from app.models import ApiKey, KEY_TYPE_NORMAL
|
||||||
|
|
||||||
|
|
||||||
def test_secret_is_signed_and_can_be_read_again(notify_api, mocker):
|
|
||||||
with notify_api.test_request_context():
|
|
||||||
mocker.patch("uuid.uuid4", return_value='some_uuid')
|
|
||||||
signed_secret = _generate_secret()
|
|
||||||
assert 'some_uuid' == _get_secret(signed_secret)
|
|
||||||
assert signed_secret != 'some_uuid'
|
|
||||||
|
|
||||||
|
|
||||||
def test_save_api_key_should_create_new_api_key_and_history(sample_service):
|
def test_save_api_key_should_create_new_api_key_and_history(sample_service):
|
||||||
api_key = ApiKey(**{'service': sample_service,
|
api_key = ApiKey(**{'service': sample_service,
|
||||||
'name': sample_service.name,
|
'name': sample_service.name,
|
||||||
@@ -72,13 +64,13 @@ def test_should_return_unsigned_api_keys_for_service_id(sample_api_key):
|
|||||||
unsigned_api_key = get_unsigned_secrets(sample_api_key.service_id)
|
unsigned_api_key = get_unsigned_secrets(sample_api_key.service_id)
|
||||||
assert len(unsigned_api_key) == 1
|
assert len(unsigned_api_key) == 1
|
||||||
assert sample_api_key.secret != unsigned_api_key[0]
|
assert sample_api_key.secret != unsigned_api_key[0]
|
||||||
assert unsigned_api_key[0] == _get_secret(sample_api_key.secret)
|
assert unsigned_api_key[0] == get_secret(sample_api_key.secret)
|
||||||
|
|
||||||
|
|
||||||
def test_get_unsigned_secret_returns_key(sample_api_key):
|
def test_get_unsigned_secret_returns_key(sample_api_key):
|
||||||
unsigned_api_key = get_unsigned_secret(sample_api_key.id)
|
unsigned_api_key = get_unsigned_secret(sample_api_key.id)
|
||||||
assert sample_api_key.secret != unsigned_api_key
|
assert sample_api_key.secret != unsigned_api_key
|
||||||
assert unsigned_api_key == _get_secret(sample_api_key.secret)
|
assert unsigned_api_key == get_secret(sample_api_key.secret)
|
||||||
|
|
||||||
|
|
||||||
def test_should_not_allow_duplicate_key_names_per_service(sample_api_key, fake_uuid):
|
def test_should_not_allow_duplicate_key_names_per_service(sample_api_key, fake_uuid):
|
||||||
|
|||||||
Reference in New Issue
Block a user