mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-26 02:11:41 -05:00
If the service does not have any api keys, there would be an error but it was not formed well.
78 lines
2.5 KiB
Python
78 lines
2.5 KiB
Python
from flask import request, jsonify, _request_ctx_stack, current_app
|
|
from notifications_python_client.authentication import decode_jwt_token, get_token_issuer
|
|
from notifications_python_client.errors import TokenDecodeError, TokenExpiredError
|
|
from werkzeug.exceptions import abort
|
|
from app.dao.api_key_dao import get_unsigned_secrets
|
|
from app import api_user
|
|
from functools import wraps
|
|
|
|
|
|
def authentication_response(message, code):
|
|
current_app.logger.info(message)
|
|
return jsonify(result='error',
|
|
message=message
|
|
), code
|
|
|
|
|
|
def requires_auth():
|
|
auth_header = request.headers.get('Authorization', None)
|
|
if not auth_header:
|
|
return authentication_response('Unauthorized, authentication token must be provided', 401)
|
|
|
|
auth_scheme = auth_header[:7]
|
|
|
|
if auth_scheme != 'Bearer ':
|
|
return authentication_response('Unauthorized, authentication bearer scheme must be used', 401)
|
|
|
|
auth_token = auth_header[7:]
|
|
try:
|
|
api_client = fetch_client(get_token_issuer(auth_token))
|
|
except TokenDecodeError:
|
|
return authentication_response("Invalid token: signature", 403)
|
|
if api_client is None:
|
|
authentication_response("Invalid credentials", 403)
|
|
|
|
for secret in api_client['secret']:
|
|
try:
|
|
decode_jwt_token(
|
|
auth_token,
|
|
secret,
|
|
request.method,
|
|
request.path,
|
|
request.data.decode() if request.data else None
|
|
)
|
|
_request_ctx_stack.top.api_user = api_client
|
|
return
|
|
except TokenExpiredError:
|
|
errors_resp = authentication_response("Invalid token: expired", 403)
|
|
except TokenDecodeError:
|
|
errors_resp = authentication_response("Invalid token: signature", 403)
|
|
|
|
if not api_client['secret']:
|
|
errors_resp = authentication_response("Invalid token: signature", 403)
|
|
return errors_resp
|
|
|
|
|
|
def fetch_client(client):
|
|
if client == current_app.config.get('ADMIN_CLIENT_USER_NAME'):
|
|
return {
|
|
"client": client,
|
|
"secret": [current_app.config.get('ADMIN_CLIENT_SECRET')]
|
|
}
|
|
else:
|
|
return {
|
|
"client": client,
|
|
"secret": get_unsigned_secrets(client)
|
|
}
|
|
|
|
|
|
def require_admin():
|
|
def wrap(func):
|
|
@wraps(func)
|
|
def wrap_func(*args, **kwargs):
|
|
if not api_user['client'] == current_app.config.get('ADMIN_CLIENT_USER_NAME'):
|
|
abort(403)
|
|
return func(*args, **kwargs)
|
|
return wrap_func
|
|
return wrap
|