mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-30 14:31:57 -05:00
Merge branch 'master' into notification-created-status
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import uuid
|
||||
from flask import current_app
|
||||
from notifications_python_client.authentication import create_jwt_token
|
||||
from app.models import ApiKey
|
||||
from app.models import ApiKey, KEY_TYPE_NORMAL
|
||||
from app.dao.api_key_dao import (get_unsigned_secrets, save_model_api_key)
|
||||
from app.dao.services_dao import dao_fetch_service_by_id
|
||||
|
||||
@@ -14,7 +14,12 @@ def create_authorization_header(service_id=None):
|
||||
secret = secrets[0]
|
||||
else:
|
||||
service = dao_fetch_service_by_id(service_id)
|
||||
data = {'service': service, 'name': uuid.uuid4(), 'created_by': service.created_by}
|
||||
data = {
|
||||
'service': service,
|
||||
'name': uuid.uuid4(),
|
||||
'created_by': service.created_by,
|
||||
'key_type': KEY_TYPE_NORMAL
|
||||
}
|
||||
api_key = ApiKey(**data)
|
||||
save_model_api_key(api_key)
|
||||
secret = get_unsigned_secrets(service_id)[0]
|
||||
|
||||
@@ -3,7 +3,7 @@ from datetime import datetime, timedelta
|
||||
from notifications_python_client.authentication import create_jwt_token
|
||||
from flask import json, current_app
|
||||
from app.dao.api_key_dao import get_unsigned_secrets, save_model_api_key, get_unsigned_secret, expire_api_key
|
||||
from app.models import ApiKey
|
||||
from app.models import ApiKey, KEY_TYPE_NORMAL
|
||||
|
||||
|
||||
def test_should_not_allow_request_with_no_token(notify_api):
|
||||
@@ -78,7 +78,8 @@ def test_should_allow_valid_token_when_service_has_multiple_keys(notify_api, sam
|
||||
with notify_api.test_client() as client:
|
||||
data = {'service': sample_api_key.service,
|
||||
'name': 'some key name',
|
||||
'created_by': sample_api_key.created_by
|
||||
'created_by': sample_api_key.created_by,
|
||||
'key_type': KEY_TYPE_NORMAL
|
||||
}
|
||||
api_key = ApiKey(**data)
|
||||
save_model_api_key(api_key)
|
||||
@@ -121,13 +122,15 @@ def test_authentication_passes_when_service_has_multiple_keys_some_expired(
|
||||
expired_key_data = {'service': sample_api_key.service,
|
||||
'name': 'expired_key',
|
||||
'expiry_date': datetime.utcnow(),
|
||||
'created_by': sample_api_key.created_by
|
||||
'created_by': sample_api_key.created_by,
|
||||
'key_type': KEY_TYPE_NORMAL
|
||||
}
|
||||
expired_key = ApiKey(**expired_key_data)
|
||||
save_model_api_key(expired_key)
|
||||
another_key = {'service': sample_api_key.service,
|
||||
'name': 'another_key',
|
||||
'created_by': sample_api_key.created_by
|
||||
'created_by': sample_api_key.created_by,
|
||||
'key_type': KEY_TYPE_NORMAL
|
||||
}
|
||||
api_key = ApiKey(**another_key)
|
||||
save_model_api_key(api_key)
|
||||
@@ -148,13 +151,15 @@ def test_authentication_returns_token_expired_when_service_uses_expired_key_and_
|
||||
with notify_api.test_client() as client:
|
||||
expired_key = {'service': sample_api_key.service,
|
||||
'name': 'expired_key',
|
||||
'created_by': sample_api_key.created_by
|
||||
'created_by': sample_api_key.created_by,
|
||||
'key_type': KEY_TYPE_NORMAL
|
||||
}
|
||||
expired_api_key = ApiKey(**expired_key)
|
||||
save_model_api_key(expired_api_key)
|
||||
another_key = {'service': sample_api_key.service,
|
||||
'name': 'another_key',
|
||||
'created_by': sample_api_key.created_by
|
||||
'created_by': sample_api_key.created_by,
|
||||
'key_type': KEY_TYPE_NORMAL
|
||||
}
|
||||
api_key = ApiKey(**another_key)
|
||||
save_model_api_key(api_key)
|
||||
|
||||
@@ -18,7 +18,8 @@ from app.models import (
|
||||
Permission,
|
||||
ProviderStatistics,
|
||||
ProviderDetails,
|
||||
NotificationStatistics)
|
||||
NotificationStatistics,
|
||||
KEY_TYPE_NORMAL)
|
||||
from app.dao.users_dao import (save_model_user, create_user_code, create_secret_code)
|
||||
from app.dao.services_dao import (dao_create_service, dao_add_user_to_service)
|
||||
from app.dao.templates_dao import dao_create_template
|
||||
@@ -229,10 +230,11 @@ def sample_email_template_with_placeholders(notify_db, notify_db_session):
|
||||
@pytest.fixture(scope='function')
|
||||
def sample_api_key(notify_db,
|
||||
notify_db_session,
|
||||
service=None):
|
||||
service=None,
|
||||
key_type=KEY_TYPE_NORMAL):
|
||||
if service is None:
|
||||
service = sample_service(notify_db, notify_db_session)
|
||||
data = {'service': service, 'name': uuid.uuid4(), 'created_by': service.created_by}
|
||||
data = {'service': service, 'name': uuid.uuid4(), 'created_by': service.created_by, 'key_type': key_type}
|
||||
api_key = ApiKey(**data)
|
||||
save_model_api_key(api_key)
|
||||
return api_key
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from datetime import datetime
|
||||
|
||||
from pytest import fail
|
||||
import pytest
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from app.dao.api_key_dao import (save_model_api_key,
|
||||
@@ -9,7 +10,7 @@ from app.dao.api_key_dao import (save_model_api_key,
|
||||
get_unsigned_secret,
|
||||
_generate_secret,
|
||||
_get_secret, expire_api_key)
|
||||
from app.models import ApiKey
|
||||
from app.models import ApiKey, KEY_TYPE_NORMAL
|
||||
|
||||
|
||||
def test_secret_is_signed_and_can_be_read_again(notify_api, mocker):
|
||||
@@ -20,13 +21,11 @@ def test_secret_is_signed_and_can_be_read_again(notify_api, mocker):
|
||||
assert signed_secret != 'some_uuid'
|
||||
|
||||
|
||||
def test_save_api_key_should_create_new_api_key_and_history(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service):
|
||||
def test_save_api_key_should_create_new_api_key_and_history(sample_service):
|
||||
api_key = ApiKey(**{'service': sample_service,
|
||||
'name': sample_service.name,
|
||||
'created_by': sample_service.created_by})
|
||||
'created_by': sample_service.created_by,
|
||||
'key_type': KEY_TYPE_NORMAL})
|
||||
save_model_api_key(api_key)
|
||||
|
||||
all_api_keys = get_model_api_keys(service_id=sample_service.id)
|
||||
@@ -41,8 +40,6 @@ def test_save_api_key_should_create_new_api_key_and_history(notify_api,
|
||||
|
||||
|
||||
def test_expire_api_key_should_update_the_api_key_and_create_history_record(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_api_key):
|
||||
expire_api_key(service_id=sample_api_key.service_id, api_key_id=sample_api_key.id)
|
||||
all_api_keys = get_model_api_keys(service_id=sample_api_key.service_id)
|
||||
@@ -61,16 +58,9 @@ def test_expire_api_key_should_update_the_api_key_and_create_history_record(noti
|
||||
sorted_all_history[1].version = 2
|
||||
|
||||
|
||||
def test_get_api_key_should_raise_exception_when_api_key_does_not_exist(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
fake_uuid):
|
||||
try:
|
||||
def test_get_api_key_should_raise_exception_when_api_key_does_not_exist(sample_service, fake_uuid):
|
||||
with pytest.raises(NoResultFound):
|
||||
get_model_api_keys(sample_service.id, id=fake_uuid)
|
||||
fail("Should have thrown a NoResultFound exception")
|
||||
except NoResultFound:
|
||||
pass
|
||||
|
||||
|
||||
def test_should_return_api_key_for_service(notify_api, notify_db, notify_db_session, sample_api_key):
|
||||
@@ -78,43 +68,30 @@ def test_should_return_api_key_for_service(notify_api, notify_db, notify_db_sess
|
||||
assert api_key == sample_api_key
|
||||
|
||||
|
||||
def test_should_return_unsigned_api_keys_for_service_id(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_api_key):
|
||||
def test_should_return_unsigned_api_keys_for_service_id(sample_api_key):
|
||||
unsigned_api_key = get_unsigned_secrets(sample_api_key.service_id)
|
||||
assert len(unsigned_api_key) == 1
|
||||
assert sample_api_key.secret != unsigned_api_key[0]
|
||||
assert unsigned_api_key[0] == _get_secret(sample_api_key.secret)
|
||||
|
||||
|
||||
def test_get_unsigned_secret_returns_key(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_api_key):
|
||||
def test_get_unsigned_secret_returns_key(sample_api_key):
|
||||
unsigned_api_key = get_unsigned_secret(sample_api_key.id)
|
||||
assert sample_api_key.secret != unsigned_api_key
|
||||
assert unsigned_api_key == _get_secret(sample_api_key.secret)
|
||||
|
||||
|
||||
def test_should_not_allow_duplicate_key_names_per_service(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_api_key,
|
||||
fake_uuid):
|
||||
def test_should_not_allow_duplicate_key_names_per_service(sample_api_key, fake_uuid):
|
||||
api_key = ApiKey(**{'id': fake_uuid,
|
||||
'service': sample_api_key.service,
|
||||
'name': sample_api_key.name,
|
||||
'created_by': sample_api_key.created_by})
|
||||
try:
|
||||
'created_by': sample_api_key.created_by,
|
||||
'key_type': KEY_TYPE_NORMAL})
|
||||
with pytest.raises(IntegrityError):
|
||||
save_model_api_key(api_key)
|
||||
fail("should throw IntegrityError")
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def test_save_api_key_should_not_create_new_service_history(notify_api, notify_db, notify_db_session, sample_service):
|
||||
|
||||
def test_save_api_key_should_not_create_new_service_history(sample_service):
|
||||
from app.models import Service
|
||||
|
||||
assert Service.query.count() == 1
|
||||
@@ -122,7 +99,8 @@ def test_save_api_key_should_not_create_new_service_history(notify_api, notify_d
|
||||
|
||||
api_key = ApiKey(**{'service': sample_service,
|
||||
'name': sample_service.name,
|
||||
'created_by': sample_service.created_by})
|
||||
'created_by': sample_service.created_by,
|
||||
'key_type': KEY_TYPE_NORMAL})
|
||||
save_model_api_key(api_key)
|
||||
|
||||
assert Service.get_history_model().query.count() == 1
|
||||
|
||||
@@ -1,46 +1,60 @@
|
||||
import json
|
||||
from datetime import timedelta, datetime
|
||||
|
||||
from flask import url_for
|
||||
from app.models import ApiKey
|
||||
from app.dao.api_key_dao import save_model_api_key, expire_api_key
|
||||
|
||||
from app.models import ApiKey, KEY_TYPE_NORMAL
|
||||
from app.dao.api_key_dao import expire_api_key
|
||||
from tests import create_authorization_header
|
||||
from tests.app.conftest import sample_api_key as create_sample_api_key
|
||||
from tests.app.conftest import sample_service as create_sample_service
|
||||
from tests.app.conftest import sample_user as create_user
|
||||
|
||||
|
||||
def test_api_key_should_create_new_api_key_for_service(notify_api, notify_db,
|
||||
notify_db_session,
|
||||
sample_service):
|
||||
def test_api_key_should_create_new_api_key_for_service(notify_api, sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = {'name': 'some secret name', 'created_by': str(sample_service.created_by.id)}
|
||||
data = {
|
||||
'name': 'some secret name',
|
||||
'created_by': str(sample_service.created_by.id),
|
||||
'key_type': KEY_TYPE_NORMAL
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
response = client.post(url_for('service.renew_api_key', service_id=sample_service.id),
|
||||
response = client.post(url_for('service.create_api_key', service_id=sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert response.status_code == 201
|
||||
assert response.get_data is not None
|
||||
assert 'data' in json.loads(response.get_data(as_text=True))
|
||||
saved_api_key = ApiKey.query.filter_by(service_id=sample_service.id).first()
|
||||
assert saved_api_key.service_id == sample_service.id
|
||||
assert saved_api_key.name == 'some secret name'
|
||||
|
||||
|
||||
def test_api_key_should_return_error_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
|
||||
sample_service):
|
||||
def test_api_key_should_return_error_when_service_does_not_exist(notify_api, sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
import uuid
|
||||
missing_service_id = uuid.uuid4()
|
||||
auth_header = create_authorization_header()
|
||||
response = client.post(url_for('service.renew_api_key', service_id=missing_service_id),
|
||||
response = client.post(url_for('service.create_api_key', service_id=missing_service_id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_revoke_should_expire_api_key_for_service(notify_api, notify_db, notify_db_session,
|
||||
sample_api_key):
|
||||
def test_create_api_key_should_set_default_key_type_of_normal(notify_api, sample_service):
|
||||
with notify_api.test_request_context(), notify_api.test_client() as client:
|
||||
data = {
|
||||
'name': 'some secret name',
|
||||
'created_by': str(sample_service.created_by.id)
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
response = client.post(url_for('service.create_api_key', service_id=sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert response.status_code == 201
|
||||
assert ApiKey.query.one().key_type == KEY_TYPE_NORMAL
|
||||
|
||||
|
||||
def test_revoke_should_expire_api_key_for_service(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert ApiKey.query.count() == 1
|
||||
@@ -54,26 +68,29 @@ def test_revoke_should_expire_api_key_for_service(notify_api, notify_db, notify_
|
||||
assert api_keys_for_service.expiry_date is not None
|
||||
|
||||
|
||||
def test_api_key_should_create_multiple_new_api_key_for_service(notify_api, notify_db,
|
||||
notify_db_session,
|
||||
sample_service):
|
||||
def test_api_key_should_create_multiple_new_api_key_for_service(notify_api, sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert ApiKey.query.count() == 0
|
||||
data = {'name': 'some secret name', 'created_by': str(sample_service.created_by.id)}
|
||||
data = {
|
||||
'name': 'some secret name',
|
||||
'created_by': str(sample_service.created_by.id),
|
||||
'key_type': KEY_TYPE_NORMAL
|
||||
}
|
||||
auth_header = create_authorization_header()
|
||||
response = client.post(url_for('service.renew_api_key', service_id=sample_service.id),
|
||||
response = client.post(url_for('service.create_api_key', service_id=sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert response.status_code == 201
|
||||
assert ApiKey.query.count() == 1
|
||||
data = {'name': 'another secret name', 'created_by': str(sample_service.created_by.id)}
|
||||
|
||||
data['name'] = 'another secret name'
|
||||
auth_header = create_authorization_header()
|
||||
response2 = client.post(url_for('service.renew_api_key', service_id=sample_service.id),
|
||||
response2 = client.post(url_for('service.create_api_key', service_id=sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert response2.status_code == 201
|
||||
assert response2.get_data != response.get_data
|
||||
assert json.loads(response.get_data(as_text=True)) != json.loads(response2.get_data(as_text=True))
|
||||
assert ApiKey.query.count() == 2
|
||||
|
||||
|
||||
@@ -110,9 +127,7 @@ def test_get_api_keys_should_return_all_keys_for_service(notify_api, notify_db,
|
||||
assert len(json_resp['apiKeys']) == 3
|
||||
|
||||
|
||||
def test_get_api_keys_should_return_one_key_for_service(notify_api, notify_db,
|
||||
notify_db_session,
|
||||
sample_api_key):
|
||||
def test_get_api_keys_should_return_one_key_for_service(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
@@ -34,7 +34,7 @@ def test_url_for_update_service(notify_api):
|
||||
assert str(url) == '/service/{}'.format(service_id)
|
||||
|
||||
|
||||
def test_url_for_renew_api_key(notify_api):
|
||||
def test_url_for_create_api_key(notify_api):
|
||||
with notify_api.test_request_context():
|
||||
url = url_for('service.renew_api_key', service_id=service_id)
|
||||
url = url_for('service.create_api_key', service_id=service_id)
|
||||
assert str(url) == '/service/{}/api-key'.format(service_id)
|
||||
|
||||
@@ -50,7 +50,7 @@ def notify_db_session(request, notify_db):
|
||||
def teardown():
|
||||
notify_db.session.remove()
|
||||
for tbl in reversed(notify_db.metadata.sorted_tables):
|
||||
if tbl.name not in ["provider_details"]:
|
||||
if tbl.name not in ["provider_details", "key_types"]:
|
||||
notify_db.engine.execute(tbl.delete())
|
||||
notify_db.session.commit()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user