Merge branch 'master' into active-service

This commit is contained in:
Leo Hemsted
2016-11-10 13:54:02 +00:00
5 changed files with 91 additions and 38 deletions

View File

@@ -3,7 +3,7 @@ from sqlalchemy.exc import DataError
from sqlalchemy.orm.exc import NoResultFound
from notifications_python_client.authentication import decode_jwt_token, get_token_issuer
from notifications_python_client.errors import TokenDecodeError, TokenExpiredError
from notifications_python_client.errors import TokenDecodeError, TokenExpiredError, TokenIssuerError
from app.dao.api_key_dao import get_model_api_keys
from app.dao.services_dao import dao_fetch_service_by_id
@@ -39,8 +39,10 @@ def requires_auth():
auth_token = get_auth_token(request)
try:
client = get_token_issuer(auth_token)
except TokenDecodeError:
raise AuthError("Invalid token: signature", 403)
except TokenDecodeError as e:
raise AuthError(e.message, 403)
except TokenIssuerError:
raise AuthError("Invalid token: iss not provided", 403)
if client == current_app.config.get('ADMIN_CLIENT_USER_NAME'):
return handle_admin_key(auth_token, current_app.config.get('ADMIN_CLIENT_SECRET'))
@@ -78,8 +80,8 @@ def handle_admin_key(auth_token, secret):
try:
get_decode_errors(auth_token, secret)
return
except TokenDecodeError:
raise AuthError("Invalid token: signature", 403)
except TokenDecodeError as e:
raise AuthError(e.message, 403)
def get_decode_errors(auth_token, unsigned_secret):

View File

@@ -19,7 +19,7 @@ monotonic==1.2
statsd==3.2.1
jsonschema==2.5.1
git+https://github.com/alphagov/notifications-python-client.git@1.3.0#egg=notifications-python-client==1.3.0
git+https://github.com/alphagov/notifications-python-client.git@2.0.0#egg=notifications-python-client==2.0.0
git+https://github.com/alphagov/notifications-utils.git@9.1.1#egg=notifications-utils==9.1.1

View File

@@ -1,3 +1,6 @@
import jwt
import uuid
import time
from datetime import datetime
import pytest
@@ -41,6 +44,46 @@ def test_should_not_allow_request_with_incorrect_token(notify_api, sample_user):
assert data['message'] == {"token": ['Invalid token: signature']}
def test_should_not_allow_request_with_no_iss(client):
# code copied from notifications_python_client.authentication.py::create_jwt_token
headers = {
"typ": 'JWT',
"alg": 'HS256'
}
claims = {
# 'iss': not provided
'iat': int(time.time())
}
token = jwt.encode(payload=claims, key=str(uuid.uuid4()), headers=headers).decode()
response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)})
assert response.status_code == 403
data = json.loads(response.get_data())
assert data['message'] == {"token": ['Invalid token: iss field not provided']}
def test_should_not_allow_request_with_no_iat(client, sample_api_key):
# code copied from notifications_python_client.authentication.py::create_jwt_token
headers = {
"typ": 'JWT',
"alg": 'HS256'
}
claims = {
'iss': str(sample_api_key.service_id)
# 'iat': not provided
}
token = jwt.encode(payload=claims, key=str(uuid.uuid4()), headers=headers).decode()
response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)})
assert response.status_code == 403
data = json.loads(response.get_data())
assert data['message'] == {"token": ['Invalid token: signature, api token is not valid']}
def test_should_not_allow_invalid_secret(notify_api, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:

View File

@@ -698,14 +698,15 @@ def test_should_delete_notification_and_return_error_if_sqs_fails(
save_model_api_key(api_key)
auth_header = create_jwt_token(secret=api_key.unsigned_secret, client_id=str(api_key.service_id))
with pytest.raises(Exception) as exc:
response = client.post(
path='/notifications/{}'.format(template_type),
data=json.dumps(data),
headers=[('Content-Type', 'application/json'), ('Authorization', 'Bearer {}'.format(auth_header))])
mocked.assert_called_once_with([fake_uuid], queue='send-{}'.format(template_type))
assert str(exc.value) == 'failed to talk to SQS'
assert response.status_code == 500
assert not notifications_dao.get_notification_by_id(fake_uuid)
assert not NotificationHistory.query.get(fake_uuid)

View File

@@ -2,7 +2,6 @@ from contextlib import contextmanager
import os
import boto3
from unittest import mock
import pytest
from alembic.command import upgrade
from alembic.config import Config
@@ -13,16 +12,31 @@ from app import create_app, db
@pytest.fixture(scope='session')
def notify_api(request):
def notify_api():
app = create_app()
# deattach server-error error handlers - error_handler_spec looks like:
# {'blueprint_name': {
# status_code: [error_handlers],
# None: [ tuples of (exception, )]
# }}
for error_handlers in app.error_handler_spec.values():
error_handlers.pop(500, None)
if None in error_handlers:
error_handlers[None] = [
exception_handler
for exception_handler in error_handlers[None]
if exception_handler[0] != Exception
]
if error_handlers[None] == []:
error_handlers.pop(None)
ctx = app.app_context()
ctx.push()
def teardown():
ctx.pop()
yield app
request.addfinalizer(teardown)
return app
ctx.pop()
@pytest.fixture(scope='function')
@@ -32,9 +46,8 @@ def client(notify_api):
@pytest.fixture(scope='session')
def notify_db(notify_api, request):
def notify_db(notify_api):
assert db.engine.url.database != 'notification_api', 'dont run tests against main db'
Migrate(notify_api, db)
Manager(db, MigrateCommand)
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
@@ -45,36 +58,30 @@ def notify_db(notify_api, request):
with notify_api.app_context():
upgrade(config, 'head')
def teardown():
yield db
db.session.remove()
db.get_engine(notify_api).dispose()
request.addfinalizer(teardown)
return db
@pytest.fixture(scope='function')
def notify_db_session(request, notify_db):
def teardown():
def notify_db_session(notify_db):
yield notify_db
notify_db.session.remove()
for tbl in reversed(notify_db.metadata.sorted_tables):
if tbl.name not in ["provider_details", "key_types", "branding_type", "job_status"]:
notify_db.engine.execute(tbl.delete())
notify_db.session.commit()
request.addfinalizer(teardown)
@pytest.fixture(scope='function')
def os_environ(mocker):
mocker.patch('os.environ', {})
@pytest.fixture(scope='function')
def os_environ(request):
env_patch = mock.patch('os.environ', {})
request.addfinalizer(env_patch.stop)
return env_patch.start()
@pytest.fixture(scope='function')
def sqs_client_conn(request):
def sqs_client_conn():
boto3.setup_default_session(region_name='eu-west-1')
return boto3.resource('sqs')