Files
notifications-admin/notifications_utils/recipients.py

742 lines
23 KiB
Python
Raw Permalink Normal View History

import csv
import re
import sys
from collections import namedtuple
from contextlib import suppress
from functools import lru_cache
from io import StringIO
from itertools import islice
import phonenumbers
from flask import current_app
from ordered_set import OrderedSet
from phonenumbers.phonenumberutil import NumberParseException
from notifications_utils.formatters import (
strip_all_whitespace,
strip_and_remove_obscure_whitespace,
)
from notifications_utils.insensitive_dict import InsensitiveDict
2024-05-17 10:25:09 -07:00
from notifications_utils.international_billing_rates import INTERNATIONAL_BILLING_RATES
from notifications_utils.postal_address import (
address_line_7_key,
address_lines_1_to_6_and_postcode_keys,
address_lines_1_to_7_keys,
)
from notifications_utils.template import Template
from . import EMAIL_REGEX_PATTERN, hostname_part, tld_part
us_prefix = "1"
first_column_headings = {
"email": ["email address"],
"sms": ["phone number"],
"letter": [
line.replace("_", " ")
for line in address_lines_1_to_6_and_postcode_keys + [address_line_7_key]
],
}
address_columns = InsensitiveDict.from_keys(first_column_headings["letter"])
class RecipientCSV:
max_rows = 100_000
def __init__(
self,
file_data,
template,
max_errors_shown=20,
max_initial_rows_shown=10,
guestlist=None,
remaining_messages=sys.maxsize,
allow_international_sms=False,
allow_international_letters=False,
should_validate=True,
):
self.file_data = strip_all_whitespace(file_data, extra_characters=",")
self.max_errors_shown = max_errors_shown
self.max_initial_rows_shown = max_initial_rows_shown
self.guestlist = guestlist
self.template = template
self.allow_international_sms = allow_international_sms
self.allow_international_letters = allow_international_letters
self.remaining_messages = remaining_messages
self.rows_as_list = None
self.should_validate = should_validate
def __len__(self):
if not hasattr(self, "_len"):
self._len = len(self.rows)
return self._len
def __getitem__(self, requested_index):
return self.rows[requested_index]
@property
def guestlist(self):
return self._guestlist
@guestlist.setter
def guestlist(self, value):
try:
self._guestlist = list(value)
except TypeError:
self._guestlist = []
@property
def template(self):
return self._template
@template.setter
def template(self, value):
if not isinstance(value, Template):
raise TypeError(
"template must be an instance of "
"notifications_utils.template.Template"
)
self._template = value
self.template_type = self._template.template_type
self.recipient_column_headers = first_column_headings[self.template_type]
self.placeholders = self._template.placeholders
@property
def placeholders(self):
return self._placeholders
@placeholders.setter
def placeholders(self, value):
try:
self._placeholders = list(value) + self.recipient_column_headers
except TypeError:
self._placeholders = self.recipient_column_headers
self.placeholders_as_column_keys = [
InsensitiveDict.make_key(placeholder) for placeholder in self._placeholders
]
self.recipient_column_headers_as_column_keys = [
InsensitiveDict.make_key(placeholder)
for placeholder in self.recipient_column_headers
]
@property
def has_errors(self):
return bool(
self.missing_column_headers
or self.duplicate_recipient_column_headers
or self.more_rows_than_can_send
or self.too_many_rows
or (not self.allowed_to_send_to)
or any(self.rows_with_errors)
) # `or` is 3x faster than using `any()` here
@property
def allowed_to_send_to(self):
if self.template_type == "letter":
return True
if not self.guestlist:
return True
return all(
allowed_to_send_to(row.recipient, self.guestlist) for row in self.rows
)
@property
def rows(self):
if self.rows_as_list is None:
self.rows_as_list = list(self.get_rows())
return self.rows_as_list
@property
def _rows(self):
return csv.reader(
StringIO(self.file_data.strip()),
quoting=csv.QUOTE_MINIMAL,
skipinitialspace=True,
)
def get_rows(self):
column_headers = self._raw_column_headers # this is for caching
length_of_column_headers = len(column_headers)
rows_as_lists_of_columns = self._rows
next(rows_as_lists_of_columns, None) # skip the header row
for index, row in enumerate(rows_as_lists_of_columns):
if index >= self.max_rows:
yield None
continue
output_dict = {}
for column_name, column_value in zip(column_headers, row):
column_value = strip_and_remove_obscure_whitespace(column_value)
if (
InsensitiveDict.make_key(column_name)
in self.recipient_column_headers_as_column_keys
):
output_dict[column_name] = column_value or None
else:
insert_or_append_to_dict(
output_dict, column_name, column_value or None
)
length_of_row = len(row)
if length_of_column_headers < length_of_row:
output_dict[None] = row[length_of_column_headers:]
elif length_of_column_headers > length_of_row:
for key in column_headers[length_of_row:]:
insert_or_append_to_dict(output_dict, key, None)
yield Row(
output_dict,
index=index,
error_fn=self._get_error_for_field,
recipient_column_headers=self.recipient_column_headers,
placeholders=self.placeholders_as_column_keys,
template=self.template,
allow_international_letters=self.allow_international_letters,
validate_row=self.should_validate,
)
@property
def more_rows_than_can_send(self):
return len(self) > self.remaining_messages
@property
def too_many_rows(self):
return len(self) > self.max_rows
@property
def initial_rows(self):
return islice(self.rows, self.max_initial_rows_shown)
@property
def displayed_rows(self):
if any(self.rows_with_errors) and not self.missing_column_headers:
return self.initial_rows_with_errors
return self.initial_rows
def _filter_rows(self, attr):
return (row for row in self.rows if row and getattr(row, attr))
@property
def rows_with_errors(self):
return self._filter_rows("has_error")
@property
def rows_with_bad_recipients(self):
return self._filter_rows("has_bad_recipient")
@property
def rows_with_missing_data(self):
return self._filter_rows("has_missing_data")
@property
def rows_with_message_too_long(self):
return self._filter_rows("message_too_long")
@property
def rows_with_empty_message(self):
return self._filter_rows("message_empty")
@property
def initial_rows_with_errors(self):
return islice(self.rows_with_errors, self.max_errors_shown)
@property
def _raw_column_headers(self):
for row in self._rows:
return row
return []
@property
def column_headers(self):
return list(OrderedSet(self._raw_column_headers))
@property
def column_headers_as_column_keys(self):
return InsensitiveDict.from_keys(self.column_headers).keys()
@property
def missing_column_headers(self):
return set(
key
for key in self.placeholders
if (
InsensitiveDict.make_key(key) not in self.column_headers_as_column_keys
and not self.is_address_column(key)
)
)
@property
def duplicate_recipient_column_headers(self):
raw_recipient_column_headers = [
InsensitiveDict.make_key(column_header)
for column_header in self._raw_column_headers
if InsensitiveDict.make_key(column_header)
in self.recipient_column_headers_as_column_keys
]
return OrderedSet(
(
column_header
for column_header in self._raw_column_headers
if raw_recipient_column_headers.count(
InsensitiveDict.make_key(column_header)
)
> 1
)
)
def is_address_column(self, key):
return self.template_type == "letter" and key in address_columns
@property
def count_of_required_recipient_columns(self):
return 3 if self.template_type == "letter" else 1
@property
def has_recipient_columns(self):
if self.template_type == "letter":
sets_to_check = [
InsensitiveDict.from_keys(
address_lines_1_to_6_and_postcode_keys
).keys(),
InsensitiveDict.from_keys(address_lines_1_to_7_keys).keys(),
]
else:
sets_to_check = [
self.recipient_column_headers_as_column_keys,
]
for set_to_check in sets_to_check:
if (
len(
# Work out which columns are shared between the possible
# letter address columns and the columns in the users
# spreadsheet (`&` means set intersection)
set_to_check
& self.column_headers_as_column_keys
)
>= self.count_of_required_recipient_columns
):
return True
return False
def _get_error_for_field(self, key, value): # noqa: C901
if self.is_address_column(key):
return
if (
InsensitiveDict.make_key(key)
in self.recipient_column_headers_as_column_keys
):
if value in [None, ""] or isinstance(value, list):
if self.duplicate_recipient_column_headers:
return None
else:
return Cell.missing_field_error
try:
if self.template_type == "email":
validate_email_address(value)
if self.template_type == "sms":
validate_phone_number(
value, international=self.allow_international_sms
)
except (InvalidEmailError, InvalidPhoneError) as error:
return str(error)
if InsensitiveDict.make_key(key) not in self.placeholders_as_column_keys:
return
if value in [None, ""]:
return Cell.missing_field_error
class Row(InsensitiveDict):
message_too_long = False
message_empty = False
def __init__(
self,
row_dict,
*,
index,
error_fn,
recipient_column_headers,
placeholders,
template,
allow_international_letters,
validate_row=True,
):
# If we don't need to validate, then:
# by not setting template we avoid the template level validation (used to check message length)
# by not setting error_fn, we avoid the Cell.__init__ validation (used to check phone nums are valid,
# placeholders are present, etc)
if not validate_row:
template = None
error_fn = None
self.index = index
self.recipient_column_headers = recipient_column_headers
self.placeholders = placeholders
self.allow_international_letters = allow_international_letters
if template:
template.values = row_dict
self.template_type = template.template_type
# we do not validate email size for CSVs to avoid performance issues
if self.template_type == "email":
self.message_too_long = False
else:
self.message_too_long = template.is_message_too_long()
self.message_empty = template.is_message_empty()
super().__init__(
{
key: Cell(key, value, error_fn, self.placeholders)
for key, value in row_dict.items()
}
)
def __getitem__(self, key):
return super().__getitem__(key) if key in self else Cell()
def get(self, key, default=None):
if key not in self and default is not None:
return default
return self[key]
@property
def has_error(self):
return self.has_error_spanning_multiple_cells or any(
cell.error for cell in self.values()
)
@property
def has_bad_recipient(self):
if self.template_type == "letter":
return self.has_bad_postal_address
return self.get(self.recipient_column_headers[0]).recipient_error
@property
def has_bad_postal_address(self):
return self.template_type == "letter" and not self.as_postal_address.valid
@property
def has_error_spanning_multiple_cells(self):
return (
self.message_too_long or self.message_empty or self.has_bad_postal_address
)
@property
def has_missing_data(self):
return any(cell.error == Cell.missing_field_error for cell in self.values())
@property
def recipient(self):
columns = [self.get(column).data for column in self.recipient_column_headers]
return columns[0] if len(columns) == 1 else columns
@property
def as_postal_address(self):
from notifications_utils.postal_address import PostalAddress
return PostalAddress.from_personalisation(
self.recipient_and_personalisation,
allow_international_letters=self.allow_international_letters,
)
@property
def personalisation(self):
return InsensitiveDict(
{key: cell.data for key, cell in self.items() if key in self.placeholders}
)
@property
def recipient_and_personalisation(self):
return InsensitiveDict({key: cell.data for key, cell in self.items()})
class Cell:
missing_field_error = "Missing"
def __init__(self, key=None, value=None, error_fn=None, placeholders=None):
self.data = value
self.error = error_fn(key, value) if error_fn else None
self.ignore = InsensitiveDict.make_key(key) not in (placeholders or [])
def __eq__(self, other):
if not other.__class__ == self.__class__:
return False
return all(
(
self.data == other.data,
self.error == other.error,
self.ignore == other.ignore,
)
)
@property
def recipient_error(self):
return self.error not in {None, self.missing_field_error}
class InvalidEmailError(Exception):
def __init__(self, message=None):
super().__init__(message or "Not a valid email address")
class InvalidPhoneError(InvalidEmailError):
pass
class InvalidAddressError(InvalidEmailError):
pass
def normalize_phone_number(phonenumber):
if isinstance(phonenumber, str):
phonenumber = phonenumbers.parse(phonenumber, "US")
return phonenumbers.format_number(phonenumber, phonenumbers.PhoneNumberFormat.E164)
def is_us_phone_number(number):
try:
return _get_country_code(number) == us_prefix
except NumberParseException:
return False
international_phone_info = namedtuple(
"PhoneNumber",
[
"international",
"country_prefix",
"billable_units",
],
)
def get_international_phone_info(number):
number = validate_phone_number(number, international=True)
prefix = _get_country_code(number)
return international_phone_info(
international=(prefix != us_prefix),
country_prefix=prefix,
billable_units=get_billable_units_for_prefix(prefix),
)
# NANP_COUNTRY_AREA_CODES are the list of area codes in the North American Numbering Plan
# that have their own entry in international_billing_rates.yml.
# Source: https://en.wikipedia.org/wiki/List_of_North_American_Numbering_Plan_area_codes
_NANP_COUNTRY_AREA_CODES = [
"684",
"242",
"246",
"264",
"268",
"284",
"345",
"441",
"473",
"649",
"876",
"664",
"721",
"758",
"767",
"784",
"868",
"869",
]
def _get_country_code(number):
parsed = phonenumbers.parse(number, "US")
country_code = str(parsed.country_code)
if country_code == us_prefix:
area_code = str(parsed.national_number)[:3]
if area_code in _NANP_COUNTRY_AREA_CODES:
return f"{country_code}{area_code}"
return country_code
def get_billable_units_for_prefix(prefix):
"""Return the billable units for prefix. Hard-coded to 1 for now"""
return 1
# return INTERNATIONAL_BILLING_RATES[prefix]['billable_units']
def use_numeric_sender(number):
prefix = _get_country_code(number)
return (
INTERNATIONAL_BILLING_RATES[(prefix or us_prefix)]["attributes"]["alpha"]
== "NO"
)
def validate_us_phone_number(number):
try:
parsed = phonenumbers.parse(number, "US")
if not is_us_phone_number(number):
raise InvalidPhoneError("Not a US number")
if phonenumbers.is_valid_number(parsed):
return normalize_phone_number(parsed)
if len(str(parsed.national_number)) > 10:
raise InvalidPhoneError("Too many digits")
if len(str(parsed.national_number)) < 10:
raise InvalidPhoneError("Not enough digits")
if phonenumbers.is_possible_number(parsed):
raise InvalidPhoneError("Phone number range is not in use")
raise InvalidPhoneError("Phone number is not possible")
except NumberParseException as exc:
raise InvalidPhoneError(exc._msg) from exc
def validate_phone_number(number, international=False):
if (not international) or is_us_phone_number(number):
return validate_us_phone_number(number)
try:
parsed = phonenumbers.parse(number, None)
if parsed.country_code != 1:
raise InvalidPhoneError("Invalid country code")
number = f"{parsed.country_code}{parsed.national_number}"
if len(number) < 8:
raise InvalidPhoneError("Not enough digits")
if len(number) > 15:
raise InvalidPhoneError("Too many digits")
return normalize_phone_number(parsed)
except NumberParseException as exc:
if exc._msg == "Could not interpret numbers after plus-sign.":
raise InvalidPhoneError("Not a valid country prefix") from exc
raise InvalidPhoneError(exc._msg) from exc
validate_and_format_phone_number = validate_phone_number
def try_validate_and_format_phone_number(number, international=None, log_msg=None):
"""
For use in places where you shouldn't error if the phone number is invalid - for example if firetext pass us
something in
"""
try:
return validate_and_format_phone_number(number, international)
except InvalidPhoneError as exc:
if log_msg:
current_app.logger.warning("{}: {}".format(log_msg, exc))
return number
def _do_simple_email_checks(match, email_address):
# not an email
if not match:
raise InvalidEmailError
if len(email_address) > 320:
raise InvalidEmailError
# don't allow consecutive periods in either part
if ".." in email_address:
raise InvalidEmailError
def validate_email_address(email_address): # noqa (C901 too complex)
# almost exactly the same as by https://github.com/wtforms/wtforms/blob/master/wtforms/validators.py,
# with minor tweaks for SES compatibility - to avoid complications we are a lot stricter with the local part
# than neccessary - we don't allow any double quotes or semicolons to prevent SES Technical Failures
email_address = strip_and_remove_obscure_whitespace(email_address)
match = re.match(EMAIL_REGEX_PATTERN, email_address)
_do_simple_email_checks(match, email_address)
hostname = match.group(1)
# idna = "Internationalized domain name" - this encode/decode cycle converts unicode into its accurate ascii
# representation as the web uses. '例え.テスト'.encode('idna') == b'xn--r8jz45g.xn--zckzah'
try:
hostname = hostname.encode("idna").decode("ascii")
except UnicodeError:
raise InvalidEmailError
parts = hostname.split(".")
if len(hostname) > 253 or len(parts) < 2:
raise InvalidEmailError
for part in parts:
if not part or len(part) > 63 or not hostname_part.match(part):
raise InvalidEmailError
# if the part after the last . is not a valid TLD then bail out
if not tld_part.match(parts[-1]):
raise InvalidEmailError
return email_address
def format_email_address(email_address):
return strip_and_remove_obscure_whitespace(email_address.lower())
def validate_and_format_email_address(email_address):
return format_email_address(validate_email_address(email_address))
@lru_cache(maxsize=32, typed=False)
def format_recipient(recipient):
if not isinstance(recipient, str):
return ""
with suppress(InvalidPhoneError):
return validate_and_format_phone_number(recipient, international=True)
with suppress(InvalidEmailError):
return validate_and_format_email_address(recipient)
return recipient
def format_phone_number_human_readable(phone_number):
try:
phone_number = validate_phone_number(phone_number, international=True)
except InvalidPhoneError:
# if there was a validation error, we want to shortcut out here, but still display the number on the front end
return phone_number
international_phone_info = get_international_phone_info(phone_number)
return phonenumbers.format_number(
phonenumbers.parse(phone_number, None),
(
phonenumbers.PhoneNumberFormat.INTERNATIONAL
if international_phone_info.international
else phonenumbers.PhoneNumberFormat.NATIONAL
),
)
def allowed_to_send_to(recipient, allowlist):
return format_recipient(recipient) in {format_recipient(x) for x in allowlist}
def insert_or_append_to_dict(dict_, key, value):
if not (key or value):
# We dont care about completely empty values so its faster to
# ignore them rather than working out how to store them
return
if dict_.get(key):
if isinstance(dict_[key], list):
dict_[key].append(value)
else:
dict_[key] = [dict_[key], value]
else:
dict_.update({key: value})