mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-25 01:41:34 -05:00
Once the code that uses the existing alpha has been removed, the duplicated code from /notifications/sms and the new endpoint can be merged. The job id is now available in the notification but is not used yet.
71 lines
2.4 KiB
Python
71 lines
2.4 KiB
Python
from flask import request, jsonify, _request_ctx_stack, current_app
|
|
from client.authentication import decode_jwt_token, get_token_issuer
|
|
from client.errors import TokenDecodeError, TokenRequestError, TokenExpiredError, TokenPayloadError
|
|
from app.dao.api_key_dao import get_unsigned_secrets
|
|
|
|
|
|
def authentication_response(message, code):
    """Log an authentication failure and build the error response for it.

    Args:
        message: Description of the failure; written to the application log
            at info level and returned to the caller under the ``error`` key.
        code: HTTP status code to pair with the response.

    Returns:
        A ``(response, code)`` tuple whose first element is the JSON
        response produced by ``jsonify(error=message)``.
    """
    current_app.logger.info(message)
    error_body = jsonify(error=message)
    return error_body, code
|
|
|
|
|
|
def requires_auth():
    """Authenticate the current request from its ``Authorization`` header.

    The header must use the ``Bearer`` scheme followed by a JWT. The token's
    issuer is resolved to a client via ``fetch_client`` and the token is then
    checked with ``decode_jwt_token`` against each of that client's secrets
    in turn, together with the request method, path and body.

    Returns:
        ``None`` when the token validates against one of the client's
        secrets; the client is then stored on
        ``_request_ctx_stack.top.api_user``. Otherwise a
        ``(response, status_code)`` tuple built by
        ``authentication_response``: 401 when the header is missing or does
        not use the bearer scheme, 403 when the token or client is rejected.
    """
    auth_header = request.headers.get('Authorization', None)
    if not auth_header:
        return authentication_response('Unauthorized, authentication token must be provided', 401)

    # The scheme prefix is exactly 'Bearer ' (7 characters, case-sensitive).
    auth_scheme = auth_header[:7]

    if auth_scheme != 'Bearer ':
        return authentication_response('Unauthorized, authentication bearer scheme must be used', 401)

    auth_token = auth_header[7:]
    try:
        api_client = fetch_client(get_token_issuer(auth_token))
    except TokenDecodeError:
        return authentication_response("Invalid token: signature", 403)

    if api_client is None:
        # The response must be returned here; otherwise execution falls
        # through and subscripting None below raises TypeError.
        return authentication_response("Invalid credentials", 403)

    # Stays None when the client has no secrets at all, so that case can be
    # rejected explicitly instead of raising UnboundLocalError at the return.
    errors_resp = None
    for secret in api_client['secret']:
        try:
            decode_jwt_token(
                auth_token,
                secret,
                request.method,
                request.path,
                request.data.decode() if request.data else None
            )
            _request_ctx_stack.top.api_user = api_client
            return
        # A failure against one secret is not final: keep trying the
        # remaining secrets and report only the last failure.
        except TokenRequestError:
            errors_resp = authentication_response("Invalid token: request", 403)
        except TokenExpiredError:
            errors_resp = authentication_response("Invalid token: expired", 403)
        except TokenPayloadError:
            errors_resp = authentication_response("Invalid token: payload", 403)
        except TokenDecodeError:
            errors_resp = authentication_response("Invalid token: signature", 403)

    if errors_resp is None:
        # No secret was available to check the token against.
        errors_resp = authentication_response("Invalid credentials", 403)

    return errors_resp
|
|
|
|
|
|
def fetch_client(client):
    """Build the client record used to verify a token issued by ``client``.

    Args:
        client: Issuer identifier taken from the token.

    Returns:
        A dict with two keys: ``"client"`` (the identifier passed in) and
        ``"secret"`` (the secrets to try). For the configured admin and
        delivery client names this is a one-element list holding the
        matching configured secret; for any other issuer it is whatever
        ``get_unsigned_secrets(client)`` returns.
    """
    config = current_app.config

    if client == config.get('ADMIN_CLIENT_USER_NAME'):
        secrets = [config.get('ADMIN_CLIENT_SECRET')]
    elif client == config.get('DELIVERY_CLIENT_USER_NAME'):
        secrets = [config.get('DELIVERY_CLIENT_SECRET')]
    else:
        # NOTE(review): presumably an iterable of the issuer's API key
        # secrets -- confirm against app.dao.api_key_dao.
        secrets = get_unsigned_secrets(client)

    return {"client": client, "secret": secrets}
|