mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-08 14:12:27 -05:00
* Remove SocketIO We have tried a few times to enable SocketIO in our app and it has been a challenge each step of the way, and now there is a dependency incompatibility it seems. Given that we are in maintenance mode with the applicaiton currently and hve different plans for the interface, we are removing this dependency. Signed-off-by: Carlo Costino <carlo.costino@gsa.gov> * Remove more unused code and address Flake8 warnings Signed-off-by: Carlo Costino <carlo.costino@gsa.gov> * Revert changes to fix test Signed-off-by: Carlo Costino <carlo.costino@gsa.gov> --------- Signed-off-by: Carlo Costino <carlo.costino@gsa.gov>
196 lines
6.0 KiB
Python
196 lines
6.0 KiB
Python
import os
|
|
import re
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from flask import abort, current_app, url_for
|
|
from sqlalchemy import func
|
|
|
|
from notifications_utils.template import HTMLEmailTemplate, SMSMessageTemplate
|
|
|
|
DATETIME_FORMAT_NO_TIMEZONE = "%Y-%m-%d %H:%M:%S.%f"
|
|
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
|
|
DATE_FORMAT = "%Y-%m-%d"
|
|
|
|
|
|
def pagination_links(pagination, endpoint, **kwargs):
|
|
if "page" in kwargs:
|
|
kwargs.pop("page", None)
|
|
links = {}
|
|
if pagination.has_prev:
|
|
links["prev"] = url_for(endpoint, page=pagination.prev_num, **kwargs)
|
|
if pagination.has_next:
|
|
links["next"] = url_for(endpoint, page=pagination.next_num, **kwargs)
|
|
links["last"] = url_for(endpoint, page=pagination.pages, **kwargs)
|
|
return links
|
|
|
|
|
|
def get_prev_next_pagination_links(current_page, next_page_exists, endpoint, **kwargs):
|
|
if "page" in kwargs:
|
|
kwargs.pop("page", None)
|
|
links = {}
|
|
if current_page > 1:
|
|
links["prev"] = url_for(endpoint, page=current_page - 1, **kwargs)
|
|
if next_page_exists:
|
|
links["next"] = url_for(endpoint, page=current_page + 1, **kwargs)
|
|
return links
|
|
|
|
|
|
def url_with_token(data, url, config, base_url=None):
|
|
from notifications_utils.url_safe_token import generate_token
|
|
|
|
token = generate_token(data, config["SECRET_KEY"], config["DANGEROUS_SALT"])
|
|
base_url = (base_url or config["ADMIN_BASE_URL"]) + url
|
|
return base_url + token
|
|
|
|
|
|
def get_template_instance(template, values=None):
|
|
from app.enums import TemplateType
|
|
|
|
return {
|
|
TemplateType.SMS: SMSMessageTemplate,
|
|
TemplateType.EMAIL: HTMLEmailTemplate,
|
|
}[template["template_type"]](template, values)
|
|
|
|
|
|
def template_model_to_dict(template):
|
|
return {
|
|
"id": str(template.id),
|
|
"template_type": template.template_type,
|
|
"content": template.content,
|
|
"subject": getattr(template, "subject", None),
|
|
"created_at": template.created_at,
|
|
"name": template.name,
|
|
"version": template.version,
|
|
}
|
|
|
|
|
|
def get_midnight_in_utc(date):
|
|
"""
|
|
This function converts date to midnight in UTC,
|
|
removing the tzinfo from the datetime because the database stores the timestamps without timezone.
|
|
:param date: the day to calculate the local midnight in UTC for
|
|
:return: the datetime of local midnight in UTC, for example 2016-06-17 = 2016-06-16 23:00:00
|
|
"""
|
|
return datetime.combine(date, datetime.min.time())
|
|
|
|
|
|
def get_month_from_utc_column(column):
|
|
"""
|
|
Where queries need to count notifications by month it needs to be
|
|
the month in local time.
|
|
The database stores all timestamps as UTC without the timezone.
|
|
- First set the timezone on created_at to UTC
|
|
- then convert the timezone to local time (or America/New_York)
|
|
- lastly truncate the datetime to month with which we can group
|
|
queries
|
|
"""
|
|
return func.date_trunc("month", func.timezone("UTC", func.timezone("UTC", column)))
|
|
|
|
|
|
def get_public_notify_type_text(notify_type, plural=False):
|
|
from app.enums import NotificationType, ServicePermissionType
|
|
|
|
notify_type_text = notify_type
|
|
if notify_type == NotificationType.SMS:
|
|
notify_type_text = "text message"
|
|
elif notify_type == ServicePermissionType.UPLOAD_DOCUMENT:
|
|
notify_type_text = "document"
|
|
|
|
return "{}{}".format(notify_type_text, "s" if plural else "")
|
|
|
|
|
|
def midnight_n_days_ago(number_of_days):
|
|
"""
|
|
Returns midnight a number of days ago. Takes care of daylight savings etc.
|
|
"""
|
|
return get_midnight_in_utc(utc_now() - timedelta(days=number_of_days))
|
|
|
|
|
|
def escape_special_characters(string):
|
|
for special_character in ("\\", "_", "%", "/"):
|
|
string = string.replace(special_character, r"\{}".format(special_character))
|
|
return string
|
|
|
|
|
|
def get_archived_db_column_value(column):
|
|
date = utc_now().strftime("%Y-%m-%d")
|
|
return f"_archived_{date}_{column}"
|
|
|
|
|
|
def get_dt_string_or_none(val):
|
|
return val.strftime(DATETIME_FORMAT) if val else None
|
|
|
|
|
|
# Function used for debugging.
|
|
# Do print(hilite(message)) while debugging, then remove your print statements
|
|
def hilite(message):
|
|
ansi_green = "\033[32m"
|
|
ansi_reset = "\033[0m"
|
|
return f"{ansi_green}{message}{ansi_reset}"
|
|
|
|
|
|
def aware_utcnow():
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
def naive_utcnow():
|
|
return aware_utcnow().replace(tzinfo=None)
|
|
|
|
|
|
def utc_now():
|
|
return naive_utcnow()
|
|
|
|
|
|
def debug_not_production(msg):
|
|
if os.getenv("NOTIFY_ENVIRONMENT") not in ["production"]:
|
|
current_app.logger.info(msg)
|
|
|
|
|
|
def is_suspicious_input(input_str):
|
|
if not isinstance(input_str, str):
|
|
return False
|
|
|
|
pattern = re.compile(
|
|
r"""
|
|
(?i) # case insensite
|
|
\b # word boundary
|
|
( # start of group for SQL keywords
|
|
OR # match SQL keyword OR
|
|
|AND
|
|
|UNION
|
|
|SELECT
|
|
|DROP
|
|
|INSERT
|
|
|UPDATE
|
|
|DELETE
|
|
|EXEC
|
|
|TRUNCATE
|
|
|CREATE
|
|
|ALTER
|
|
|-- # match SQL single-line comment
|
|
|/\* # match SQL multi-line comment
|
|
|\bpg_sleep\b # Match PostgreSQL 'pg_sleep' function
|
|
|
|
|\bsleep\b # Match SQL Server 'sleep' function
|
|
) # End SQL keywords and function group
|
|
| # OR operator to include an alternate pattern
|
|
[';]{2,} # Match two or more consecutive single quotes or semi-colons
|
|
""",
|
|
re.VERBOSE,
|
|
)
|
|
return bool(re.search(pattern, input_str))
|
|
|
|
|
|
def is_valid_id(id):
|
|
if not isinstance(id, str):
|
|
return True
|
|
return bool(re.match(r"^[a-zA-Z0-9_-]{1,50}$", id))
|
|
|
|
|
|
def check_suspicious_id(*args):
|
|
for id in args:
|
|
if not is_valid_id(id):
|
|
abort(403)
|
|
if is_suspicious_input(id):
|
|
abort(403)
|