mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-21 07:51:13 -05:00
Update api-key/revoke endpoint to expire the key for the service.
Previously we assumed there was only one api key that was valid.
This commit is contained in:
@@ -16,18 +16,10 @@ def save_model_api_key(api_key, update_dict={}):
|
|||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
def get_model_api_keys(service_id=None, raise_=True):
|
def get_model_api_keys(service_id, id=None):
|
||||||
"""
|
if id:
|
||||||
:param raise_: when True query api_keys using one() which will raise NoResultFound exception
|
return ApiKey.query.filter_by(id=id, service_id=service_id, expiry_date=None).one()
|
||||||
when False query api_keys usong first() which will return None and not raise an exception.
|
return ApiKey.query.filter_by(service_id=service_id).all()
|
||||||
"""
|
|
||||||
if service_id:
|
|
||||||
# If expiry date is None the api_key is active
|
|
||||||
if raise_:
|
|
||||||
return ApiKey.query.filter_by(service_id=service_id, expiry_date=None).one()
|
|
||||||
else:
|
|
||||||
return ApiKey.query.filter_by(service_id=service_id, expiry_date=None).first()
|
|
||||||
return ApiKey.query.filter_by().all()
|
|
||||||
|
|
||||||
|
|
||||||
def get_unsigned_secrets(service_id):
|
def get_unsigned_secrets(service_id):
|
||||||
|
|||||||
@@ -96,18 +96,16 @@ def renew_api_key(service_id=None):
|
|||||||
return jsonify(data=unsigned_api_key), 201
|
return jsonify(data=unsigned_api_key), 201
|
||||||
|
|
||||||
|
|
||||||
@service.route('/<int:service_id>/api-key/revoke', methods=['POST'])
|
@service.route('/<int:service_id>/api-key/revoke/<int:api_key_id>', methods=['POST'])
|
||||||
def revoke_api_key(service_id):
|
def revoke_api_key(service_id, api_key_id):
|
||||||
try:
|
try:
|
||||||
get_model_services(service_id=service_id)
|
service_api_key = get_model_api_keys(service_id=service_id, id=api_key_id)
|
||||||
except DataError:
|
except DataError:
|
||||||
return jsonify(result="error", message="Invalid service id"), 400
|
return jsonify(result="error", message="Invalid api key for service"), 400
|
||||||
except NoResultFound:
|
except NoResultFound:
|
||||||
return jsonify(result="error", message="Service not found"), 404
|
return jsonify(result="error", message="Api key not found for service"), 404
|
||||||
|
|
||||||
service_api_key = get_model_api_keys(service_id=service_id, raise_=False)
|
save_model_api_key(service_api_key, update_dict={'id': service_api_key.id, 'expiry_date': datetime.utcnow()})
|
||||||
if service_api_key:
|
|
||||||
save_model_api_key(service_api_key, update_dict={'id': service_api_key.id, 'expiry_date': datetime.now()})
|
|
||||||
return jsonify(), 202
|
return jsonify(), 202
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -25,16 +25,16 @@ def test_save_api_key_should_create_new_api_key(notify_api, notify_db, notify_db
|
|||||||
api_key = ApiKey(**{'service_id': sample_service.id, 'name': sample_service.name})
|
api_key = ApiKey(**{'service_id': sample_service.id, 'name': sample_service.name})
|
||||||
save_model_api_key(api_key)
|
save_model_api_key(api_key)
|
||||||
|
|
||||||
all_api_keys = get_model_api_keys()
|
all_api_keys = get_model_api_keys(service_id=sample_service.id)
|
||||||
assert len(all_api_keys) == 1
|
assert len(all_api_keys) == 1
|
||||||
assert all_api_keys[0] == api_key
|
assert all_api_keys[0] == api_key
|
||||||
|
|
||||||
|
|
||||||
def test_save_api_key_should_update_the_api_key(notify_api, notify_db, notify_db_session, sample_api_key):
|
def test_save_api_key_should_update_the_api_key(notify_api, notify_db, notify_db_session, sample_api_key):
|
||||||
now = datetime.utcnow()
|
now = datetime.utcnow()
|
||||||
saved_api_key = get_model_api_keys(sample_api_key.service_id)
|
saved_api_key = get_model_api_keys(service_id=sample_api_key.service_id, id=sample_api_key.id)
|
||||||
save_model_api_key(saved_api_key, update_dict={'id': saved_api_key.id, 'expiry_date': now})
|
save_model_api_key(saved_api_key, update_dict={'id': saved_api_key.id, 'expiry_date': now})
|
||||||
all_api_keys = get_model_api_keys()
|
all_api_keys = get_model_api_keys(service_id=sample_api_key.service_id)
|
||||||
assert len(all_api_keys) == 1
|
assert len(all_api_keys) == 1
|
||||||
assert all_api_keys[0].expiry_date == now
|
assert all_api_keys[0].expiry_date == now
|
||||||
assert all_api_keys[0].secret == saved_api_key.secret
|
assert all_api_keys[0].secret == saved_api_key.secret
|
||||||
@@ -42,22 +42,17 @@ def test_save_api_key_should_update_the_api_key(notify_api, notify_db, notify_db
|
|||||||
assert all_api_keys[0].service_id == saved_api_key.service_id
|
assert all_api_keys[0].service_id == saved_api_key.service_id
|
||||||
|
|
||||||
|
|
||||||
def test_get_api_key_should_raise_exception_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
|
def test_get_api_key_should_raise_exception_when_api_key_does_not_exist(notify_api, notify_db, notify_db_session,
|
||||||
sample_service):
|
sample_service):
|
||||||
try:
|
try:
|
||||||
get_model_api_keys(sample_service.id)
|
get_model_api_keys(sample_service.id, id=123)
|
||||||
fail("Should have thrown a NoResultFound exception")
|
fail("Should have thrown a NoResultFound exception")
|
||||||
except NoResultFound:
|
except NoResultFound:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def test_get_api_key_should_return_none_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
|
|
||||||
sample_service):
|
|
||||||
assert get_model_api_keys(service_id=sample_service.id, raise_=False) is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_should_return_api_key_for_service(notify_api, notify_db, notify_db_session, sample_api_key):
|
def test_should_return_api_key_for_service(notify_api, notify_db, notify_db_session, sample_api_key):
|
||||||
api_key = get_model_api_keys(sample_api_key.service_id)
|
api_key = get_model_api_keys(service_id=sample_api_key.service_id, id=sample_api_key.id)
|
||||||
assert api_key == sample_api_key
|
assert api_key == sample_api_key
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -43,12 +43,15 @@ def test_revoke_should_expire_api_key_for_service(notify_api, notify_db, notify_
|
|||||||
with notify_api.test_client() as client:
|
with notify_api.test_client() as client:
|
||||||
assert ApiKey.query.count() == 1
|
assert ApiKey.query.count() == 1
|
||||||
auth_header = create_authorization_header(path=url_for('service.revoke_api_key',
|
auth_header = create_authorization_header(path=url_for('service.revoke_api_key',
|
||||||
service_id=sample_api_key.service_id),
|
service_id=sample_api_key.service_id,
|
||||||
|
api_key_id=sample_api_key.id),
|
||||||
method='POST')
|
method='POST')
|
||||||
response = client.post(url_for('service.revoke_api_key', service_id=sample_api_key.service_id),
|
response = client.post(url_for('service.revoke_api_key',
|
||||||
|
service_id=sample_api_key.service_id,
|
||||||
|
api_key_id=sample_api_key.id),
|
||||||
headers=[auth_header])
|
headers=[auth_header])
|
||||||
assert response.status_code == 202
|
assert response.status_code == 202
|
||||||
api_keys_for_service = ApiKey.query.filter_by(service_id=sample_api_key.service_id).first()
|
api_keys_for_service = ApiKey.query.get(sample_api_key.id)
|
||||||
assert api_keys_for_service.expiry_date is not None
|
assert api_keys_for_service.expiry_date is not None
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user