mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-12 16:22:17 -05:00
99 lines
2.4 KiB
Python
99 lines
2.4 KiB
Python
import uuid
|
|
from datetime import timedelta
|
|
|
|
from sqlalchemy import func, or_, select
|
|
|
|
from app import db
|
|
from app.dao.dao_utils import autocommit, version_class
|
|
from app.models import ApiKey
|
|
from app.utils import utc_now
|
|
|
|
|
|
@autocommit
|
|
@version_class(ApiKey)
|
|
def save_model_api_key(api_key):
|
|
if not api_key.id:
|
|
api_key.id = (
|
|
uuid.uuid4()
|
|
) # must be set now so version history model can use same id
|
|
api_key.secret = uuid.uuid4()
|
|
db.session.add(api_key)
|
|
|
|
|
|
@autocommit
|
|
@version_class(ApiKey)
|
|
def expire_api_key(service_id, api_key_id):
|
|
api_key = (
|
|
db.session.execute(
|
|
select(ApiKey).where(
|
|
ApiKey.id == api_key_id, ApiKey.service_id == service_id
|
|
)
|
|
)
|
|
.scalars()
|
|
.one()
|
|
)
|
|
api_key.expiry_date = utc_now()
|
|
db.session.add(api_key)
|
|
|
|
|
|
def get_model_api_keys(service_id, id=None):
|
|
if id:
|
|
return (
|
|
db.session.execute(
|
|
select(ApiKey).where(
|
|
ApiKey.id == id,
|
|
ApiKey.service_id == service_id,
|
|
ApiKey.expiry_date == None, # noqa
|
|
)
|
|
)
|
|
.scalars()
|
|
.one()
|
|
)
|
|
seven_days_ago = utc_now() - timedelta(days=7)
|
|
return (
|
|
db.session.execute(
|
|
select(ApiKey).where(
|
|
or_(
|
|
ApiKey.expiry_date == None, # noqa
|
|
func.date(ApiKey.expiry_date) > seven_days_ago, # noqa
|
|
),
|
|
ApiKey.service_id == service_id,
|
|
)
|
|
)
|
|
.scalars()
|
|
.all()
|
|
)
|
|
|
|
|
|
def get_unsigned_secrets(service_id):
|
|
"""
|
|
This method can only be exposed to the Authentication of the api calls.
|
|
"""
|
|
api_keys = (
|
|
db.session.execute(
|
|
select(ApiKey).where(
|
|
ApiKey.service_id == service_id, ApiKey.expiry_date == None # noqa
|
|
)
|
|
)
|
|
.scalars()
|
|
.all()
|
|
)
|
|
keys = [x.secret for x in api_keys]
|
|
return keys
|
|
|
|
|
|
def get_unsigned_secret(key_id):
|
|
"""
|
|
This method can only be exposed to the Authentication of the api calls.
|
|
"""
|
|
api_key = (
|
|
db.session.execute(
|
|
select(ApiKey).where(
|
|
ApiKey.id == key_id, ApiKey.expiry_date == None # noqa
|
|
)
|
|
)
|
|
.scalars()
|
|
.one()
|
|
)
|
|
return api_key.secret
|