From c8ac8f349a1369e76c41304f223e6f16f8aaf7f8 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Thu, 19 Jun 2025 11:31:37 -0700 Subject: [PATCH] fix input handling --- app/job/rest.py | 10 ++++++++-- tests/app/job/test_rest.py | 17 +++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/app/job/rest.py b/app/job/rest.py index 32e845b6a..9a7de309c 100644 --- a/app/job/rest.py +++ b/app/job/rest.py @@ -47,7 +47,7 @@ register_errors(job_blueprint) def is_suspicious_input(input_str): if not isinstance(input_str, str): - return True + return False pattern = r"(?i)\b(OR|AND|UNION|SELECT|DROP|INSERT|UPDATE|DELETE|EXEC|TRUNCATE|CREATE|ALTER|--|/\*|\bpg_sleep\b|\bsleep\b)|[';]{2,}" # noqa return bool(re.search(pattern, input_str)) @@ -55,7 +55,7 @@ def is_suspicious_input(input_str): def is_valid_id(id): if not isinstance(id, str): return True - return bool(re.match(r"^[a-zA-Z0-9_-]{1,32}$", id)) + return bool(re.match(r"^[a-zA-Z0-9_-]{1,50}$", id)) @job_blueprint.route("/", methods=["GET"]) @@ -209,8 +209,14 @@ def get_recent_notifications_for_service_job(service_id, job_id): @job_blueprint.route("//notification_count", methods=["GET"]) def get_notification_count_for_job_id(service_id, job_id): if is_suspicious_input(service_id) or is_suspicious_input(job_id): + current_app.logger.error( + f"Service Id {service_id} is suspicious input or job id {job_id} is." + ) abort(403) if not is_valid_id(service_id) or not is_valid_id(job_id): + current_app.logger.error( + f"service id {service_id} or job_id {job_id} is not a valid id" + ) abort(403) dao_get_job_by_service_id_and_job_id(service_id, job_id) count = dao_get_notification_count_for_job_id(job_id=job_id) diff --git a/tests/app/job/test_rest.py b/tests/app/job/test_rest.py index f65ce2458..e39b24b54 100644 --- a/tests/app/job/test_rest.py +++ b/tests/app/job/test_rest.py @@ -16,6 +16,7 @@ from app.enums import ( NotificationType, TemplateType, ) +from app.job.rest import is_suspicious_input, is_valid_id from app.utils import utc_now from tests import create_admin_authorization_header from tests.app.db import ( @@ -586,6 +587,22 @@ def test_get_all_notifications_for_job_returns_correct_format( assert resp["notifications"][0]["status"] == sample_notification_with_job.status +def test_is_valid_id(sample_job): + returnVal = is_valid_id(sample_job.service_id) + assert returnVal is True + + returnVal = is_valid_id("abc pgsleep(1)") + assert returnVal is False + + +def test_is_suspicious_input(sample_job): + returnVal = is_suspicious_input(sample_job.id) + assert returnVal is False + + returnVal = is_suspicious_input("1 OR pg_sleep(1)") + assert returnVal is True + + def test_get_notification_count_for_job_id(admin_request, mocker, sample_job): mock_dao = mocker.patch( "app.job.rest.dao_get_notification_count_for_job_id", return_value=3