Files
notifications-admin/app/utils/__init__.py
Ben Thorner c17d438de8 DRY-up email revalidation check
Previously this was duplicated between the "two_factor" and the
"webauthn" views, and required more test setup. This DRYs up the
check and tests it once, using mocks to simplify the view tests.

As part of DRYing up the check into a util module, I've also moved
the "is_less_than_days_ago" function it uses.
2021-06-14 12:52:54 +01:00

148 lines
4.9 KiB
Python

from functools import wraps
from itertools import chain
from urllib.parse import urlparse
from flask import abort, current_app, g, make_response, request
from flask_login import current_user
from notifications_utils.field import Field
from orderedset._orderedset import OrderedSet
from werkzeug.datastructures import MultiDict
from werkzeug.routing import RequestRedirect
SENDING_STATUSES = ['created', 'pending', 'sending', 'pending-virus-check']
DELIVERED_STATUSES = ['delivered', 'sent', 'returned-letter']
FAILURE_STATUSES = ['failed', 'temporary-failure', 'permanent-failure',
'technical-failure', 'virus-scan-failed', 'validation-failed']
REQUESTED_STATUSES = SENDING_STATUSES + DELIVERED_STATUSES + FAILURE_STATUSES
NOTIFICATION_TYPES = ["sms", "email", "letter", "broadcast"]
def service_has_permission(permission):
from app import current_service
def wrap(func):
@wraps(func)
def wrap_func(*args, **kwargs):
if not current_service or not current_service.has_permission(permission):
abort(403)
return func(*args, **kwargs)
return wrap_func
return wrap
def get_help_argument():
return request.args.get('help') if request.args.get('help') in ('1', '2', '3') else None
def get_logo_cdn_domain():
parsed_uri = urlparse(current_app.config['ADMIN_BASE_URL'])
if parsed_uri.netloc.startswith('localhost'):
return 'static-logos.notify.tools'
subdomain = parsed_uri.hostname.split('.')[0]
domain = parsed_uri.netloc[len(subdomain + '.'):]
return "static-logos.{}".format(domain)
def parse_filter_args(filter_dict):
if not isinstance(filter_dict, MultiDict):
filter_dict = MultiDict(filter_dict)
return MultiDict(
(
key,
(','.join(filter_dict.getlist(key))).split(',')
)
for key in filter_dict.keys()
if ''.join(filter_dict.getlist(key))
)
def set_status_filters(filter_args):
status_filters = filter_args.get('status', [])
return list(OrderedSet(chain(
(status_filters or REQUESTED_STATUSES),
DELIVERED_STATUSES if 'delivered' in status_filters else [],
SENDING_STATUSES if 'sending' in status_filters else [],
FAILURE_STATUSES if 'failed' in status_filters else []
)))
def unicode_truncate(s, length):
encoded = s.encode('utf-8')[:length]
return encoded.decode('utf-8', 'ignore')
def should_skip_template_page(template_type):
return (
current_user.has_permissions('send_messages')
and not current_user.has_permissions('manage_templates', 'manage_api_keys')
and template_type != 'letter'
)
def get_default_sms_sender(sms_senders):
return str(next((
Field(x['sms_sender'], html='escape')
for x in sms_senders if x['is_default']
), "None"))
class PermanentRedirect(RequestRedirect):
"""
In Werkzeug 0.15.0 the status code for RequestRedirect changed from 301 to 308.
308 status codes are not supported when Internet Explorer is used with Windows 7
and Windows 8.1, so this class keeps the original status code of 301.
"""
code = 301
def hide_from_search_engines(f):
@wraps(f)
def decorated_function(*args, **kwargs):
g.hide_from_search_engines = True
response = make_response(f(*args, **kwargs))
response.headers['X-Robots-Tag'] = 'noindex'
return response
return decorated_function
# Function to merge two dict or lists with a JSON-like structure into one.
# JSON-like means they can contain all types JSON can: all the main primitives
# plus nested lists or dictionaries.
# Merge is additive. New values overwrite old and collections are added to.
def merge_jsonlike(source, destination):
def merge_items(source_item, destination_item):
if isinstance(source_item, dict) and isinstance(destination_item, dict):
merge_dicts(source_item, destination_item)
elif isinstance(source_item, list) and isinstance(destination_item, list):
merge_lists(source_item, destination_item)
else: # primitive value
return False
return True
def merge_lists(source, destination):
last_src_idx = len(source) - 1
for idx, item in enumerate(destination):
if idx <= last_src_idx:
# assign destination value if can't be merged into source
if merge_items(source[idx], destination[idx]) is False:
source[idx] = destination[idx]
else:
source.append(item)
def merge_dicts(source, destination):
for key, value in destination.items():
if key in source:
# assign destination value if can't be merged into source
if merge_items(source[key], value) is False:
source[key] = value
else:
source[key] = value
merge_items(source, destination)