Change Tokens to ApiKey

Added name to ApiKey model
This commit is contained in:
Rebecca Law
2016-01-19 12:07:00 +00:00
parent 6966c2484f
commit 4fc5c34320
14 changed files with 281 additions and 226 deletions

View File

@@ -1,7 +1,7 @@
from client.authentication import create_jwt_token
from flask import json, url_for
from app.dao.tokens_dao import get_unsigned_token
from app.dao.api_key_dao import get_unsigned_secret
def test_should_not_allow_request_with_no_token(notify_api):
@@ -33,13 +33,13 @@ def test_should_not_allow_request_with_incorrect_token(notify_api):
assert data['error'] == 'Invalid token: signature'
def test_should_not_allow_incorrect_path(notify_api, notify_db, notify_db_session, sample_token):
def test_should_not_allow_incorrect_path(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = create_jwt_token(request_method="GET",
request_path="/bad",
secret=get_unsigned_token(sample_token.service_id),
client_id=sample_token.service_id)
secret=get_unsigned_secret(sample_api_key.service_id),
client_id=sample_api_key.service_id)
response = client.get(url_for('status.show_status'),
headers={'Authorization': "Bearer {}".format(token)})
assert response.status_code == 403
@@ -47,10 +47,10 @@ def test_should_not_allow_incorrect_path(notify_api, notify_db, notify_db_sessio
assert data['error'] == 'Invalid token: request'
def test_should_not_allow_incorrect_method(notify_api, notify_db, notify_db_session, sample_token):
def test_should_not_allow_incorrect_method(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = __create_post_token(sample_token.service_id, {})
token = __create_post_token(sample_api_key.service_id, {})
response = client.get(url_for('status.show_status'),
headers={'Authorization': "Bearer {}".format(token)})
assert response.status_code == 403
@@ -58,11 +58,11 @@ def test_should_not_allow_incorrect_method(notify_api, notify_db, notify_db_sess
assert data['error'] == 'Invalid token: request'
def test_should_not_allow_invalid_secret(notify_api, notify_db, notify_db_session, sample_token):
def test_should_not_allow_invalid_secret(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = create_jwt_token(request_method="POST", request_path="/", secret="not-so-secret",
client_id=sample_token.service_id)
client_id=sample_api_key.service_id)
response = client.get(url_for('status.show_status'),
headers={'Authorization': "Bearer {}".format(token)})
assert response.status_code == 403
@@ -70,10 +70,10 @@ def test_should_not_allow_invalid_secret(notify_api, notify_db, notify_db_sessio
assert data['error'] == 'Invalid token: signature'
def test_should_allow_valid_token(notify_api, notify_db, notify_db_session, sample_token):
def test_should_allow_valid_token(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = __create_get_token(sample_token.service_id)
token = __create_get_token(sample_api_key.service_id)
response = client.get(url_for('status.show_status'),
headers={'Authorization': 'Bearer {}'.format(token)})
assert response.status_code == 200
@@ -86,20 +86,20 @@ JSON_BODY = json.dumps({
})
def test_should_allow_valid_token_with_post_body(notify_api, notify_db, notify_db_session, sample_token):
def test_should_allow_valid_token_with_post_body(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = __create_post_token(sample_token.service_id, JSON_BODY)
token = __create_post_token(sample_api_key.service_id, JSON_BODY)
response = client.post(url_for('status.show_status'),
data=JSON_BODY,
headers={'Authorization': 'Bearer {}'.format(token)})
assert response.status_code == 200
def test_should_not_allow_valid_token_with_invalid_post_body(notify_api, notify_db, notify_db_session, sample_token):
def test_should_not_allow_valid_token_with_invalid_post_body(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = __create_post_token(sample_token.service_id, JSON_BODY)
token = __create_post_token(sample_api_key.service_id, JSON_BODY)
response = client.post(url_for('status.show_status'),
data="spurious",
headers={'Authorization': 'Bearer {}'.format(token)})
@@ -111,7 +111,7 @@ def test_should_not_allow_valid_token_with_invalid_post_body(notify_api, notify_
def __create_get_token(service_id):
return create_jwt_token(request_method="GET",
request_path=url_for('status.show_status'),
secret=get_unsigned_token(service_id),
secret=get_unsigned_secret(service_id),
client_id=service_id)
@@ -119,7 +119,7 @@ def __create_post_token(service_id, request_body):
return create_jwt_token(
request_method="POST",
request_path=url_for('status.show_status'),
secret=get_unsigned_token(service_id),
secret=get_unsigned_secret(service_id),
client_id=service_id,
request_body=request_body
)

View File

@@ -1,9 +1,9 @@
import pytest
from app.models import (User, Service, Template, Token, Job)
from app.models import (User, Service, Template, ApiKey, Job)
from app.dao.users_dao import (save_model_user)
from app.dao.services_dao import save_model_service
from app.dao.templates_dao import save_model_template
from app.dao.tokens_dao import save_model_token
from app.dao.api_key_dao import save_model_api_key
from app.dao.jobs_dao import save_job
import uuid
@@ -51,7 +51,7 @@ def sample_template(notify_db,
service=None):
if service is None:
service = sample_service(notify_db, notify_db_session)
sample_token(notify_db, notify_db_session, service=service)
sample_api_key(notify_db, notify_db_session, service=service)
data = {
'name': template_name,
'template_type': template_type,
@@ -64,15 +64,15 @@ def sample_template(notify_db,
@pytest.fixture(scope='function')
def sample_token(notify_db,
notify_db_session,
service=None):
def sample_api_key(notify_db,
notify_db_session,
service=None):
if service is None:
service = sample_service(notify_db, notify_db_session)
data = {'service_id': service.id}
token = Token(**data)
save_model_token(token)
return token
data = {'service_id': service.id, 'name': service.name}
api_key = ApiKey(**data)
save_model_api_key(api_key)
return api_key
@pytest.fixture(scope='function')
@@ -105,7 +105,7 @@ def sample_job(notify_db,
def sample_admin_service_id(notify_db, notify_db_session):
admin_user = sample_user(notify_db, notify_db_session, email="notify_admin@digital.cabinet-office.gov.uk")
admin_service = sample_service(notify_db, notify_db_session, service_name="Sample Admin Service", user=admin_user)
data = {'service_id': admin_service.id}
token = Token(**data)
save_model_token(token)
data = {'service_id': admin_service.id, 'name': 'sample admin key'}
api_key = ApiKey(**data)
save_model_api_key(api_key)
return admin_service.id

View File

@@ -0,0 +1,69 @@
from datetime import datetime
from pytest import fail
from sqlalchemy.orm.exc import NoResultFound
from app.dao.api_key_dao import (save_model_api_key,
get_model_api_keys,
get_unsigned_secret,
_generate_secret,
_get_secret)
from app.models import ApiKey
def test_secret_is_signed_and_can_be_read_again(notify_api):
import uuid
with notify_api.test_request_context():
token = str(uuid.uuid4())
signed_secret = _generate_secret(token=token)
assert token == _get_secret(signed_secret)
assert signed_secret != token
def test_save_api_key_should_create_new_api_key(notify_api, notify_db, notify_db_session, sample_service):
api_key = ApiKey(**{'service_id': sample_service.id, 'name': sample_service.name})
save_model_api_key(api_key)
all_api_keys = get_model_api_keys()
assert len(all_api_keys) == 1
assert all_api_keys[0] == api_key
def test_save_api_key_should_update_the_api_key(notify_api, notify_db, notify_db_session, sample_api_key):
now = datetime.utcnow()
saved_api_key = get_model_api_keys(sample_api_key.service_id)
save_model_api_key(saved_api_key, update_dict={'id': saved_api_key.id, 'expiry_date': now})
all_api_keys = get_model_api_keys()
assert len(all_api_keys) == 1
assert all_api_keys[0].expiry_date == now
assert all_api_keys[0].secret == saved_api_key.secret
assert all_api_keys[0].id == saved_api_key.id
assert all_api_keys[0].service_id == saved_api_key.service_id
def test_get_api_key_should_raise_exception_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service):
try:
get_model_api_keys(sample_service.id)
fail("Should have thrown a NoResultFound exception")
except NoResultFound:
pass
def test_get_api_key_should_return_none_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service):
assert get_model_api_keys(service_id=sample_service.id, raise_=False) is None
def test_should_return_api_key_for_service(notify_api, notify_db, notify_db_session, sample_api_key):
api_key = get_model_api_keys(sample_api_key.service_id)
assert api_key == sample_api_key
def test_should_return_unsigned_api_key_for_service_id(notify_api,
notify_db,
notify_db_session,
sample_api_key):
unsigned_api_key = get_unsigned_secret(sample_api_key.service_id)
assert sample_api_key.secret != unsigned_api_key
assert unsigned_api_key == _get_secret(sample_api_key.secret)

View File

@@ -1,62 +0,0 @@
import uuid
from app.dao.tokens_dao import (save_model_token, get_model_tokens, get_unsigned_token, _generate_token, _get_token)
from datetime import datetime
from app.models import Token
from pytest import fail
from sqlalchemy.orm.exc import NoResultFound
def test_token_is_signed_and_can_be_read_again(notify_api):
import uuid
with notify_api.test_request_context():
token = str(uuid.uuid4())
signed_token = _generate_token(token=token)
assert token == _get_token(signed_token)
assert signed_token != token
def test_save_token_should_create_new_token(notify_api, notify_db, notify_db_session, sample_service):
api_token = Token(**{'service_id': sample_service.id})
save_model_token(api_token)
all_tokens = get_model_tokens()
assert len(all_tokens) == 1
assert all_tokens[0] == api_token
def test_save_token_should_update_the_token(notify_api, notify_db, notify_db_session, sample_token):
now = datetime.utcnow()
saved_token = get_model_tokens(sample_token.service_id)
save_model_token(saved_token, update_dict={'id': saved_token.id, 'expiry_date': now})
all_tokens = get_model_tokens()
assert len(all_tokens) == 1
assert all_tokens[0].expiry_date == now
assert all_tokens[0].token == saved_token.token
assert all_tokens[0].id == saved_token.id
assert all_tokens[0].service_id == saved_token.service_id
def test_get_token_should_raise_exception_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service):
try:
get_model_tokens(sample_service.id)
fail("Should have thrown a NoResultFound exception")
except NoResultFound:
pass
def test_get_token_should_return_none_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service):
assert get_model_tokens(service_id=sample_service.id, raise_=False) is None
def test_should_return_token_for_service(notify_api, notify_db, notify_db_session, sample_token):
token = get_model_tokens(sample_token.service_id)
assert token == sample_token
def test_should_return_unsigned_token_for_service_id(notify_api, notify_db, notify_db_session,
sample_token):
unsigned_token = get_unsigned_token(sample_token.service_id)
assert sample_token.token != unsigned_token
assert unsigned_token == _get_token(sample_token.token)

View File

@@ -1,7 +1,7 @@
import json
from flask import url_for
from app.dao.services_dao import save_model_service
from app.models import (Service, Token, Template)
from app.models import (Service, ApiKey, Template)
from tests import create_authorization_header
from tests.app.conftest import sample_user as create_sample_user
@@ -70,7 +70,6 @@ def test_post_service(notify_api, notify_db, notify_db_session, sample_user, sam
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['name'] == service.name
assert json_resp['data']['limit'] == service.limit
assert json_resp['token'] is not None
def test_post_service_multiple_users(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
@@ -311,76 +310,84 @@ def test_delete_service_not_exists(notify_api, notify_db, notify_db_session, sam
assert Service.query.count() == 2
def test_renew_token_should_return_token_when_service_does_not_have_a_valid_token(notify_api, notify_db,
notify_db_session,
sample_service,
sample_admin_service_id):
def test_renew_api_key_should_create_new_api_key_for_service(notify_api, notify_db,
notify_db_session,
sample_service,
sample_admin_service_id):
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = {'name': 'some secret name'}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('service.renew_token', service_id=sample_service.id),
method='POST')
response = client.post(url_for('service.renew_token', service_id=sample_service.id),
path=url_for('service.renew_api_key',
service_id=sample_service.id),
method='POST',
request_body=json.dumps(data))
response = client.post(url_for('service.renew_api_key', service_id=sample_service.id),
data=json.dumps(data),
headers=[('Content-Type', 'application/json'), auth_header])
assert response.status_code == 201
assert response.get_data is not None
saved_token = Token.query.filter_by(service_id=sample_service.id).first()
assert saved_token.service_id == sample_service.id
saved_api_key = ApiKey.query.filter_by(service_id=sample_service.id).first()
assert saved_api_key.service_id == sample_service.id
assert saved_api_key.name == 'some secret name'
def test_renew_token_should_expire_the_old_token_and_create_a_new_token(notify_api, notify_db, notify_db_session,
sample_token, sample_admin_service_id):
def test_renew_api_key_should_expire_the_old_api_key_and_create_a_new_api_key(notify_api, notify_db, notify_db_session,
sample_api_key, sample_admin_service_id):
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert Token.query.count() == 2
assert ApiKey.query.count() == 2
data = {'name': 'some secret name'}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('service.renew_token',
service_id=sample_token.service_id),
method='POST')
path=url_for('service.renew_api_key',
service_id=sample_api_key.service_id),
method='POST',
request_body=json.dumps(data))
response = client.post(url_for('service.renew_token', service_id=sample_token.service_id),
response = client.post(url_for('service.renew_api_key', service_id=sample_api_key.service_id),
data=json.dumps(data),
headers=[('Content-Type', 'application/json'), auth_header])
assert response.status_code == 201
assert Token.query.count() == 3
all_tokens = Token.query.filter_by(service_id=sample_token.service_id).all()
for x in all_tokens:
if x.id == sample_token.id:
assert ApiKey.query.count() == 3
all_api_keys = ApiKey.query.filter_by(service_id=sample_api_key.service_id).all()
for x in all_api_keys:
if x.id == sample_api_key.id:
assert x.expiry_date is not None
else:
assert x.expiry_date is None
assert x.token is not sample_token.token
assert x.secret is not sample_api_key.secret
def test_create_token_should_return_error_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service, sample_admin_service_id):
def test_renew_api_key_should_return_error_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service, sample_admin_service_id):
with notify_api.test_request_context():
with notify_api.test_client() as client:
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('service.renew_token', service_id="123"),
path=url_for('service.renew_api_key', service_id="123"),
method='POST')
response = client.post(url_for('service.renew_token', service_id=123),
response = client.post(url_for('service.renew_api_key', service_id=123),
headers=[('Content-Type', 'application/json'), auth_header])
assert response.status_code == 404
def test_revoke_token_should_expire_token_for_service(notify_api, notify_db, notify_db_session,
sample_token, sample_admin_service_id):
def test_revoke_api_key_should_expire_api_key_for_service(notify_api, notify_db, notify_db_session,
sample_api_key, sample_admin_service_id):
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert Token.query.count() == 2
assert ApiKey.query.count() == 2
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('service.revoke_token',
service_id=sample_token.service_id),
path=url_for('service.revoke_api_key',
service_id=sample_api_key.service_id),
method='POST')
response = client.post(url_for('service.revoke_token', service_id=sample_token.service_id),
response = client.post(url_for('service.revoke_api_key', service_id=sample_api_key.service_id),
headers=[auth_header])
assert response.status_code == 202
tokens_for_service = Token.query.filter_by(service_id=sample_token.service_id).first()
assert tokens_for_service.expiry_date is not None
api_keys_for_service = ApiKey.query.filter_by(service_id=sample_api_key.service_id).first()
assert api_keys_for_service.expiry_date is not None
def test_create_service_should_create_new_token_for_service(notify_api, notify_db, notify_db_session, sample_user,
sample_admin_service_id):
def test_create_service_should_create_new_service_for_user(notify_api, notify_db, notify_db_session, sample_user,
sample_admin_service_id):
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = {
@@ -394,12 +401,10 @@ def test_create_service_should_create_new_token_for_service(notify_api, notify_d
method='POST',
request_body=json.dumps(data))
headers = [('Content-Type', 'application/json'), auth_header]
assert Token.query.count() == 1
resp = client.post(url_for('service.create_service'),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 201
assert Token.query.count() == 2
def test_create_template(notify_api, notify_db, notify_db_session, sample_service, sample_admin_service_id):