more input checking

This commit is contained in:
Kenneth Kehl
2025-06-26 10:35:46 -07:00
parent 3e93d6c9c6
commit 58a8b51f59
16 changed files with 214 additions and 86 deletions

View File

@@ -53,7 +53,13 @@ from app.user.users_schema import (
post_verify_code_schema,
post_verify_webauthn_schema,
)
from app.utils import debug_not_production, hilite, url_with_token, utc_now
from app.utils import (
check_suspicious_id,
debug_not_production,
hilite,
url_with_token,
utc_now,
)
from notifications_utils.recipients import is_us_phone_number, use_numeric_sender
user_blueprint = Blueprint("user", __name__)
@@ -95,6 +101,7 @@ def create_user():
@user_blueprint.route("/<uuid:user_id>", methods=["POST"])
def update_user_attribute(user_id):
check_suspicious_id(user_id)
user_to_update = get_user_by_id(user_id=user_id)
req_json = request.get_json()
if "updated_by" in req_json:
@@ -159,6 +166,7 @@ def get_sms_reply_to_for_notify_service(recipient, template):
@user_blueprint.route("/<uuid:user_id>/archive", methods=["POST"])
def archive_user(user_id):
check_suspicious_id(user_id)
user = get_user_by_id(user_id)
dao_archive_user(user)
@@ -167,6 +175,7 @@ def archive_user(user_id):
@user_blueprint.route("/<uuid:user_id>/activate", methods=["POST"])
def activate_user(user_id):
check_suspicious_id(user_id)
user = get_user_by_id(user_id=user_id)
if user.state == "active":
raise InvalidRequest("User already active", status_code=400)
@@ -178,6 +187,7 @@ def activate_user(user_id):
@user_blueprint.route("/<uuid:user_id>/deactivate", methods=["POST"])
def deactivate_user(user_id):
check_suspicious_id(user_id)
user = get_user_by_id(user_id=user_id)
if user.state == "pending":
raise InvalidRequest("User already inactive", status_code=400)
@@ -189,6 +199,7 @@ def deactivate_user(user_id):
@user_blueprint.route("/<uuid:user_id>/reset-failed-login-count", methods=["POST"])
def user_reset_failed_login_count(user_id):
check_suspicious_id(user_id)
user_to_update = get_user_by_id(user_id=user_id)
reset_failed_login_count(user_to_update)
return jsonify(data=user_to_update.serialize()), 200
@@ -196,6 +207,7 @@ def user_reset_failed_login_count(user_id):
@user_blueprint.route("/<uuid:user_id>/verify/password", methods=["POST"])
def verify_user_password(user_id):
check_suspicious_id(user_id)
user_to_verify = get_user_by_id(user_id=user_id)
try:
@@ -217,6 +229,7 @@ def verify_user_password(user_id):
@user_blueprint.route("/<uuid:user_id>/verify/code", methods=["POST"])
def verify_user_code(user_id):
check_suspicious_id(user_id)
data = request.get_json()
validate(data, post_verify_code_schema)
@@ -251,6 +264,7 @@ def verify_user_code(user_id):
@user_blueprint.route("/<uuid:user_id>/complete/webauthn-login", methods=["POST"])
@user_blueprint.route("/<uuid:user_id>/verify/webauthn-login", methods=["POST"])
def complete_login_after_webauthn_authentication_attempt(user_id):
check_suspicious_id(user_id)
"""
complete login after a webauthn authentication. There's nothing webauthn specific in this code
but the sms/email flows do this as part of `verify_user_code` above and this is the equivalent spot in the
@@ -283,6 +297,7 @@ def complete_login_after_webauthn_authentication_attempt(user_id):
@user_blueprint.route("/<uuid:user_id>/<code_type>-code", methods=["POST"])
def send_user_2fa_code(user_id, code_type):
check_suspicious_id(user_id)
user_to_send_to = get_user_by_id(user_id=user_id)
if count_user_verify_codes(user_to_send_to) >= current_app.config.get(
@@ -386,6 +401,7 @@ def create_2fa_code(
@user_blueprint.route("/<uuid:user_id>/change-email-verification", methods=["POST"])
def send_user_confirm_new_email(user_id):
check_suspicious_id(user_id)
user_to_send_to = get_user_by_id(user_id=user_id)
email = email_data_request_schema.load(request.get_json())
@@ -425,6 +441,7 @@ def send_user_confirm_new_email(user_id):
@user_blueprint.route("/<uuid:user_id>/email-verification", methods=["POST"])
def send_new_user_email_verification(user_id):
check_suspicious_id(user_id)
current_app.logger.info("Sending email verification for user {}".format(user_id))
request_json = request.get_json()
@@ -479,6 +496,7 @@ def send_new_user_email_verification(user_id):
@user_blueprint.route("/<uuid:user_id>/email-already-registered", methods=["POST"])
def send_already_registered_email(user_id):
check_suspicious_id(user_id)
current_app.logger.info("Email already registered for user {}".format(user_id))
to = email_data_request_schema.load(request.get_json())
@@ -528,6 +546,7 @@ def send_already_registered_email(user_id):
@user_blueprint.route("/<uuid:user_id>", methods=["GET"])
@user_blueprint.route("", methods=["GET"])
def get_user(user_id=None):
check_suspicious_id(user_id)
users = get_user_by_id(user_id=user_id)
result = (
[x.serialize() for x in users] if isinstance(users, list) else users.serialize()
@@ -539,6 +558,7 @@ def get_user(user_id=None):
"/<uuid:user_id>/service/<uuid:service_id>/permission", methods=["POST"]
)
def set_permissions(user_id, service_id):
check_suspicious_id(user_id, service_id)
# TODO fix security hole, how do we verify that the user
# who is making this request has permission to make the request.
service_user = dao_get_service_user(user_id, service_id)
@@ -651,6 +671,7 @@ def report_all_users():
@user_blueprint.route("/<uuid:user_id>/organizations-and-services", methods=["GET"])
def get_organizations_and_services_for_user(user_id):
check_suspicious_id(user_id)
user = get_user_and_accounts(user_id)
data = get_orgs_and_services(user)
return jsonify(data)