mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-04 10:21:14 -05:00
Merge pull request #255 from alphagov/api-keys-history
Added version history to api keys.
This commit is contained in:
@@ -1,19 +1,29 @@
|
|||||||
|
import uuid
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
from itsdangerous import URLSafeSerializer
|
from itsdangerous import URLSafeSerializer
|
||||||
|
|
||||||
from app import db
|
from app import db
|
||||||
from app.models import ApiKey
|
from app.models import ApiKey
|
||||||
|
|
||||||
|
from app.dao.dao_utils import (
|
||||||
|
transactional,
|
||||||
|
versioned
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@transactional
|
||||||
|
@versioned
|
||||||
def save_model_api_key(api_key, update_dict={}):
|
def save_model_api_key(api_key, update_dict={}):
|
||||||
if update_dict:
|
if update_dict:
|
||||||
if update_dict['id']:
|
update_dict.pop('id', None)
|
||||||
del update_dict['id']
|
for key, value in update_dict.items():
|
||||||
db.session.query(ApiKey).filter_by(id=api_key.id).update(update_dict)
|
setattr(api_key, key, value)
|
||||||
|
db.session.add(api_key)
|
||||||
else:
|
else:
|
||||||
|
if not api_key.id:
|
||||||
|
api_key.id = uuid.uuid4() # must be set now so version history model can use same id
|
||||||
api_key.secret = _generate_secret()
|
api_key.secret = _generate_secret()
|
||||||
db.session.add(api_key)
|
db.session.add(api_key)
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
|
|
||||||
def get_model_api_keys(service_id, id=None):
|
def get_model_api_keys(service_id, id=None):
|
||||||
|
|||||||
@@ -1,22 +1,6 @@
|
|||||||
import datetime
|
import itertools
|
||||||
|
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
from app.history_meta import versioned_objects, create_history
|
||||||
|
|
||||||
def create_history(obj):
|
|
||||||
history_mapper = obj.__history_mapper__
|
|
||||||
history_model = history_mapper.class_
|
|
||||||
history = history_model()
|
|
||||||
if obj.version:
|
|
||||||
obj.version += 1
|
|
||||||
else:
|
|
||||||
obj.version = 1
|
|
||||||
obj.created_at = datetime.datetime.now()
|
|
||||||
for prop in history_mapper.iterate_properties:
|
|
||||||
if obj.__mapper__.get_property(prop.key):
|
|
||||||
setattr(history, prop.key, getattr(obj, prop.key))
|
|
||||||
history.created_by_id = obj.created_by.id
|
|
||||||
return history
|
|
||||||
|
|
||||||
|
|
||||||
def transactional(func):
|
def transactional(func):
|
||||||
@@ -37,12 +21,10 @@ def transactional(func):
|
|||||||
def versioned(func):
|
def versioned(func):
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
def record_version(*args, **kwargs):
|
def record_version(*args, **kwargs):
|
||||||
import itertools
|
|
||||||
from app import db
|
from app import db
|
||||||
from app.history_meta import versioned_objects
|
|
||||||
from app.dao.dao_utils import create_history
|
|
||||||
func(*args, **kwargs)
|
func(*args, **kwargs)
|
||||||
for obj in versioned_objects(itertools.chain(db.session.new, db.session.dirty)):
|
history_objects = [create_history(obj) for obj in
|
||||||
history = create_history(obj)
|
versioned_objects(itertools.chain(db.session.new, db.session.dirty))]
|
||||||
db.session.add(history)
|
for h_obj in history_objects:
|
||||||
|
db.session.add(h_obj)
|
||||||
return record_version
|
return record_version
|
||||||
|
|||||||
@@ -14,10 +14,10 @@ Lastly when to create a version is done manually in dao_utils version decorator
|
|||||||
session events.
|
session events.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
import datetime
|
||||||
|
|
||||||
from sqlalchemy.ext.declarative import declared_attr
|
from sqlalchemy.ext.declarative import declared_attr
|
||||||
from sqlalchemy.orm import mapper
|
from sqlalchemy.orm import mapper, attributes, object_mapper
|
||||||
|
from sqlalchemy.orm.properties import RelationshipProperty, ColumnProperty
|
||||||
from sqlalchemy import Table, Column, ForeignKeyConstraint, Integer
|
from sqlalchemy import Table, Column, ForeignKeyConstraint, Integer
|
||||||
from sqlalchemy import util
|
from sqlalchemy import util
|
||||||
|
|
||||||
@@ -160,8 +160,60 @@ class Versioned(object):
|
|||||||
return mp
|
return mp
|
||||||
return map
|
return map
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_history_model(cls):
|
||||||
|
history_mapper = cls.__history_mapper__
|
||||||
|
return history_mapper.class_
|
||||||
|
|
||||||
|
|
||||||
def versioned_objects(iter):
|
def versioned_objects(iter):
|
||||||
for obj in iter:
|
for obj in iter:
|
||||||
if hasattr(obj, '__history_mapper__'):
|
if hasattr(obj, '__history_mapper__'):
|
||||||
yield obj
|
yield obj
|
||||||
|
|
||||||
|
|
||||||
|
def create_history(obj):
|
||||||
|
obj_mapper = object_mapper(obj)
|
||||||
|
history_mapper = obj.__history_mapper__
|
||||||
|
history_cls = history_mapper.class_
|
||||||
|
history = history_cls()
|
||||||
|
|
||||||
|
obj_state = attributes.instance_state(obj)
|
||||||
|
data = {}
|
||||||
|
|
||||||
|
for prop in obj_mapper.iterate_properties:
|
||||||
|
|
||||||
|
# expired object attributes and also deferred cols might not
|
||||||
|
# be in the dict. force it them load no matter what by using getattr().
|
||||||
|
if prop.key not in obj_state.dict:
|
||||||
|
getattr(obj, prop.key)
|
||||||
|
|
||||||
|
# if prop is a normal col just set it on history model
|
||||||
|
if isinstance(prop, ColumnProperty):
|
||||||
|
if not data.get(prop.key):
|
||||||
|
data[prop.key] = getattr(obj, prop.key)
|
||||||
|
|
||||||
|
# if the prop is a relationship property and there is a
|
||||||
|
# corresponding prop on hist object then set the
|
||||||
|
# relevant "_id" prop to the id of the current object.prop.id.
|
||||||
|
# This is so foreign keys get set on history when
|
||||||
|
# the source object is new and therefore property foo_id does
|
||||||
|
# not yet have a value before insert
|
||||||
|
|
||||||
|
elif isinstance(prop, RelationshipProperty):
|
||||||
|
if hasattr(history, prop.key+'_id'):
|
||||||
|
data[prop.key+'_id'] = getattr(obj, prop.key).id
|
||||||
|
|
||||||
|
if not obj.version:
|
||||||
|
obj.version = 1
|
||||||
|
obj.created_at = datetime.datetime.now()
|
||||||
|
else:
|
||||||
|
obj.version += 1
|
||||||
|
|
||||||
|
data['version'] = obj.version
|
||||||
|
data['created_at'] = obj.created_at
|
||||||
|
|
||||||
|
for key, value in data.items():
|
||||||
|
setattr(history, key, value)
|
||||||
|
|
||||||
|
return history
|
||||||
|
|||||||
@@ -96,13 +96,8 @@ class Service(db.Model, Versioned):
|
|||||||
created_by = db.relationship('User')
|
created_by = db.relationship('User')
|
||||||
created_by_id = db.Column(UUID(as_uuid=True), db.ForeignKey('users.id'), index=True, nullable=False)
|
created_by_id = db.Column(UUID(as_uuid=True), db.ForeignKey('users.id'), index=True, nullable=False)
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_history_model(cls):
|
|
||||||
history_mapper = cls.__history_mapper__
|
|
||||||
return history_mapper.class_
|
|
||||||
|
|
||||||
|
class ApiKey(db.Model, Versioned):
|
||||||
class ApiKey(db.Model):
|
|
||||||
__tablename__ = 'api_keys'
|
__tablename__ = 'api_keys'
|
||||||
|
|
||||||
id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||||||
@@ -111,6 +106,20 @@ class ApiKey(db.Model):
|
|||||||
service_id = db.Column(UUID(as_uuid=True), db.ForeignKey('services.id'), index=True, nullable=False)
|
service_id = db.Column(UUID(as_uuid=True), db.ForeignKey('services.id'), index=True, nullable=False)
|
||||||
service = db.relationship('Service', backref=db.backref('api_keys', lazy='dynamic'))
|
service = db.relationship('Service', backref=db.backref('api_keys', lazy='dynamic'))
|
||||||
expiry_date = db.Column(db.DateTime)
|
expiry_date = db.Column(db.DateTime)
|
||||||
|
created_at = db.Column(
|
||||||
|
db.DateTime,
|
||||||
|
index=False,
|
||||||
|
unique=False,
|
||||||
|
nullable=False,
|
||||||
|
default=datetime.datetime.now)
|
||||||
|
updated_at = db.Column(
|
||||||
|
db.DateTime,
|
||||||
|
index=False,
|
||||||
|
unique=False,
|
||||||
|
nullable=True,
|
||||||
|
onupdate=datetime.datetime.now)
|
||||||
|
created_by = db.relationship('User')
|
||||||
|
created_by_id = db.Column(UUID(as_uuid=True), db.ForeignKey('users.id'), index=True, nullable=False)
|
||||||
|
|
||||||
__table_args__ = (
|
__table_args__ = (
|
||||||
UniqueConstraint('service_id', 'name', name='uix_service_to_key_name'),
|
UniqueConstraint('service_id', 'name', name='uix_service_to_key_name'),
|
||||||
|
|||||||
@@ -97,21 +97,21 @@ def update_service(service_id):
|
|||||||
def renew_api_key(service_id=None):
|
def renew_api_key(service_id=None):
|
||||||
fetched_service = dao_fetch_service_by_id(service_id=service_id)
|
fetched_service = dao_fetch_service_by_id(service_id=service_id)
|
||||||
|
|
||||||
# create a new one
|
valid_api_key, errors = api_key_schema.load(request.get_json())
|
||||||
# TODO: what validation should be done here?
|
if errors:
|
||||||
secret_name = request.get_json()['name']
|
return jsonify(result="error", message=errors), 400
|
||||||
key = ApiKey(service=fetched_service, name=secret_name)
|
valid_api_key.service = fetched_service
|
||||||
save_model_api_key(key)
|
|
||||||
|
|
||||||
unsigned_api_key = get_unsigned_secret(key.id)
|
save_model_api_key(valid_api_key)
|
||||||
|
|
||||||
|
unsigned_api_key = get_unsigned_secret(valid_api_key.id)
|
||||||
return jsonify(data=unsigned_api_key), 201
|
return jsonify(data=unsigned_api_key), 201
|
||||||
|
|
||||||
|
|
||||||
@service.route('/<uuid:service_id>/api-key/revoke/<uuid:api_key_id>', methods=['POST'])
|
@service.route('/<uuid:service_id>/api-key/revoke/<uuid:api_key_id>', methods=['POST'])
|
||||||
def revoke_api_key(service_id, api_key_id):
|
def revoke_api_key(service_id, api_key_id):
|
||||||
service_api_key = get_model_api_keys(service_id=service_id, id=api_key_id)
|
service_api_key = get_model_api_keys(service_id=service_id, id=api_key_id)
|
||||||
|
save_model_api_key(service_api_key, update_dict={'expiry_date': datetime.utcnow()})
|
||||||
save_model_api_key(service_api_key, update_dict={'id': service_api_key.id, 'expiry_date': datetime.utcnow()})
|
|
||||||
return jsonify(), 202
|
return jsonify(), 202
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
64
migrations/versions/0006_api_keys_history.py
Normal file
64
migrations/versions/0006_api_keys_history.py
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
"""empty message
|
||||||
|
|
||||||
|
Revision ID: 0006_api_keys_history
|
||||||
|
Revises: 0005_add_provider_stats
|
||||||
|
Create Date: 2016-04-20 17:21:38.541766
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '0006_api_keys_history'
|
||||||
|
down_revision = '0005_add_provider_stats'
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from sqlalchemy.dialects import postgresql
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.create_table('api_keys_history',
|
||||||
|
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
|
||||||
|
sa.Column('name', sa.String(length=255), nullable=False),
|
||||||
|
sa.Column('secret', sa.String(length=255), nullable=False),
|
||||||
|
sa.Column('service_id', postgresql.UUID(as_uuid=True), nullable=False),
|
||||||
|
sa.Column('expiry_date', sa.DateTime(), nullable=True),
|
||||||
|
sa.Column('created_at', sa.DateTime(), nullable=True),
|
||||||
|
sa.Column('updated_at', sa.DateTime(), nullable=True),
|
||||||
|
sa.Column('created_by_id', postgresql.UUID(as_uuid=True), nullable=True),
|
||||||
|
sa.Column('version', sa.Integer(), autoincrement=False, nullable=False),
|
||||||
|
sa.PrimaryKeyConstraint('id', 'version')
|
||||||
|
)
|
||||||
|
|
||||||
|
op.create_index(op.f('ix_api_keys_history_created_by_id'), 'api_keys_history', ['created_by_id'], unique=False)
|
||||||
|
op.create_index(op.f('ix_api_keys_history_service_id'), 'api_keys_history', ['service_id'], unique=False)
|
||||||
|
op.add_column('api_keys', sa.Column('created_at', sa.DateTime(), nullable=True))
|
||||||
|
op.add_column('api_keys', sa.Column('created_by_id', postgresql.UUID(as_uuid=True), nullable=True))
|
||||||
|
op.add_column('api_keys', sa.Column('updated_at', sa.DateTime(), nullable=True))
|
||||||
|
op.add_column('api_keys', sa.Column('version', sa.Integer(), nullable=True))
|
||||||
|
|
||||||
|
op.get_bind()
|
||||||
|
op.execute('UPDATE api_keys SET created_by_id = (SELECT user_id FROM user_to_service WHERE api_keys.service_id = user_to_service.service_id LIMIT 1)')
|
||||||
|
op.execute('UPDATE api_keys SET version = 1, created_at = now()')
|
||||||
|
op.execute('INSERT INTO api_keys_history (id, name, secret, service_id, expiry_date, created_at, updated_at, created_by_id, version) SELECT id, name, secret, service_id, expiry_date, created_at, updated_at, created_by_id, version FROM api_keys')
|
||||||
|
|
||||||
|
op.alter_column('api_keys', 'created_at', nullable=False)
|
||||||
|
op.alter_column('api_keys', 'created_by_id', nullable=False)
|
||||||
|
op.alter_column('api_keys', 'version', nullable=False)
|
||||||
|
|
||||||
|
op.create_index(op.f('ix_api_keys_created_by_id'), 'api_keys', ['created_by_id'], unique=False)
|
||||||
|
op.create_foreign_key('fk_api_keys_created_by_id', 'api_keys', 'users', ['created_by_id'], ['id'])
|
||||||
|
### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.drop_constraint('fk_api_keys_created_by_id', 'api_keys', type_='foreignkey')
|
||||||
|
op.drop_index(op.f('ix_api_keys_created_by_id'), table_name='api_keys')
|
||||||
|
op.drop_column('api_keys', 'version')
|
||||||
|
op.drop_column('api_keys', 'updated_at')
|
||||||
|
op.drop_column('api_keys', 'created_by_id')
|
||||||
|
op.drop_column('api_keys', 'created_at')
|
||||||
|
op.drop_index(op.f('ix_api_keys_history_service_id'), table_name='api_keys_history')
|
||||||
|
op.drop_index(op.f('ix_api_keys_history_created_by_id'), table_name='api_keys_history')
|
||||||
|
op.drop_table('api_keys_history')
|
||||||
|
### end Alembic commands ###
|
||||||
@@ -105,7 +105,10 @@ def test_should_allow_valid_token_for_request_with_path_params(notify_api, sampl
|
|||||||
def test_should_allow_valid_token_when_service_has_multiple_keys(notify_api, sample_api_key):
|
def test_should_allow_valid_token_when_service_has_multiple_keys(notify_api, sample_api_key):
|
||||||
with notify_api.test_request_context():
|
with notify_api.test_request_context():
|
||||||
with notify_api.test_client() as client:
|
with notify_api.test_client() as client:
|
||||||
data = {'service_id': sample_api_key.service_id, 'name': 'some key name'}
|
data = {'service': sample_api_key.service,
|
||||||
|
'name': 'some key name',
|
||||||
|
'created_by': sample_api_key.created_by
|
||||||
|
}
|
||||||
api_key = ApiKey(**data)
|
api_key = ApiKey(**data)
|
||||||
save_model_api_key(api_key)
|
save_model_api_key(api_key)
|
||||||
token = __create_get_token(sample_api_key.service_id)
|
token = __create_get_token(sample_api_key.service_id)
|
||||||
@@ -185,11 +188,17 @@ def test_authentication_passes_when_service_has_multiple_keys_some_expired(
|
|||||||
sample_api_key):
|
sample_api_key):
|
||||||
with notify_api.test_request_context():
|
with notify_api.test_request_context():
|
||||||
with notify_api.test_client() as client:
|
with notify_api.test_client() as client:
|
||||||
exprired_key = {'service_id': sample_api_key.service_id, 'name': 'expired_key',
|
expired_key_data = {'service': sample_api_key.service,
|
||||||
'expiry_date': datetime.now()}
|
'name': 'expired_key',
|
||||||
expired_api_key = ApiKey(**exprired_key)
|
'expiry_date': datetime.now(),
|
||||||
save_model_api_key(expired_api_key)
|
'created_by': sample_api_key.created_by
|
||||||
another_key = {'service_id': sample_api_key.service_id, 'name': 'another_key'}
|
}
|
||||||
|
expired_key = ApiKey(**expired_key_data)
|
||||||
|
save_model_api_key(expired_key)
|
||||||
|
another_key = {'service': sample_api_key.service,
|
||||||
|
'name': 'another_key',
|
||||||
|
'created_by': sample_api_key.created_by
|
||||||
|
}
|
||||||
api_key = ApiKey(**another_key)
|
api_key = ApiKey(**another_key)
|
||||||
save_model_api_key(api_key)
|
save_model_api_key(api_key)
|
||||||
token = create_jwt_token(
|
token = create_jwt_token(
|
||||||
@@ -209,10 +218,16 @@ def test_authentication_returns_token_expired_when_service_uses_expired_key_and_
|
|||||||
sample_api_key):
|
sample_api_key):
|
||||||
with notify_api.test_request_context():
|
with notify_api.test_request_context():
|
||||||
with notify_api.test_client() as client:
|
with notify_api.test_client() as client:
|
||||||
expired_key = {'service_id': sample_api_key.service_id, 'name': 'expired_key'}
|
expired_key = {'service': sample_api_key.service,
|
||||||
|
'name': 'expired_key',
|
||||||
|
'created_by': sample_api_key.created_by
|
||||||
|
}
|
||||||
expired_api_key = ApiKey(**expired_key)
|
expired_api_key = ApiKey(**expired_key)
|
||||||
save_model_api_key(expired_api_key)
|
save_model_api_key(expired_api_key)
|
||||||
another_key = {'service_id': sample_api_key.service_id, 'name': 'another_key'}
|
another_key = {'service': sample_api_key.service,
|
||||||
|
'name': 'another_key',
|
||||||
|
'created_by': sample_api_key.created_by
|
||||||
|
}
|
||||||
api_key = ApiKey(**another_key)
|
api_key = ApiKey(**another_key)
|
||||||
save_model_api_key(api_key)
|
save_model_api_key(api_key)
|
||||||
token = create_jwt_token(
|
token = create_jwt_token(
|
||||||
@@ -222,9 +237,10 @@ def test_authentication_returns_token_expired_when_service_uses_expired_key_and_
|
|||||||
client_id=str(sample_api_key.service_id))
|
client_id=str(sample_api_key.service_id))
|
||||||
# expire the key
|
# expire the key
|
||||||
expire_the_key = {'id': expired_api_key.id,
|
expire_the_key = {'id': expired_api_key.id,
|
||||||
'service_id': str(sample_api_key.service_id),
|
'service': sample_api_key.service,
|
||||||
'name': 'expired_key',
|
'name': 'expired_key',
|
||||||
'expiry_date': datetime.now() + timedelta(hours=-2)}
|
'expiry_date': datetime.now() + timedelta(hours=-2),
|
||||||
|
'created_by': sample_api_key.created_by}
|
||||||
save_model_api_key(expired_api_key, expire_the_key)
|
save_model_api_key(expired_api_key, expire_the_key)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
'/service',
|
'/service',
|
||||||
|
|||||||
@@ -201,7 +201,7 @@ def sample_api_key(notify_db,
|
|||||||
service=None):
|
service=None):
|
||||||
if service is None:
|
if service is None:
|
||||||
service = sample_service(notify_db, notify_db_session)
|
service = sample_service(notify_db, notify_db_session)
|
||||||
data = {'service_id': service.id, 'name': uuid.uuid4()}
|
data = {'service': service, 'name': uuid.uuid4(), 'created_by': service.created_by}
|
||||||
api_key = ApiKey(**data)
|
api_key = ApiKey(**data)
|
||||||
save_model_api_key(api_key)
|
save_model_api_key(api_key)
|
||||||
return api_key
|
return api_key
|
||||||
|
|||||||
@@ -21,16 +21,27 @@ def test_secret_is_signed_and_can_be_read_again(notify_api):
|
|||||||
assert signed_secret != token
|
assert signed_secret != token
|
||||||
|
|
||||||
|
|
||||||
def test_save_api_key_should_create_new_api_key(notify_api, notify_db, notify_db_session, sample_service):
|
def test_save_api_key_should_create_new_api_key_and_history(notify_api, notify_db, notify_db_session, sample_service):
|
||||||
api_key = ApiKey(**{'service_id': sample_service.id, 'name': sample_service.name})
|
api_key = ApiKey(**{'service': sample_service,
|
||||||
|
'name': sample_service.name,
|
||||||
|
'created_by': sample_service.created_by})
|
||||||
save_model_api_key(api_key)
|
save_model_api_key(api_key)
|
||||||
|
|
||||||
all_api_keys = get_model_api_keys(service_id=sample_service.id)
|
all_api_keys = get_model_api_keys(service_id=sample_service.id)
|
||||||
assert len(all_api_keys) == 1
|
assert len(all_api_keys) == 1
|
||||||
assert all_api_keys[0] == api_key
|
assert all_api_keys[0] == api_key
|
||||||
|
assert api_key.version == 1
|
||||||
|
|
||||||
|
all_history = api_key.get_history_model().query.all()
|
||||||
|
assert len(all_history) == 1
|
||||||
|
assert all_history[0].id == api_key.id
|
||||||
|
assert all_history[0].version == api_key.version
|
||||||
|
|
||||||
|
|
||||||
def test_save_api_key_should_update_the_api_key(notify_api, notify_db, notify_db_session, sample_api_key):
|
def test_expire_api_key_should_update_the_api_key_and_create_history_record(notify_api,
|
||||||
|
notify_db,
|
||||||
|
notify_db_session,
|
||||||
|
sample_api_key):
|
||||||
now = datetime.utcnow()
|
now = datetime.utcnow()
|
||||||
saved_api_key = get_model_api_keys(service_id=sample_api_key.service_id, id=sample_api_key.id)
|
saved_api_key = get_model_api_keys(service_id=sample_api_key.service_id, id=sample_api_key.id)
|
||||||
save_model_api_key(saved_api_key, update_dict={'id': saved_api_key.id, 'expiry_date': now})
|
save_model_api_key(saved_api_key, update_dict={'id': saved_api_key.id, 'expiry_date': now})
|
||||||
@@ -41,6 +52,14 @@ def test_save_api_key_should_update_the_api_key(notify_api, notify_db, notify_db
|
|||||||
assert all_api_keys[0].id == saved_api_key.id
|
assert all_api_keys[0].id == saved_api_key.id
|
||||||
assert all_api_keys[0].service_id == saved_api_key.service_id
|
assert all_api_keys[0].service_id == saved_api_key.service_id
|
||||||
|
|
||||||
|
all_history = saved_api_key.get_history_model().query.all()
|
||||||
|
assert len(all_history) == 2
|
||||||
|
assert all_history[0].id == saved_api_key.id
|
||||||
|
assert all_history[1].id == saved_api_key.id
|
||||||
|
sorted_all_history = sorted(all_history, key=lambda hist: hist.version)
|
||||||
|
sorted_all_history[0].version = 1
|
||||||
|
sorted_all_history[1].version = 2
|
||||||
|
|
||||||
|
|
||||||
def test_get_api_key_should_raise_exception_when_api_key_does_not_exist(notify_api,
|
def test_get_api_key_should_raise_exception_when_api_key_does_not_exist(notify_api,
|
||||||
notify_db,
|
notify_db,
|
||||||
@@ -83,8 +102,10 @@ def test_should_not_allow_duplicate_key_names_per_service(notify_api,
|
|||||||
notify_db_session,
|
notify_db_session,
|
||||||
sample_api_key,
|
sample_api_key,
|
||||||
fake_uuid):
|
fake_uuid):
|
||||||
api_key = ApiKey(
|
api_key = ApiKey(**{'id': fake_uuid,
|
||||||
**{'id': fake_uuid, 'service_id': sample_api_key.service_id, 'name': sample_api_key.name})
|
'service': sample_api_key.service,
|
||||||
|
'name': sample_api_key.name,
|
||||||
|
'created_by': sample_api_key.created_by})
|
||||||
try:
|
try:
|
||||||
save_model_api_key(api_key)
|
save_model_api_key(api_key)
|
||||||
fail("should throw IntegrityError")
|
fail("should throw IntegrityError")
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ def test_api_key_should_create_new_api_key_for_service(notify_api, notify_db,
|
|||||||
sample_service):
|
sample_service):
|
||||||
with notify_api.test_request_context():
|
with notify_api.test_request_context():
|
||||||
with notify_api.test_client() as client:
|
with notify_api.test_client() as client:
|
||||||
data = {'name': 'some secret name'}
|
data = {'name': 'some secret name', 'created_by': str(sample_service.created_by.id)}
|
||||||
auth_header = create_authorization_header(path=url_for('service.renew_api_key',
|
auth_header = create_authorization_header(path=url_for('service.renew_api_key',
|
||||||
service_id=sample_service.id),
|
service_id=sample_service.id),
|
||||||
method='POST',
|
method='POST',
|
||||||
@@ -68,7 +68,7 @@ def test_api_key_should_create_multiple_new_api_key_for_service(notify_api, noti
|
|||||||
with notify_api.test_request_context():
|
with notify_api.test_request_context():
|
||||||
with notify_api.test_client() as client:
|
with notify_api.test_client() as client:
|
||||||
assert ApiKey.query.count() == 0
|
assert ApiKey.query.count() == 0
|
||||||
data = {'name': 'some secret name'}
|
data = {'name': 'some secret name', 'created_by': str(sample_service.created_by.id)}
|
||||||
auth_header = create_authorization_header(path=url_for('service.renew_api_key',
|
auth_header = create_authorization_header(path=url_for('service.renew_api_key',
|
||||||
service_id=sample_service.id),
|
service_id=sample_service.id),
|
||||||
method='POST',
|
method='POST',
|
||||||
@@ -78,7 +78,7 @@ def test_api_key_should_create_multiple_new_api_key_for_service(notify_api, noti
|
|||||||
headers=[('Content-Type', 'application/json'), auth_header])
|
headers=[('Content-Type', 'application/json'), auth_header])
|
||||||
assert response.status_code == 201
|
assert response.status_code == 201
|
||||||
assert ApiKey.query.count() == 1
|
assert ApiKey.query.count() == 1
|
||||||
data = {'name': 'another secret name'}
|
data = {'name': 'another secret name', 'created_by': str(sample_service.created_by.id)}
|
||||||
auth_header = create_authorization_header(path=url_for('service.renew_api_key',
|
auth_header = create_authorization_header(path=url_for('service.renew_api_key',
|
||||||
service_id=sample_service.id),
|
service_id=sample_service.id),
|
||||||
method='POST',
|
method='POST',
|
||||||
@@ -97,14 +97,17 @@ def test_get_api_keys_should_return_all_keys_for_service(notify_api, notify_db,
|
|||||||
with notify_api.test_request_context():
|
with notify_api.test_request_context():
|
||||||
with notify_api.test_client() as client:
|
with notify_api.test_client() as client:
|
||||||
another_user = create_user(notify_db, notify_db_session, email='another@it.gov.uk')
|
another_user = create_user(notify_db, notify_db_session, email='another@it.gov.uk')
|
||||||
|
|
||||||
another_service = create_sample_service(notify_db, notify_db_session, service_name='another',
|
another_service = create_sample_service(notify_db, notify_db_session, service_name='another',
|
||||||
user=another_user, email_from='another')
|
user=another_user, email_from='another')
|
||||||
|
# key for another service
|
||||||
create_sample_api_key(notify_db, notify_db_session, service=another_service)
|
create_sample_api_key(notify_db, notify_db_session, service=another_service)
|
||||||
api_key2 = ApiKey(**{'service_id': sample_api_key.service_id, 'name': 'second_api_key'})
|
|
||||||
api_key3 = ApiKey(**{'service_id': sample_api_key.service_id, 'name': 'third_api_key',
|
# this service already has one key, add two more, one expired
|
||||||
'expiry_date': datetime.utcnow() + timedelta(hours=-1)})
|
create_sample_api_key(notify_db, notify_db_session, service=sample_api_key.service)
|
||||||
save_model_api_key(api_key2)
|
one_to_expire = create_sample_api_key(notify_db, notify_db_session, service=sample_api_key.service)
|
||||||
save_model_api_key(api_key3)
|
save_model_api_key(one_to_expire, update_dict={'expiry_date': datetime.utcnow()})
|
||||||
|
|
||||||
assert ApiKey.query.count() == 4
|
assert ApiKey.query.count() == 4
|
||||||
|
|
||||||
auth_header = create_authorization_header(path=url_for('service.get_api_keys',
|
auth_header = create_authorization_header(path=url_for('service.get_api_keys',
|
||||||
|
|||||||
Reference in New Issue
Block a user