mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-04 02:11:11 -05:00
Implement a JWT header into base client
- adds a base client - adds a notifications client These do not proxy onto genuine methods. This pull request is the basic implication of the API Client. Still needs a few things before is ready, notably proper logging and actual API endpoints to hook into. Basic premise is to deliver the JWT tokens required for Notify API authentication so we can discuss the implementation/premise.
This commit is contained in:
@@ -1,7 +1,100 @@
|
||||
from flask import Blueprint
|
||||
from flask import Blueprint, request, abort
|
||||
from jwt import decode, DecodeError
|
||||
import calendar
|
||||
import time
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
|
||||
AUTHORIZATION_HEADER = 'Authorization'
|
||||
AUTHORIZATION_SCHEME = 'Bearer'
|
||||
WINDOW = 1
|
||||
|
||||
main = Blueprint('main', __name__)
|
||||
|
||||
|
||||
def get_secrets(service_identifier):
|
||||
"""
|
||||
Temp method until secrets are stored in database etc
|
||||
:param service_identifier:
|
||||
:return: (Boolean, String)
|
||||
"""
|
||||
secrets = {
|
||||
'service1': '1234'
|
||||
}
|
||||
if service_identifier not in secrets:
|
||||
return False, None
|
||||
return True, secrets[service_identifier]
|
||||
|
||||
|
||||
def get_token_from_headers(headers):
|
||||
auth_header = headers.get(AUTHORIZATION_HEADER, '')
|
||||
if auth_header[:7] != AUTHORIZATION_SCHEME + " ":
|
||||
return None
|
||||
return auth_header[7:]
|
||||
|
||||
|
||||
def token_is_valid(token):
|
||||
try:
|
||||
# decode token to get service identifier
|
||||
# signature not checked
|
||||
unverified = decode(token, verify=False, algorithms=['HS256'])
|
||||
|
||||
# service identifier used to get secret
|
||||
found, secret = get_secrets(unverified['iss'])
|
||||
|
||||
# use secret to validate the token
|
||||
verified = decode(token, key=secret.encode(), verify=True, algorithms=['HS256'])
|
||||
|
||||
# check expiry
|
||||
if not calendar.timegm(time.gmtime()) < verified['iat'] + WINDOW:
|
||||
print("TIMESTAMP FAILED")
|
||||
return False
|
||||
|
||||
# check request
|
||||
signed_url = base64.b64encode(
|
||||
hmac.new(
|
||||
secret.encode(),
|
||||
"{} {}".format(request.method, request.path).encode(),
|
||||
digestmod=hashlib.sha256
|
||||
).digest()
|
||||
).decode()
|
||||
if signed_url != verified['req']:
|
||||
print("URL FAILED")
|
||||
return False
|
||||
|
||||
# check body
|
||||
signed_json_request = base64.b64encode(
|
||||
hmac.new(
|
||||
secret.encode(),
|
||||
request.data,
|
||||
digestmod=hashlib.sha256
|
||||
).digest()
|
||||
).decode()
|
||||
|
||||
print(verified)
|
||||
|
||||
if signed_json_request != verified['pay']:
|
||||
print("PAYLOAD FAILED")
|
||||
return False
|
||||
|
||||
except DecodeError:
|
||||
print("TOKEN VERIFICATION FAILED")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def perform_authentication():
|
||||
incoming_token = get_token_from_headers(request.headers)
|
||||
if not incoming_token:
|
||||
abort(401)
|
||||
if not token_is_valid(incoming_token):
|
||||
abort(403)
|
||||
|
||||
|
||||
main.before_request(perform_authentication)
|
||||
|
||||
|
||||
from .views import notifications, index
|
||||
from . import errors
|
||||
|
||||
36
app/main/errors.py
Normal file
36
app/main/errors.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from flask import jsonify
|
||||
|
||||
from . import main
|
||||
|
||||
|
||||
@main.app_errorhandler(400)
|
||||
def bad_request(e):
|
||||
return jsonify(error=str(e.description)), 400
|
||||
|
||||
|
||||
@main.app_errorhandler(401)
|
||||
def unauthorized(e):
|
||||
error_message = "Unauthorized, authentication token must be provided"
|
||||
return jsonify(error=error_message), 401, [('WWW-Authenticate', 'Bearer')]
|
||||
|
||||
|
||||
@main.app_errorhandler(403)
|
||||
def forbidden(e):
|
||||
error_message = "Forbidden, invalid authentication token provided"
|
||||
return jsonify(error=error_message), 403
|
||||
|
||||
|
||||
@main.app_errorhandler(404)
|
||||
def page_not_found(e):
|
||||
return jsonify(error=e.description or "Not found"), 404
|
||||
|
||||
|
||||
@main.app_errorhandler(429)
|
||||
def limit_exceeded(e):
|
||||
return jsonify(error=str(e.description)), 429
|
||||
|
||||
|
||||
@main.app_errorhandler(500)
|
||||
def internal_server_error(e):
|
||||
# TODO: log the error
|
||||
return jsonify(error="Internal error"), 500
|
||||
@@ -3,5 +3,10 @@ from .. import main
|
||||
|
||||
|
||||
@main.route('/', methods=['GET'])
|
||||
def index():
|
||||
def get_index():
|
||||
return jsonify(result="hello world"), 200
|
||||
|
||||
|
||||
@main.route('/', methods=['POST'])
|
||||
def post_index():
|
||||
return jsonify(result="hello world"), 200
|
||||
|
||||
Reference in New Issue
Block a user