mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-21 07:51:13 -05:00
@@ -1,6 +1,8 @@
|
|||||||
from flask import request, _request_ctx_stack, current_app, g
|
from flask import request, _request_ctx_stack, current_app, g
|
||||||
from notifications_python_client.authentication import decode_jwt_token, get_token_issuer
|
from notifications_python_client.authentication import decode_jwt_token, get_token_issuer
|
||||||
from notifications_python_client.errors import TokenDecodeError, TokenExpiredError, TokenIssuerError
|
from notifications_python_client.errors import (
|
||||||
|
TokenDecodeError, TokenExpiredError, TokenIssuerError, TokenAlgorithmError
|
||||||
|
)
|
||||||
from notifications_utils import request_helper
|
from notifications_utils import request_helper
|
||||||
from sqlalchemy.exc import DataError
|
from sqlalchemy.exc import DataError
|
||||||
from sqlalchemy.orm.exc import NoResultFound
|
from sqlalchemy.orm.exc import NoResultFound
|
||||||
@@ -83,13 +85,14 @@ def requires_auth():
|
|||||||
for api_key in service.api_keys:
|
for api_key in service.api_keys:
|
||||||
try:
|
try:
|
||||||
decode_jwt_token(auth_token, api_key.secret)
|
decode_jwt_token(auth_token, api_key.secret)
|
||||||
|
except TokenExpiredError:
|
||||||
|
err_msg = "Error: Your system clock must be accurate to within 30 seconds"
|
||||||
|
raise AuthError(err_msg, 403, service_id=service.id, api_key_id=api_key.id)
|
||||||
|
except TokenAlgorithmError:
|
||||||
|
err_msg = "Invalid token: algorithm used is not HS256"
|
||||||
|
raise AuthError(err_msg, 403, service_id=service.id, api_key_id=api_key.id)
|
||||||
except TokenDecodeError:
|
except TokenDecodeError:
|
||||||
continue
|
continue
|
||||||
except TokenExpiredError:
|
|
||||||
err_msg = (
|
|
||||||
"Error: Your system clock must be accurate to within 30 seconds"
|
|
||||||
)
|
|
||||||
raise AuthError(err_msg, 403, service_id=service.id, api_key_id=api_key.id)
|
|
||||||
|
|
||||||
if api_key.expiry_date:
|
if api_key.expiry_date:
|
||||||
raise AuthError("Invalid token: API key revoked", 403, service_id=service.id, api_key_id=api_key.id)
|
raise AuthError("Invalid token: API key revoked", 403, service_id=service.id, api_key_id=api_key.id)
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ psycopg2-binary==2.8.4
|
|||||||
PyJWT==1.7.1
|
PyJWT==1.7.1
|
||||||
SQLAlchemy==1.3.10
|
SQLAlchemy==1.3.10
|
||||||
|
|
||||||
notifications-python-client==5.4.0
|
notifications-python-client==5.4.1
|
||||||
|
|
||||||
# PaaS
|
# PaaS
|
||||||
awscli-cwlogs>=1.4,<1.5
|
awscli-cwlogs>=1.4,<1.5
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ psycopg2-binary==2.8.4
|
|||||||
PyJWT==1.7.1
|
PyJWT==1.7.1
|
||||||
SQLAlchemy==1.3.10
|
SQLAlchemy==1.3.10
|
||||||
|
|
||||||
notifications-python-client==5.4.0
|
notifications-python-client==5.4.1
|
||||||
|
|
||||||
# PaaS
|
# PaaS
|
||||||
awscli-cwlogs>=1.4,<1.5
|
awscli-cwlogs>=1.4,<1.5
|
||||||
@@ -40,12 +40,12 @@ alembic==1.3.1
|
|||||||
amqp==1.4.9
|
amqp==1.4.9
|
||||||
anyjson==0.3.3
|
anyjson==0.3.3
|
||||||
attrs==19.3.0
|
attrs==19.3.0
|
||||||
awscli==1.16.298
|
awscli==1.16.301
|
||||||
bcrypt==3.1.7
|
bcrypt==3.1.7
|
||||||
billiard==3.3.0.23
|
billiard==3.3.0.23
|
||||||
bleach==3.1.0
|
bleach==3.1.0
|
||||||
boto3==1.9.221
|
boto3==1.9.221
|
||||||
botocore==1.13.34
|
botocore==1.13.37
|
||||||
certifi==2019.11.28
|
certifi==2019.11.28
|
||||||
chardet==3.0.4
|
chardet==3.0.4
|
||||||
Click==7.0
|
Click==7.0
|
||||||
@@ -56,7 +56,7 @@ flask-redis==0.4.0
|
|||||||
future==0.18.2
|
future==0.18.2
|
||||||
greenlet==0.4.15
|
greenlet==0.4.15
|
||||||
idna==2.8
|
idna==2.8
|
||||||
importlib-metadata==1.2.0
|
importlib-metadata==1.3.0
|
||||||
Jinja2==2.10.3
|
Jinja2==2.10.3
|
||||||
jmespath==0.9.4
|
jmespath==0.9.4
|
||||||
kombu==3.0.37
|
kombu==3.0.37
|
||||||
|
|||||||
@@ -83,6 +83,27 @@ def test_auth_should_not_allow_request_with_no_iat(client, sample_api_key):
|
|||||||
assert exc.value.short_message == 'Invalid token: signature, api token not found'
|
assert exc.value.short_message == 'Invalid token: signature, api token not found'
|
||||||
|
|
||||||
|
|
||||||
|
def test_auth_should_not_allow_request_with_non_hs256_algorithm(client, sample_api_key):
|
||||||
|
iss = str(sample_api_key.service_id)
|
||||||
|
# code copied from notifications_python_client.authentication.py::create_jwt_token
|
||||||
|
headers = {
|
||||||
|
"typ": 'JWT',
|
||||||
|
"alg": 'HS512'
|
||||||
|
}
|
||||||
|
|
||||||
|
claims = {
|
||||||
|
'iss': iss,
|
||||||
|
'iat': int(time.time())
|
||||||
|
}
|
||||||
|
|
||||||
|
token = jwt.encode(payload=claims, key=str(uuid.uuid4()), headers=headers).decode()
|
||||||
|
|
||||||
|
request.headers = {'Authorization': 'Bearer {}'.format(token)}
|
||||||
|
with pytest.raises(AuthError) as exc:
|
||||||
|
requires_auth()
|
||||||
|
assert exc.value.short_message == 'Invalid token: algorithm used is not HS256'
|
||||||
|
|
||||||
|
|
||||||
def test_admin_auth_should_not_allow_request_with_no_iat(client, sample_api_key):
|
def test_admin_auth_should_not_allow_request_with_no_iat(client, sample_api_key):
|
||||||
iss = current_app.config['ADMIN_CLIENT_USER_NAME']
|
iss = current_app.config['ADMIN_CLIENT_USER_NAME']
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user