2024-09-04 09:33:41 -07:00
|
|
|
import os
|
2025-06-26 10:35:46 -07:00
|
|
|
import re
|
2024-05-23 10:18:02 -07:00
|
|
|
from datetime import datetime, timedelta, timezone
|
2017-01-27 12:21:28 +00:00
|
|
|
|
2025-06-26 10:35:46 -07:00
|
|
|
from flask import abort, current_app, url_for
|
2021-03-10 13:55:06 +00:00
|
|
|
from sqlalchemy import func
|
2016-06-28 15:17:36 +01:00
|
|
|
|
2024-05-16 10:17:45 -04:00
|
|
|
from notifications_utils.template import HTMLEmailTemplate, SMSMessageTemplate
|
|
|
|
|
|
2020-12-18 17:39:35 +00:00
|
|
|
# strftime/strptime patterns shared by the helpers in this module.

# Space-separated date and time with microseconds and no zone marker,
# e.g. "2016-06-17 13:45:30.123456".
DATETIME_FORMAT_NO_TIMEZONE = "%Y-%m-%d %H:%M:%S.%f"
# ISO-8601 style with a literal trailing "Z". The "Z" is part of the pattern
# text, not derived from tzinfo, so values are presumably expected to already
# be in UTC - confirm against callers.
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
# Calendar date only, e.g. "2016-06-17".
DATE_FORMAT = "%Y-%m-%d"
|
2017-04-03 15:49:23 +01:00
|
|
|
|
2016-06-28 15:17:36 +01:00
|
|
|
|
|
|
|
|
def pagination_links(pagination, endpoint, **kwargs):
    """
    Build the "prev" / "next" / "last" links for a paginated result.

    :param pagination: object exposing ``has_prev``, ``prev_num``, ``has_next``,
        ``next_num`` and ``pages``
    :param endpoint: endpoint name passed to ``url_for``
    :param kwargs: extra values forwarded to ``url_for``; any ``page`` entry is
        discarded because the page number is always set here
    :return: dict that may contain "prev", "next" and "last" URLs
    """
    # Drop a caller-supplied page so it cannot clash with the explicit
    # page= keyword used below.
    kwargs.pop("page", None)

    def page_url(number):
        return url_for(endpoint, page=number, **kwargs)

    links = {}
    if pagination.has_prev:
        links["prev"] = page_url(pagination.prev_num)
    if pagination.has_next:
        # "last" is only emitted while there is still a following page.
        links["next"] = page_url(pagination.next_num)
        links["last"] = page_url(pagination.pages)
    return links
|
2016-10-13 11:59:47 +01:00
|
|
|
|
|
|
|
|
|
2021-12-09 17:18:28 +00:00
|
|
|
def get_prev_next_pagination_links(current_page, next_page_exists, endpoint, **kwargs):
    """
    Build "prev" / "next" links when only the current page number is known.

    :param current_page: page number being served; a "prev" link is added
        when it is greater than 1
    :param next_page_exists: truthy when a following page is available
    :param endpoint: endpoint name passed to ``url_for``
    :param kwargs: extra values forwarded to ``url_for``; any ``page`` entry is
        discarded because the page number is always set here
    :return: dict that may contain "prev" and "next" URLs
    """
    # Drop a caller-supplied page so it cannot clash with the explicit
    # page= keyword used below.
    kwargs.pop("page", None)

    def page_url(number):
        return url_for(endpoint, page=number, **kwargs)

    links = {}
    if current_page > 1:
        links["prev"] = page_url(current_page - 1)
    if next_page_exists:
        links["next"] = page_url(current_page + 1)
    return links
|
|
|
|
|
|
|
|
|
|
|
2018-02-09 14:16:10 +00:00
|
|
|
def url_with_token(data, url, config, base_url=None):
    """
    Return ``<base><url><token>`` where the token is generated from ``data``.

    :param data: payload handed to ``generate_token``
    :param url: path appended to the base URL; no separator is inserted
        between it and the token, so it must already end with one if needed
    :param config: mapping providing "SECRET_KEY", "DANGEROUS_SALT" and,
        when ``base_url`` is falsy, "ADMIN_BASE_URL"
    :param base_url: optional override for ``config["ADMIN_BASE_URL"]``
    :return: the concatenated URL string
    """
    # Imported lazily, as in the rest of this module's function-local imports.
    from notifications_utils.url_safe_token import generate_token

    signed = generate_token(data, config["SECRET_KEY"], config["DANGEROUS_SALT"])
    root = base_url or config["ADMIN_BASE_URL"]
    return root + url + signed
|
2016-12-09 15:56:25 +00:00
|
|
|
|
|
|
|
|
|
2025-05-27 19:40:26 -04:00
|
|
|
def get_template_instance(template, values=None):
    """
    Instantiate the template class that matches ``template["template_type"]``.

    :param template: mapping with at least a "template_type" key
    :param values: passed through as the second constructor argument
    :return: ``SMSMessageTemplate`` or ``HTMLEmailTemplate`` instance
    :raises KeyError: if the template type is neither SMS nor EMAIL
    """
    from app.enums import TemplateType

    template_classes = {
        TemplateType.SMS: SMSMessageTemplate,
        TemplateType.EMAIL: HTMLEmailTemplate,
    }
    template_class = template_classes[template["template_type"]]
    return template_class(template, values)
|
2025-05-16 16:23:59 -04:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def template_model_to_dict(template):
    """
    Copy the fields of a template object into a plain dict.

    :param template: object exposing ``id``, ``template_type``, ``content``,
        ``created_at``, ``name`` and ``version``; ``subject`` is optional
    :return: dict with the id stringified and "subject" set to None when the
        object has no such attribute
    """
    return dict(
        id=str(template.id),
        template_type=template.template_type,
        content=template.content,
        # Not every template carries a subject, so fall back to None.
        subject=getattr(template, "subject", None),
        created_at=template.created_at,
        name=template.name,
        version=template.version,
    )
|
2017-01-27 12:21:28 +00:00
|
|
|
|
|
|
|
|
|
2023-05-10 08:39:50 -07:00
|
|
|
def get_midnight_in_utc(date):
    """
    Return 00:00:00 on the calendar day of ``date`` as a naive datetime.

    No timezone conversion is performed: the year/month/day of ``date`` are
    kept as they are and the result carries no tzinfo.

    :param date: a ``date`` or ``datetime`` whose calendar day is used
    :return: naive datetime at the start of that day,
        for example 2016-06-17 -> 2016-06-17 00:00:00
    """
    start_of_day = datetime.min.time()
    return datetime.combine(date, start_of_day)
|
2017-01-27 12:21:28 +00:00
|
|
|
|
|
|
|
|
|
2023-05-30 12:16:49 -07:00
|
|
|
def get_month_from_utc_column(column):
    """
    Build a SQL expression that truncates ``column`` to its month, so
    queries can group rows by month.

    The expression applies ``timezone('UTC', ...)`` twice before truncating.
    NOTE(review): on PostgreSQL this presumably tags the naive value as UTC
    and then converts it straight back to naive UTC, i.e. no shift to a
    local zone takes place - confirm this is still the intended behaviour.

    :param column: timestamp column or expression to truncate
    :return: ``date_trunc('month', ...)`` SQL function expression
    """
    tagged_as_utc = func.timezone("UTC", column)
    rendered_in_utc = func.timezone("UTC", tagged_as_utc)
    return func.date_trunc("month", rendered_in_utc)
|
2017-02-14 14:22:52 +00:00
|
|
|
|
|
|
|
|
|
2017-06-29 18:02:21 +01:00
|
|
|
def get_public_notify_type_text(notify_type, plural=False):
    """
    Return the user-facing wording for a notification or permission type.

    :param notify_type: value compared against ``NotificationType.SMS`` and
        ``ServicePermissionType.UPLOAD_DOCUMENT``; anything else is used as-is
    :param plural: when truthy, append "s" to the wording
    :return: "text message", "document" or the original value, optionally
        pluralised
    """
    from app.enums import NotificationType, ServicePermissionType

    if notify_type == NotificationType.SMS:
        label = "text message"
    elif notify_type == ServicePermissionType.UPLOAD_DOCUMENT:
        label = "document"
    else:
        label = notify_type

    suffix = "s" if plural else ""
    return f"{label}{suffix}"
|
2018-04-12 10:47:16 +01:00
|
|
|
|
|
|
|
|
|
2018-04-30 11:50:56 +01:00
|
|
|
def midnight_n_days_ago(number_of_days):
    """
    Return 00:00:00 of the day that lies ``number_of_days`` before now.

    "Now" comes from ``utc_now()``, so the day boundary is the UTC one.

    :param number_of_days: how many days to go back from the current time
    :return: naive datetime at the start of that day
    """
    moment = utc_now() - timedelta(days=number_of_days)
    return get_midnight_in_utc(moment)
|
2018-04-12 10:46:22 +01:00
|
|
|
|
|
|
|
|
|
2018-07-13 15:26:42 +01:00
|
|
|
def escape_special_characters(string):
    """
    Prefix every backslash, underscore, percent sign and forward slash in
    ``string`` with a backslash.

    :param string: text to escape
    :return: the escaped text
    """
    # A single translate pass gives the same result as replacing the
    # backslash first and the other characters afterwards: backslashes that
    # are added as escapes are never escaped again.
    escapes = str.maketrans({char: "\\" + char for char in ("\\", "_", "%", "/")})
    return string.translate(escapes)
|
2019-04-05 15:18:39 +01:00
|
|
|
|
|
|
|
|
|
2020-05-22 09:36:07 +01:00
|
|
|
def get_archived_db_column_value(column):
    """
    Return ``column`` prefixed with an archive marker and today's date.

    :param column: original value to embed at the end of the result
    :return: string of the form "_archived_<YYYY-MM-DD>_<column>", using the
        date from ``utc_now()``
    """
    archived_on = utc_now().strftime("%Y-%m-%d")
    return f"_archived_{archived_on}_{column}"
|
2020-07-27 15:17:19 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_dt_string_or_none(val):
    """
    Format ``val`` with ``DATETIME_FORMAT``, or return None for a falsy value.

    :param val: datetime-like object exposing ``strftime``, or None
    :return: formatted string, or None when ``val`` is falsy
    """
    if not val:
        return None
    return val.strftime(DATETIME_FORMAT)
|
2020-12-04 16:00:20 +00:00
|
|
|
|
|
|
|
|
|
2024-03-28 11:19:02 -07:00
|
|
|
# Debugging aid only.
# Use print(hilite(message)) while investigating, then remove the print
# statements again before committing.
def hilite(message):
    """
    Wrap ``message`` in ANSI escape codes so it prints in green.

    :param message: value to highlight; it is formatted into the string
    :return: the message surrounded by the green and reset escape sequences
    """
    return f"\033[32m{message}\033[0m"
|
2024-05-23 10:18:02 -07:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def aware_utcnow():
    """
    Return the current time as a timezone-aware datetime in UTC.

    :return: ``datetime`` whose tzinfo is ``timezone.utc``
    """
    return datetime.now(tz=timezone.utc)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def naive_utcnow():
    """
    Return the current UTC time as a naive datetime (tzinfo removed).

    :return: ``datetime`` with the UTC wall-clock time and no tzinfo
    """
    aware = aware_utcnow()
    return aware.replace(tzinfo=None)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def utc_now():
    """
    Return the current UTC time as a naive datetime.

    Thin alias for ``naive_utcnow()``.

    :return: naive ``datetime`` holding the current UTC time
    """
    current = naive_utcnow()
    return current
|
2024-09-04 09:33:41 -07:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def debug_not_production(msg):
    """
    Log ``msg`` at INFO level unless NOTIFY_ENVIRONMENT is "production".

    :param msg: message handed to the Flask application logger
    """
    if os.getenv("NOTIFY_ENVIRONMENT") == "production":
        return
    current_app.logger.info(msg)
|
2025-04-09 16:56:42 -07:00
|
|
|
|
|
|
|
|
|
2025-06-26 10:35:46 -07:00
|
|
|
# Compiled once at import time instead of on every call.
_SUSPICIOUS_INPUT_PATTERN = re.compile(
    r"""
    \b                  # keywords must begin on a word boundary
    (?:                 # SQL keywords and functions
        OR
        |AND
        |UNION
        |SELECT
        |DROP
        |INSERT
        |UPDATE
        |DELETE
        |EXEC
        |TRUNCATE
        |CREATE
        |ALTER
        |pg_sleep\b     # PostgreSQL pg_sleep function, whole word only
        |sleep\b        # sleep function, whole word only
    )
    |--                 # SQL single-line comment marker, anywhere
    |/\*                # SQL multi-line comment opener, anywhere
    |[';]{2,}           # two or more consecutive single quotes or semi-colons
    """,
    re.IGNORECASE | re.VERBOSE,
)


def is_suspicious_input(input_str):
    """
    Report whether ``input_str`` contains text that looks like SQL injection.

    The check is a case-insensitive search for SQL keywords, comment markers
    (``--`` and ``/*``), sleep functions, or runs of quotes/semi-colons.
    Comment markers are matched wherever they occur; they were previously
    only detected directly after a word character, which let inputs such as
    ``admin' --`` through.

    Keywords are anchored on the left only, so ordinary words that merely
    start with one (for example "order" or "android") are also reported.
    That strictness is kept on purpose so the filter is never loosened.

    :param input_str: value to inspect; anything that is not a ``str`` is
        treated as not suspicious
    :return: True when a suspicious fragment is found, otherwise False
    """
    if not isinstance(input_str, str):
        return False
    return _SUSPICIOUS_INPUT_PATTERN.search(input_str) is not None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_valid_id(id):
    """
    Check that a string identifier uses only the allowed characters.

    A valid string id is 1 to 50 characters long and consists solely of
    ASCII letters, digits, underscores and hyphens. The whole string must
    match: ``re.fullmatch`` is used because a ``$`` anchor would also accept
    a value ending in a newline (e.g. ``"abc\\n"``).

    :param id: value to validate; anything that is not a ``str`` is accepted
        unchanged and reported as valid
    :return: True for non-string values and well-formed string ids,
        otherwise False
    """
    # Non-string values are outside the scope of this check.
    if not isinstance(id, str):
        return True
    return re.fullmatch(r"[a-zA-Z0-9_-]{1,50}", id) is not None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def check_suspicious_id(*args):
    """
    Abort the request with HTTP 403 if any given id fails validation.

    Each value is first checked with ``is_valid_id`` and then with
    ``is_suspicious_input``; the first failure triggers ``abort(403)``.

    :param args: ids to check, in order
    """
    for candidate in args:
        if not is_valid_id(candidate) or is_suspicious_input(candidate):
            abort(403)
|