mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-16 02:02:13 -05:00
Just looks a bit tidier and less repetitive. I’ve only done this for the serialised service because: (1) we’re only checking this in places where we’re already using the serialised service; and (2) if we want to check this elsewhere, there’s a good chance that the new code should be using the serialised service anyway, since it’ll itself be doing some kind of performance optimisation.
136 lines
3.5 KiB
Python
136 lines
3.5 KiB
Python
from collections import defaultdict
|
|
from functools import partial
|
|
from threading import RLock
|
|
|
|
import cachetools
|
|
from flask import current_app
|
|
from notifications_utils.clients.redis import RequestCache
|
|
from notifications_utils.serialised_model import (
|
|
SerialisedModel,
|
|
SerialisedModelCollection,
|
|
)
|
|
from werkzeug.utils import cached_property
|
|
|
|
from app import db, redis_store
|
|
from app.dao.api_key_dao import get_model_api_keys
|
|
from app.dao.services_dao import dao_fetch_service_by_id
|
|
|
|
# Factory for the in-process caches: each one holds at most 1024 entries and
# is created with a ttl of 2 (seconds, per cachetools.TTLCache).
_new_ttl_cache = partial(cachetools.TTLCache, maxsize=1024, ttl=2)

# One cache and one re-entrant lock per key, created lazily on first lookup.
# memory_cache() indexes both mappings by the decorated function's
# __qualname__.
caches = defaultdict(_new_ttl_cache)
locks = defaultdict(RLock)

# Wrapper around the app's Redis store; its .set(...) decorator is applied to
# the get_dict() static methods in this module.
redis_cache = RequestCache(redis_store)
|
|
|
|
|
|
def memory_cache(func):
    """Memoise ``func`` in its own short-lived in-process cache.

    The cache and its lock are looked up in the module-level ``caches`` and
    ``locks`` mappings under ``func.__qualname__``, so every decorated
    function gets a separate pair. Cache keys are built by
    ``ignore_first_argument_cache_key``: the first positional argument
    (``cls`` on the classmethods this decorator is used with) does not take
    part in the key.

    ``cachetools.cached`` is applied to ``func`` directly instead of to an
    intermediate pass-through closure. The returned callable therefore keeps
    ``func``'s name and docstring, exposes ``func`` itself as
    ``__wrapped__``, and a cache miss costs one call frame fewer.

    :param func: the callable to memoise.
    :returns: the caching wrapper produced by ``cachetools.cached``.
    """
    qualname = func.__qualname__
    return cachetools.cached(
        cache=caches[qualname],
        lock=locks[qualname],
        key=ignore_first_argument_cache_key,
    )(func)
|
|
|
|
|
|
def ignore_first_argument_cache_key(cls, *args, **kwargs):
    """Return a cache key built from every argument except the first.

    The first positional argument (named ``cls`` here) is accepted and then
    dropped; the remaining positional and keyword arguments are passed to
    ``cachetools.keys.hashkey``.
    """
    cache_key = cachetools.keys.hashkey(*args, **kwargs)
    return cache_key
|
|
|
|
|
|
class SerialisedTemplate(SerialisedModel):
    """Template model constructed from the dict produced by ``get_dict``.

    NOTE(review): ``ALLOWED_PROPERTIES`` is presumably what ``SerialisedModel``
    uses to decide which keys of that dict are exposed -- confirm against
    ``notifications_utils.serialised_model``.
    """

    ALLOWED_PROPERTIES = {
        'archived',
        'content',
        'id',
        'postage',
        'process_type',
        'reply_to_text',
        'subject',
        'template_type',
        'version',
    }

    @classmethod
    @memory_cache
    def from_id_and_service_id(cls, template_id, service_id, version=None):
        """Build an instance for the given template, service and version.

        The result is memoised by ``memory_cache``; the underlying data comes
        from ``get_dict``.
        """
        payload = cls.get_dict(template_id, service_id, version)
        return cls(payload['data'])

    @staticmethod
    @redis_cache.set('service-{service_id}-template-{template_id}-version-{version}')
    def get_dict(template_id, service_id, version):
        """Fetch a template and return it as ``{'data': <dumped template>}``.

        NOTE(review): the ``redis_cache.set`` decorator presumably stores the
        return value in Redis under the formatted key -- confirm against
        ``notifications_utils.clients.redis.RequestCache``.
        """
        # Imported inside the function rather than at module level; presumably
        # to avoid a circular import -- TODO confirm.
        from app.dao import templates_dao
        from app.schemas import template_schema

        template = templates_dao.dao_get_template_by_id_and_service_id(
            template_id=template_id,
            service_id=service_id,
            version=version,
        )
        dumped = template_schema.dump(template).data

        # The commit happens only after the template has been dumped to a
        # plain dict, so nothing below touches the ORM object afterwards.
        db.session.commit()

        return {'data': dumped}
|
|
|
|
|
|
class SerialisedService(SerialisedModel):
    """Service model constructed from the dict produced by ``get_dict``.

    NOTE(review): ``ALLOWED_PROPERTIES`` is presumably what ``SerialisedModel``
    uses to decide which keys of that dict are exposed -- confirm against
    ``notifications_utils.serialised_model``.
    """

    ALLOWED_PROPERTIES = {
        'id',
        'name',
        'active',
        'contact_link',
        'email_from',
        'message_limit',
        'permissions',
        'rate_limit',
        'research_mode',
        'restricted',
        'prefix_sms',
        'email_branding',
    }

    @classmethod
    @memory_cache
    def from_id(cls, service_id):
        """Build an instance for ``service_id``; memoised by ``memory_cache``."""
        payload = cls.get_dict(service_id)
        return cls(payload['data'])

    @staticmethod
    @redis_cache.set('service-{service_id}')
    def get_dict(service_id):
        """Fetch a service and return it as ``{'data': <dumped service>}``.

        NOTE(review): the ``redis_cache.set`` decorator presumably stores the
        return value in Redis under the formatted key -- confirm against
        ``notifications_utils.clients.redis.RequestCache``.
        """
        # Imported inside the function rather than at module level; presumably
        # to avoid a circular import -- TODO confirm.
        from app.schemas import service_schema

        service = dao_fetch_service_by_id(service_id)
        dumped = service_schema.dump(service).data

        # The commit happens only after the service has been dumped to a
        # plain dict, so nothing below touches the ORM object afterwards.
        db.session.commit()

        return {'data': dumped}

    @cached_property
    def api_keys(self):
        """API keys for this service, as a ``SerialisedAPIKeyCollection``.

        Computed on first access and then kept on the instance by
        ``cached_property``.
        """
        return SerialisedAPIKeyCollection.from_service_id(self.id)

    @property
    def high_volume(self):
        """Whether this service's id is in the ``HIGH_VOLUME_SERVICE`` config."""
        high_volume_ids = current_app.config['HIGH_VOLUME_SERVICE']
        return self.id in high_volume_ids
|
|
|
|
|
|
class SerialisedAPIKey(SerialisedModel):
    """A single API key, as held in a ``SerialisedAPIKeyCollection``.

    ``ALLOWED_PROPERTIES`` is also read by
    ``SerialisedAPIKeyCollection.from_service_id`` to choose which attributes
    are copied off each API key object.
    """

    ALLOWED_PROPERTIES = {
        'id',
        'secret',
        'expiry_date',
        'key_type',
    }
|
|
|
|
|
|
class SerialisedAPIKeyCollection(SerialisedModelCollection):
    """Collection of ``SerialisedAPIKey`` items for one service."""

    model = SerialisedAPIKey

    @classmethod
    @memory_cache
    def from_service_id(cls, service_id):
        """Build the collection of API keys belonging to ``service_id``.

        Each key returned by ``get_model_api_keys`` is reduced to a plain dict
        holding only the attributes named in
        ``SerialisedAPIKey.ALLOWED_PROPERTIES``. The result is memoised by
        ``memory_cache``.
        """
        wanted = SerialisedAPIKey.ALLOWED_PROPERTIES

        # Every attribute is read off the key objects here, before the commit
        # below, so only plain dicts are used afterwards.
        serialised_keys = [
            {attr: getattr(api_key, attr) for attr in wanted}
            for api_key in get_model_api_keys(service_id)
        ]
        db.session.commit()

        return cls(serialised_keys)
|