Files
notifications-admin/app/utils/csv.py
Carlo Costino 9a83ba7475 Localize notification_utils to the admin
This changeset pulls in all of the notification_utils code directly into the admin and removes it as an external dependency. We are doing this to cut down on operational maintenance of the project and will begin removing parts of it no longer needed for the admin.

Signed-off-by: Carlo Costino <carlo.costino@gsa.gov>
2024-05-16 10:37:37 -04:00

175 lines
6.0 KiB
Python

import datetime
import pytz
from flask_login import current_user
from app.models.spreadsheet import Spreadsheet
from app.utils.templates import get_sample_template
from notifications_utils.recipients import RecipientCSV
def get_errors_for_csv(recipients, template_type):
errors = []
if any(recipients.rows_with_bad_recipients):
number_of_bad_recipients = len(list(recipients.rows_with_bad_recipients))
if "sms" == template_type:
if 1 == number_of_bad_recipients:
errors.append("fix 1 phone number")
else:
errors.append("fix {} phone numbers".format(number_of_bad_recipients))
elif "email" == template_type:
if 1 == number_of_bad_recipients:
errors.append("fix 1 email address")
else:
errors.append("fix {} email addresses".format(number_of_bad_recipients))
if any(recipients.rows_with_missing_data):
number_of_rows_with_missing_data = len(list(recipients.rows_with_missing_data))
if 1 == number_of_rows_with_missing_data:
errors.append("enter missing data in 1 row")
else:
errors.append(
"enter missing data in {} rows".format(number_of_rows_with_missing_data)
)
if any(recipients.rows_with_message_too_long):
number_of_rows_with_message_too_long = len(
list(recipients.rows_with_message_too_long)
)
if 1 == number_of_rows_with_message_too_long:
errors.append("shorten the message in 1 row")
else:
errors.append(
"shorten the messages in {} rows".format(
number_of_rows_with_message_too_long
)
)
if any(recipients.rows_with_empty_message):
number_of_rows_with_empty_message = len(
list(recipients.rows_with_empty_message)
)
if 1 == number_of_rows_with_empty_message:
errors.append("check you have content for the empty message in 1 row")
else:
errors.append(
"check you have content for the empty messages in {} rows".format(
number_of_rows_with_empty_message
)
)
return errors
def generate_notifications_csv(**kwargs):
from app import notification_api_client
from app.s3_client.s3_csv_client import s3download
if "page" not in kwargs:
kwargs["page"] = 1
# This generates the "batch" csv report
if kwargs.get("job_id"):
original_file_contents = s3download(kwargs["service_id"], kwargs["job_id"])
original_upload = RecipientCSV(
original_file_contents,
template=get_sample_template(kwargs["template_type"]),
)
original_column_headers = original_upload.column_headers
fieldnames = [
"Phone Number",
"Template",
"Sent by",
"Batch File",
"Carrier Response",
"Status",
"Time",
]
for header in original_column_headers:
if header.lower() != "phone number":
fieldnames.append(header)
else:
# This generates the "full" csv report
fieldnames = [
"Phone Number",
"Template",
"Sent by",
"Batch File",
"Carrier Response",
"Status",
"Time",
]
yield ",".join(fieldnames) + "\n"
while kwargs["page"]:
notifications_resp = notification_api_client.get_notifications_for_service(
**kwargs
)
for notification in notifications_resp["notifications"]:
preferred_tz_created_at = convert_report_date_to_preferred_timezone(
notification["created_at"]
)
if kwargs.get("job_id"):
values = [
notification["recipient"],
notification["template_name"],
notification["created_by_name"],
notification["job_name"],
notification["provider_response"],
notification["status"],
preferred_tz_created_at,
]
for header in original_column_headers:
if header.lower() != "phone number":
values.append(
original_upload[notification["row_number"] - 1]
.get(header)
.data
)
else:
values = [
notification["recipient"],
notification["template_name"],
notification["created_by_name"] or "",
notification["job_name"] or "",
notification["provider_response"],
notification["status"],
preferred_tz_created_at,
]
yield Spreadsheet.from_rows([map(str, values)]).as_csv_data
if notifications_resp["links"].get("next"):
kwargs["page"] += 1
else:
return
raise Exception("Should never reach here")
def convert_report_date_to_preferred_timezone(db_date_str_in_utc):
"""
Report dates in the db are in UTC. We need to convert them to the user's default timezone,
which defaults to "US/Eastern"
"""
date_arr = db_date_str_in_utc.split(" ")
db_date_str_in_utc = f"{date_arr[0]}T{date_arr[1]}+00:00"
utc_date_obj = datetime.datetime.fromisoformat(db_date_str_in_utc)
utc_date_obj = utc_date_obj.astimezone(pytz.utc)
preferred_timezone = pytz.timezone(get_user_preferred_timezone())
preferred_date_obj = utc_date_obj.astimezone(preferred_timezone)
preferred_tz_created_at = preferred_date_obj.strftime("%Y-%m-%d %I:%M:%S %p")
return f"{preferred_tz_created_at} {get_user_preferred_timezone()}"
def get_user_preferred_timezone():
if current_user and hasattr(current_user, "preferred_timezone"):
return current_user.preferred_timezone
return "US/Eastern"