fix input handling

This commit is contained in:
Kenneth Kehl
2025-06-19 11:31:37 -07:00
parent 69e7244485
commit c8ac8f349a
2 changed files with 25 additions and 2 deletions

View File

@@ -47,7 +47,7 @@ register_errors(job_blueprint)
def is_suspicious_input(input_str):
if not isinstance(input_str, str):
return True
return False
pattern = r"(?i)\b(OR|AND|UNION|SELECT|DROP|INSERT|UPDATE|DELETE|EXEC|TRUNCATE|CREATE|ALTER|--|/\*|\bpg_sleep\b|\bsleep\b)|[';]{2,}" # noqa
return bool(re.search(pattern, input_str))
@@ -55,7 +55,7 @@ def is_suspicious_input(input_str):
def is_valid_id(id):
if not isinstance(id, str):
return True
return bool(re.match(r"^[a-zA-Z0-9_-]{1,32}$", id))
return bool(re.match(r"^[a-zA-Z0-9_-]{1,50}$", id))
@job_blueprint.route("/<job_id>", methods=["GET"])
@@ -209,8 +209,14 @@ def get_recent_notifications_for_service_job(service_id, job_id):
@job_blueprint.route("/<job_id>/notification_count", methods=["GET"])
def get_notification_count_for_job_id(service_id, job_id):
if is_suspicious_input(service_id) or is_suspicious_input(job_id):
current_app.logger.error(
f"Service Id {service_id} is suspicious input or job id {job_id} is."
)
abort(403)
if not is_valid_id(service_id) or not is_valid_id(job_id):
current_app.logger.error(
f"service id {service_id} or job_id {job_id} is not a valid id"
)
abort(403)
dao_get_job_by_service_id_and_job_id(service_id, job_id)
count = dao_get_notification_count_for_job_id(job_id=job_id)

View File

@@ -16,6 +16,7 @@ from app.enums import (
NotificationType,
TemplateType,
)
from app.job.rest import is_suspicious_input, is_valid_id
from app.utils import utc_now
from tests import create_admin_authorization_header
from tests.app.db import (
@@ -586,6 +587,22 @@ def test_get_all_notifications_for_job_returns_correct_format(
assert resp["notifications"][0]["status"] == sample_notification_with_job.status
def test_is_valid_id(sample_job):
returnVal = is_valid_id(sample_job.service_id)
assert returnVal is True
returnVal = is_valid_id("abc pgsleep(1)")
assert returnVal is False
def test_is_suspicious_input(sample_job):
returnVal = is_suspicious_input(sample_job.id)
assert returnVal is False
returnVal = is_suspicious_input("1 OR pg_sleep(1)")
assert returnVal is True
def test_get_notification_count_for_job_id(admin_request, mocker, sample_job):
mock_dao = mocker.patch(
"app.job.rest.dao_get_notification_count_for_job_id", return_value=3