mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-06-08 07:21:13 -04:00
It would be annoying to get all the way to the end of the flow and get told that the phone number or email address you entered isn’t valid. So this commit reuses the existing WTForms objects that we have to do some extra validation on the first step in the send one-off message flow. It also accounts for international phone numbers, if the service is allowed to send them. It doesn’t reject other people’s phone numbers if your service is restricted, because I think it’s better to let users play with the feature – it’s good for learning.
549 lines
19 KiB
Python
549 lines
19 KiB
Python
import itertools
|
||
from string import ascii_uppercase
|
||
|
||
from contextlib import suppress
|
||
from zipfile import BadZipFile
|
||
from xlrd.biffh import XLRDError
|
||
from werkzeug.routing import RequestRedirect
|
||
|
||
from flask import (
|
||
request,
|
||
render_template,
|
||
redirect,
|
||
url_for,
|
||
flash,
|
||
abort,
|
||
session,
|
||
current_app,
|
||
send_file,
|
||
)
|
||
|
||
from flask_login import login_required, current_user
|
||
|
||
from notifications_utils.columns import Columns
|
||
from notifications_utils.recipients import (
|
||
RecipientCSV,
|
||
first_column_headings,
|
||
validate_and_format_phone_number,
|
||
optional_address_columns,
|
||
)
|
||
|
||
from app.main import main
|
||
from app.main.forms import (
|
||
CsvUploadForm,
|
||
ChooseTimeForm,
|
||
get_furthest_possible_scheduled_time,
|
||
get_placeholder_form_instance
|
||
)
|
||
from app.main.uploader import (
|
||
s3upload,
|
||
s3download
|
||
)
|
||
from app import job_api_client, service_api_client, current_service, user_api_client
|
||
from app.utils import (
|
||
user_has_permissions,
|
||
get_errors_for_csv,
|
||
Spreadsheet,
|
||
get_help_argument,
|
||
get_template
|
||
)
|
||
from app.template_previews import TemplatePreview, get_page_count_for_letter
|
||
|
||
|
||
def get_page_headings(template_type):
|
||
return {
|
||
'email': 'Email templates',
|
||
'sms': 'Text message templates',
|
||
'letter': 'Letter templates'
|
||
}[template_type]
|
||
|
||
|
||
def get_example_csv_fields(column_headers, use_example_as_example, submitted_fields):
|
||
if use_example_as_example:
|
||
return ["example" for header in column_headers]
|
||
elif submitted_fields:
|
||
return [submitted_fields.get(header) for header in column_headers]
|
||
else:
|
||
return list(column_headers)
|
||
|
||
|
||
def get_example_csv_rows(template, use_example_as_example=True, submitted_fields=False):
|
||
return {
|
||
'email': ['test@example.com'] if use_example_as_example else [current_user.email_address],
|
||
'sms': ['07700 900321'] if use_example_as_example else [current_user.mobile_number],
|
||
'letter': [
|
||
(submitted_fields or {}).get(
|
||
key, get_example_letter_address(key) if use_example_as_example else key
|
||
)
|
||
for key in first_column_headings['letter']
|
||
]
|
||
}[template.template_type] + get_example_csv_fields(template.placeholders, use_example_as_example, submitted_fields)
|
||
|
||
|
||
def get_example_letter_address(key):
|
||
return {
|
||
'address line 1': 'A. Name',
|
||
'address line 2': '123 Example Street',
|
||
'postcode': 'XM4 5HQ'
|
||
}.get(key, '')
|
||
|
||
|
||
@main.route("/services/<service_id>/send/<template_id>/csv", methods=['GET', 'POST'])
|
||
@login_required
|
||
@user_has_permissions('send_texts', 'send_emails', 'send_letters')
|
||
def send_messages(service_id, template_id):
|
||
|
||
template = service_api_client.get_service_template(service_id, template_id)['data']
|
||
|
||
template = get_template(
|
||
template,
|
||
current_service,
|
||
show_recipient=True,
|
||
expand_emails=True,
|
||
letter_preview_url=url_for(
|
||
'.view_letter_template_preview',
|
||
service_id=service_id,
|
||
template_id=template_id,
|
||
filetype='png',
|
||
page_count=get_page_count_for_letter(template),
|
||
),
|
||
)
|
||
|
||
form = CsvUploadForm()
|
||
if form.validate_on_submit():
|
||
try:
|
||
upload_id = s3upload(
|
||
service_id,
|
||
Spreadsheet.from_file(form.file.data, filename=form.file.data.filename).as_dict,
|
||
current_app.config['AWS_REGION']
|
||
)
|
||
session['upload_data'] = {
|
||
"template_id": template_id,
|
||
"original_file_name": form.file.data.filename
|
||
}
|
||
return redirect(url_for('.check_messages',
|
||
service_id=service_id,
|
||
upload_id=upload_id,
|
||
template_type=template.template_type))
|
||
except (UnicodeDecodeError, BadZipFile, XLRDError):
|
||
flash('Couldn’t read {}. Try using a different file format.'.format(
|
||
form.file.data.filename
|
||
))
|
||
|
||
column_headings = first_column_headings[template.template_type] + list(template.placeholders)
|
||
|
||
return render_template(
|
||
'views/send.html',
|
||
template=template,
|
||
column_headings=list(ascii_uppercase[:len(column_headings)]),
|
||
example=[column_headings, get_example_csv_rows(template)],
|
||
form=form
|
||
)
|
||
|
||
|
||
@main.route("/services/<service_id>/send/<template_id>.csv", methods=['GET'])
|
||
@login_required
|
||
@user_has_permissions('send_texts', 'send_emails', 'send_letters', 'manage_templates', any_=True)
|
||
def get_example_csv(service_id, template_id):
|
||
template = get_template(
|
||
service_api_client.get_service_template(service_id, template_id)['data'], current_service
|
||
)
|
||
return Spreadsheet.from_rows([
|
||
first_column_headings[template.template_type] + list(template.placeholders),
|
||
get_example_csv_rows(template)
|
||
]).as_csv_data, 200, {
|
||
'Content-Type': 'text/csv; charset=utf-8',
|
||
'Content-Disposition': 'inline; filename="{}.csv"'.format(template.name)
|
||
}
|
||
|
||
|
||
@main.route("/services/<service_id>/send/<template_id>/test", endpoint='send_test')
|
||
@main.route("/services/<service_id>/send/<template_id>/one-off", endpoint='send_one_off')
|
||
@login_required
|
||
@user_has_permissions('send_texts', 'send_emails', 'send_letters')
|
||
def send_test(service_id, template_id):
|
||
session['send_test_values'] = dict()
|
||
session['send_test_letter_page_count'] = None
|
||
return redirect(url_for(
|
||
{
|
||
'main.send_test': '.send_test_step',
|
||
'main.send_one_off': '.send_one_off_step',
|
||
}[request.endpoint],
|
||
service_id=service_id,
|
||
template_id=template_id,
|
||
step_index=0,
|
||
help=get_help_argument(),
|
||
))
|
||
|
||
|
||
@main.route(
|
||
"/services/<service_id>/send/<template_id>/test/step-<int:step_index>",
|
||
methods=['GET', 'POST'],
|
||
endpoint='send_test_step',
|
||
)
|
||
@main.route(
|
||
"/services/<service_id>/send/<template_id>/one-off/step-<int:step_index>",
|
||
methods=['GET', 'POST'],
|
||
endpoint='send_one_off_step',
|
||
)
|
||
@login_required
|
||
@user_has_permissions('send_texts', 'send_emails', 'send_letters')
|
||
def send_test_step(service_id, template_id, step_index):
|
||
|
||
if 'send_test_values' not in session:
|
||
return redirect(url_for(
|
||
{
|
||
'main.send_test_step': '.send_test',
|
||
'main.send_one_off_step': '.send_one_off',
|
||
}[request.endpoint],
|
||
service_id=service_id,
|
||
template_id=template_id,
|
||
))
|
||
|
||
template = service_api_client.get_service_template(service_id, template_id)['data']
|
||
|
||
if not session.get('send_test_letter_page_count'):
|
||
session['send_test_letter_page_count'] = get_page_count_for_letter(template)
|
||
|
||
template = get_template(
|
||
template,
|
||
current_service,
|
||
show_recipient=True,
|
||
expand_emails=True,
|
||
letter_preview_url=url_for(
|
||
'.send_test_preview',
|
||
service_id=service_id,
|
||
template_id=template_id,
|
||
filetype='png',
|
||
),
|
||
page_count=session['send_test_letter_page_count']
|
||
)
|
||
|
||
placeholders = fields_to_fill_in(
|
||
template,
|
||
prefill_current_user=(request.endpoint == 'main.send_test_step'),
|
||
)
|
||
|
||
if len(placeholders) == 0:
|
||
return make_and_upload_csv_file(service_id, template)
|
||
|
||
try:
|
||
current_placeholder = placeholders[step_index]
|
||
except IndexError:
|
||
if all_placeholders_in_session(placeholders):
|
||
return make_and_upload_csv_file(service_id, template)
|
||
return redirect(url_for(
|
||
{
|
||
'main.send_test_step': '.send_test',
|
||
'main.send_one_off_step': '.send_one_off',
|
||
}[request.endpoint],
|
||
service_id=service_id,
|
||
template_id=template_id,
|
||
))
|
||
optional_placeholder = (current_placeholder in optional_address_columns)
|
||
form = get_placeholder_form_instance(
|
||
current_placeholder,
|
||
dict_to_populate_from=get_normalised_send_test_values_from_session(),
|
||
optional_placeholder=optional_placeholder,
|
||
allow_international_phone_numbers=current_service['can_send_international_sms'],
|
||
)
|
||
|
||
if form.validate_on_submit():
|
||
|
||
session['send_test_values'][current_placeholder] = form.placeholder_value.data
|
||
|
||
if all_placeholders_in_session(placeholders):
|
||
return make_and_upload_csv_file(service_id, template)
|
||
|
||
return redirect(url_for(
|
||
request.endpoint,
|
||
service_id=service_id,
|
||
template_id=template_id,
|
||
step_index=step_index + 1,
|
||
help=get_help_argument(),
|
||
))
|
||
|
||
if get_help_argument():
|
||
back_link = None
|
||
elif step_index == 0:
|
||
back_link = url_for(
|
||
'.view_template',
|
||
service_id=service_id,
|
||
template_id=template_id,
|
||
)
|
||
else:
|
||
back_link = url_for(
|
||
request.endpoint,
|
||
service_id=service_id,
|
||
template_id=template_id,
|
||
step_index=step_index - 1,
|
||
)
|
||
|
||
template.values = get_normalised_send_test_values_from_session()
|
||
template.values[current_placeholder] = None
|
||
|
||
return render_template(
|
||
'views/send-test.html',
|
||
template=template,
|
||
form=form,
|
||
optional_placeholder=optional_placeholder,
|
||
help=get_help_argument(),
|
||
back_link=back_link,
|
||
)
|
||
|
||
|
||
@main.route("/services/<service_id>/send/<template_id>/test.<filetype>", methods=['GET'])
|
||
@login_required
|
||
@user_has_permissions('send_texts', 'send_emails', 'send_letters')
|
||
def send_test_preview(service_id, template_id, filetype):
|
||
|
||
if filetype not in ('pdf', 'png'):
|
||
abort(404)
|
||
|
||
template = service_api_client.get_service_template(service_id, template_id)['data']
|
||
|
||
template = get_template(
|
||
template,
|
||
current_service,
|
||
letter_preview_url=url_for(
|
||
'.send_test_preview',
|
||
service_id=service_id,
|
||
template_id=template_id,
|
||
filetype='png',
|
||
),
|
||
)
|
||
|
||
template.values = get_normalised_send_test_values_from_session()
|
||
|
||
return TemplatePreview.from_utils_template(template, filetype, page=request.args.get('page'))
|
||
|
||
|
||
def _check_messages(service_id, template_type, upload_id, letters_as_pdf=False):
|
||
|
||
if not session.get('upload_data'):
|
||
# if we just return a `redirect` (302) object here, we'll get errors when we try and unpack in the
|
||
# check_messages route - so raise a werkzeug.routing redirect to ensure that doesn't happen.
|
||
|
||
# NOTE: this is a 301 MOVED PERMANENTLY (httpstatus.es/301), so the browser will cache this redirect, and it'll
|
||
# *always* happen for that browser. _check_messages is only used by endpoints that contain `upload_id`, which
|
||
# is a one-time-use id (that ties to a given file in S3 that is already deleted if it's not in the session)
|
||
raise RequestRedirect(get_check_messages_back_url(service_id, template_type))
|
||
|
||
users = user_api_client.get_users_for_service(service_id=service_id)
|
||
|
||
statistics = service_api_client.get_detailed_service_for_today(service_id)['data']['statistics']
|
||
remaining_messages = (current_service['message_limit'] - sum(stat['requested'] for stat in statistics.values()))
|
||
|
||
contents = s3download(service_id, upload_id)
|
||
|
||
template = get_template(
|
||
service_api_client.get_service_template(
|
||
service_id,
|
||
session['upload_data'].get('template_id')
|
||
)['data'],
|
||
current_service,
|
||
show_recipient=True,
|
||
letter_preview_url=url_for(
|
||
'.check_messages_preview',
|
||
service_id=service_id,
|
||
template_type=template_type,
|
||
upload_id=upload_id,
|
||
filetype='png',
|
||
) if not letters_as_pdf else None
|
||
)
|
||
recipients = RecipientCSV(
|
||
contents,
|
||
template_type=template.template_type,
|
||
placeholders=template.placeholders,
|
||
max_initial_rows_shown=50,
|
||
max_errors_shown=50,
|
||
whitelist=itertools.chain.from_iterable(
|
||
[user.name, user.mobile_number, user.email_address] for user in users
|
||
) if current_service['restricted'] else None,
|
||
remaining_messages=remaining_messages,
|
||
international_sms=current_service['can_send_international_sms'],
|
||
)
|
||
|
||
if request.args.get('from_test'):
|
||
extra_args = {'help': 1} if request.args.get('help', '0') != '0' else {}
|
||
if len(template.placeholders) or template.template_type == 'letter':
|
||
back_link = url_for(
|
||
'.send_test', service_id=service_id, template_id=template.id, **extra_args
|
||
)
|
||
else:
|
||
back_link = url_for(
|
||
'.view_template', service_id=service_id, template_id=template.id, **extra_args
|
||
)
|
||
choose_time_form = None
|
||
else:
|
||
back_link = url_for('.send_messages', service_id=service_id, template_id=template.id)
|
||
choose_time_form = ChooseTimeForm()
|
||
|
||
with suppress(StopIteration):
|
||
first_recipient = None
|
||
template.values = next(recipients.rows)
|
||
first_recipient = template.values.get(
|
||
Columns.make_key(recipients.recipient_column_headers[0]),
|
||
''
|
||
)
|
||
|
||
session['upload_data']['notification_count'] = len(list(recipients.rows))
|
||
session['upload_data']['valid'] = not recipients.has_errors
|
||
return dict(
|
||
recipients=recipients,
|
||
first_recipient=first_recipient,
|
||
template=template,
|
||
errors=recipients.has_errors,
|
||
row_errors=get_errors_for_csv(recipients, template.template_type),
|
||
count_of_recipients=session['upload_data']['notification_count'],
|
||
count_of_displayed_recipients=(
|
||
len(list(recipients.initial_annotated_rows_with_errors))
|
||
if any(recipients.rows_with_errors) and not recipients.missing_column_headers else
|
||
len(list(recipients.initial_annotated_rows))
|
||
),
|
||
original_file_name=session['upload_data'].get('original_file_name'),
|
||
upload_id=upload_id,
|
||
form=CsvUploadForm(),
|
||
remaining_messages=remaining_messages,
|
||
choose_time_form=choose_time_form,
|
||
back_link=back_link,
|
||
help=get_help_argument()
|
||
)
|
||
|
||
|
||
@main.route("/services/<service_id>/<template_type>/check/<upload_id>", methods=['GET'])
|
||
@login_required
|
||
@user_has_permissions('send_texts', 'send_emails', 'send_letters')
|
||
def check_messages(service_id, template_type, upload_id):
|
||
return render_template(
|
||
'views/check.html',
|
||
**_check_messages(service_id, template_type, upload_id)
|
||
)
|
||
|
||
|
||
@main.route("/services/<service_id>/<template_type>/check/<upload_id>.<filetype>", methods=['GET'])
|
||
@login_required
|
||
@user_has_permissions('send_texts', 'send_emails', 'send_letters')
|
||
def check_messages_preview(service_id, template_type, upload_id, filetype):
|
||
if filetype not in ('pdf', 'png'):
|
||
abort(404)
|
||
|
||
template = _check_messages(
|
||
service_id, template_type, upload_id, letters_as_pdf=True
|
||
)['template']
|
||
return TemplatePreview.from_utils_template(template, filetype)
|
||
|
||
|
||
@main.route("/services/<service_id>/<template_type>/check/<upload_id>", methods=['POST'])
|
||
@login_required
|
||
@user_has_permissions('send_texts', 'send_emails', 'send_letters')
|
||
def recheck_messages(service_id, template_type, upload_id):
|
||
|
||
if not session.get('upload_data'):
|
||
return redirect(url_for('main.choose_template', service_id=service_id))
|
||
|
||
return send_messages(service_id, session['upload_data'].get('template_id'))
|
||
|
||
|
||
@main.route("/services/<service_id>/start-job/<upload_id>", methods=['POST'])
|
||
@login_required
|
||
@user_has_permissions('send_texts', 'send_emails', 'send_letters')
|
||
def start_job(service_id, upload_id):
|
||
upload_data = session['upload_data']
|
||
|
||
if request.files or not upload_data.get('valid'):
|
||
# The csv was invalid, validate the csv again
|
||
return send_messages(service_id, upload_data.get('template_id'))
|
||
|
||
session.pop('upload_data')
|
||
|
||
job_api_client.create_job(
|
||
upload_id,
|
||
service_id,
|
||
upload_data.get('template_id'),
|
||
upload_data.get('original_file_name'),
|
||
upload_data.get('notification_count'),
|
||
scheduled_for=request.form.get('scheduled_for', '')
|
||
)
|
||
|
||
return redirect(
|
||
url_for('main.view_job', job_id=upload_id, service_id=service_id, help=request.form.get('help'))
|
||
)
|
||
|
||
|
||
@main.route("/services/<service_id>/end-tour/<example_template_id>")
|
||
@login_required
|
||
@user_has_permissions('manage_templates')
|
||
def go_to_dashboard_after_tour(service_id, example_template_id):
|
||
|
||
service_api_client.delete_service_template(service_id, example_template_id)
|
||
|
||
return redirect(
|
||
url_for('main.service_dashboard', service_id=service_id)
|
||
)
|
||
|
||
|
||
def get_check_messages_back_url(service_id, template_type):
|
||
if get_help_argument():
|
||
# if the user is on the introductory tour, then they should be redirected back to the beginning of the tour -
|
||
# but to do that we need to find the template_id of the example template. That template *should* be the only
|
||
# template for that service, but it's possible they've opened another tab and deleted it for example. In that
|
||
# case we should just redirect back to the main page as they clearly know what they're doing.
|
||
templates = service_api_client.get_service_templates(service_id)['data']
|
||
if len(templates) == 1:
|
||
return url_for('.send_test', service_id=service_id, template_id=templates[0]['id'], help=1)
|
||
|
||
return url_for('main.choose_template', service_id=service_id)
|
||
|
||
|
||
def fields_to_fill_in(template, prefill_current_user=False):
|
||
|
||
recipient_columns = first_column_headings[template.template_type]
|
||
|
||
if 'letter' == template.template_type or not prefill_current_user:
|
||
return recipient_columns + list(template.placeholders)
|
||
|
||
session['send_test_values'][recipient_columns[0]] = {
|
||
'email': current_user.email_address,
|
||
'sms': current_user.mobile_number,
|
||
}.get(template.template_type)
|
||
|
||
return list(template.placeholders)
|
||
|
||
|
||
def get_normalised_send_test_values_from_session():
|
||
return {
|
||
key: ''.join(value or [])
|
||
for key, value in session.get('send_test_values', {}).items()
|
||
}
|
||
|
||
|
||
def make_and_upload_csv_file(service_id, template):
|
||
upload_id = s3upload(
|
||
service_id,
|
||
Spreadsheet.from_dict(
|
||
session['send_test_values'],
|
||
filename=current_app.config['TEST_MESSAGE_FILENAME']
|
||
).as_dict,
|
||
current_app.config['AWS_REGION'],
|
||
)
|
||
session['upload_data'] = {
|
||
"template_id": template.id,
|
||
"original_file_name": current_app.config['TEST_MESSAGE_FILENAME']
|
||
}
|
||
return redirect(url_for(
|
||
'.check_messages',
|
||
upload_id=upload_id,
|
||
service_id=service_id,
|
||
template_type=template.template_type,
|
||
from_test=True,
|
||
help=2 if get_help_argument() else 0
|
||
))
|
||
|
||
|
||
def all_placeholders_in_session(placeholders):
|
||
return all(
|
||
get_normalised_send_test_values_from_session().get(placeholder, False) not in (False, None)
|
||
for placeholder in placeholders
|
||
)
|