mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-12 08:12:27 -05:00
Scrubbing log records with the formatter.
Signed-off-by: Cliff Hill <Clifford.hill@gsa.gov>
This commit is contained in:
@@ -3,6 +3,7 @@ import logging.handlers
|
||||
import re
|
||||
import sys
|
||||
from itertools import product
|
||||
from typing import Any, override
|
||||
|
||||
from flask import g, request
|
||||
from flask.ctx import has_app_context, has_request_context
|
||||
@@ -18,6 +19,39 @@ TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Loose phone matcher: an optional leading "+", then a run of at least nine
# characters that starts and ends with a digit and otherwise contains only
# digits, dashes, or spaces.
_PHONE_PATTERN = re.compile(r"(?:\+ *)?\d[\d\- ]{7,}\d")

# Loose email matcher: word characters, dots, or dashes on both sides of "@".
_EMAIL_PATTERN = re.compile(r"[\w\.-]+@[\w\.-]+")


def _scrub(msg: Any) -> Any:
    """Mask phone numbers and email addresses found in a log message.

    Args:
        msg: The log message. Anything that is not a ``str`` (for example an
            exception object passed directly as the message) is returned
            untouched.

    Returns:
        ``msg`` with every phone-like run replaced by ``"1XXXXXXXXXX"`` and
        every email-like token replaced by ``"XXXXX@XXXXXXX"``, or the
        original object when it is not a string.

    Note:
        The phone pattern is intentionally broad, so other digit runs that
        contain dashes or spaces (e.g. ``2025-12-12``) are masked as well.
    """
    # Sometimes just an exception object is passed in for the message, skip those.
    if not isinstance(msg, str):
        return msg

    # Substitute on the text exactly as it appears in the message.  Stripping
    # dashes/spaces from the matches before replacing (as was done previously)
    # meant formatted numbers such as "202-555-0123" were never masked,
    # because the stripped form does not occur in the message.
    msg = _PHONE_PATTERN.sub("1XXXXXXXXXX", msg)

    # A single regex pass also avoids partially-masked output when one address
    # is a suffix of another address in the same message.
    return _EMAIL_PATTERN.sub("XXXXX@XXXXXXX", msg)
|
||||
|
||||
|
||||
class PIIFilter(logging.Filter):
    """Logging filter that runs each record's ``msg`` through ``_scrub``."""

    @override
    def filter(self, record: logging.LogRecord) -> logging.LogRecord:
        """Replace ``record.msg`` with its scrubbed form and return the record.

        Only ``record.msg`` is rewritten; ``record.args`` is left untouched.
        """
        scrubbed = _scrub(record.msg)
        record.msg = scrubbed
        return record
|
||||
|
||||
|
||||
class PIIFormatter(logging.Formatter):
    """Formatter that scrubs a record's ``msg`` before normal formatting."""

    def format(self, record: logging.LogRecord) -> str:
        """Scrub ``record.msg`` in place, then delegate to the base formatter.

        Only ``record.msg`` is rewritten; ``record.args`` is left untouched.
        """
        scrubbed = _scrub(record.msg)
        record.msg = scrubbed
        formatted = super().format(record)
        return formatted
|
||||
|
||||
|
||||
def init_app(app):
|
||||
app.config.setdefault("NOTIFY_LOG_LEVEL", "INFO")
|
||||
app.config.setdefault("NOTIFY_APP_NAME", "none")
|
||||
@@ -50,7 +84,7 @@ def init_app(app):
|
||||
|
||||
def get_handlers(app):
|
||||
handlers = []
|
||||
standard_formatter = logging.Formatter(LOG_FORMAT, TIME_FORMAT)
|
||||
standard_formatter = PIIFormatter(LOG_FORMAT, TIME_FORMAT)
|
||||
json_formatter = JSONFormatter(LOG_FORMAT, TIME_FORMAT)
|
||||
|
||||
stream_handler = logging.StreamHandler(sys.stdout)
|
||||
@@ -123,36 +157,6 @@ class ServiceIdFilter(logging.Filter):
|
||||
return record
|
||||
|
||||
|
||||
class PIIFilter(logging.Filter):
    """Logging filter that masks phone numbers and emails in ``record.msg``."""

    # Loose phone matcher: an optional leading "+", then a run of at least
    # nine characters that starts and ends with a digit and otherwise contains
    # only digits, dashes, or spaces.
    _PHONE_PATTERN = re.compile(r"(?:\+ *)?\d[\d\- ]{7,}\d")

    # Loose email matcher: word characters, dots, or dashes around an "@".
    _EMAIL_PATTERN = re.compile(r"[\w\.-]+@[\w\.-]+")

    def scrub(self, msg):
        """Return ``msg`` with phone numbers and email addresses masked.

        Non-string messages (for example an exception object passed directly
        as the message) are returned unchanged.  Phone-like runs become
        ``"1XXXXXXXXXX"`` and email-like tokens become ``"XXXXX@XXXXXXX"``.

        The phone pattern is intentionally broad, so other digit runs that
        contain dashes or spaces (e.g. ``2025-12-12``) are masked as well.
        """
        # Eventually we want to scrub all messages in all logs for phone numbers
        # and email addresses, masking them. Ultimately this will probably get
        # refactored into a 'SafeLogger' subclass or something.

        # Sometimes just an exception object is passed in for the message, skip those.
        if not isinstance(msg, str):
            return msg

        # Substitute on the text exactly as it appears.  Stripping dashes and
        # spaces from the matches before replacing (as was done previously)
        # meant formatted numbers such as "202-555-0123" were never masked.
        msg = self._PHONE_PATTERN.sub("1XXXXXXXXXX", msg)

        # One regex pass also avoids partially-masked output when one address
        # is a suffix of another address in the same message.
        return self._EMAIL_PATTERN.sub("XXXXX@XXXXXXX", msg)

    def filter(self, record):
        """Scrub ``record.msg`` in place and return the record."""
        record.msg = self.scrub(record.msg)
        return record
|
||||
|
||||
|
||||
class JSONFormatter(BaseJSONFormatter):
|
||||
def process_log_record(self, log_record):
|
||||
rename_map = {
|
||||
@@ -166,6 +170,7 @@ class JSONFormatter(BaseJSONFormatter):
|
||||
log_record["logType"] = "application"
|
||||
try:
|
||||
log_record["message"] = log_record["message"].format(**log_record)
|
||||
log_record["message"] = _scrub(log_record["message"]) # PII Scrubbing
|
||||
except KeyError as e:
|
||||
# We get occasional log messages that are nested dictionaries,
|
||||
# for example, delivery receipts, where the formatting fails
|
||||
|
||||
Reference in New Issue
Block a user