Files
notifications-admin/app/utils.py

384 lines
11 KiB
Python
Raw Normal View History

import re
2016-04-12 14:19:51 +01:00
import csv
import pytz
from io import StringIO
from os import path
2016-02-19 16:38:04 +00:00
from functools import wraps
import unicodedata
2017-07-24 15:20:40 +01:00
from urllib.parse import urlparse
from collections import namedtuple
from datetime import datetime, timedelta, timezone
import dateutil
import ago
from flask import (
abort,
current_app,
redirect,
request,
session,
url_for
)
from flask_login import current_user
import pyexcel
from notifications_utils.template import (
SMSPreviewTemplate,
EmailPreviewTemplate,
LetterImageTemplate,
LetterPreviewTemplate,
)
2016-02-19 16:38:04 +00:00
SENDING_STATUSES = ['created', 'pending', 'sending']
DELIVERED_STATUSES = ['delivered', 'sent']
FAILURE_STATUSES = ['failed', 'temporary-failure', 'permanent-failure', 'technical-failure']
REQUESTED_STATUSES = SENDING_STATUSES + DELIVERED_STATUSES + FAILURE_STATUSES
class BrowsableItem(object):
"""
Maps for the template browse-list.
"""
def __init__(self, item, *args, **kwargs):
self._item = item
super(BrowsableItem, self).__init__()
@property
def title(self):
pass
@property
def link(self):
pass
@property
def hint(self):
pass
@property
def destructive(self):
pass
def user_has_permissions(*permissions, admin_override=False, any_=False):
2016-02-19 16:38:04 +00:00
def wrap(func):
@wraps(func)
def wrap_func(*args, **kwargs):
if current_user and current_user.is_authenticated:
if current_user.has_permissions(
permissions=permissions,
admin_override=admin_override,
any_=any_
):
return func(*args, **kwargs)
else:
abort(403)
else:
abort(401)
2016-02-19 16:38:04 +00:00
return wrap_func
return wrap
def redirect_to_sign_in(f):
@wraps(f)
def wrapped(*args, **kwargs):
if 'user_details' not in session:
return redirect(url_for('main.sign_in'))
else:
return f(*args, **kwargs)
return wrapped
def get_errors_for_csv(recipients, template_type):
errors = []
if recipients.rows_with_bad_recipients:
number_of_bad_recipients = len(list(recipients.rows_with_bad_recipients))
if 'sms' == template_type:
if 1 == number_of_bad_recipients:
errors.append("fix 1 phone number")
else:
errors.append("fix {} phone numbers".format(number_of_bad_recipients))
elif 'email' == template_type:
if 1 == number_of_bad_recipients:
errors.append("fix 1 email address")
else:
errors.append("fix {} email addresses".format(number_of_bad_recipients))
2016-11-10 14:10:39 +00:00
elif 'letter' == template_type:
if 1 == number_of_bad_recipients:
errors.append("fix 1 address")
else:
errors.append("fix {} addresses".format(number_of_bad_recipients))
if recipients.rows_with_missing_data:
number_of_rows_with_missing_data = len(list(recipients.rows_with_missing_data))
if 1 == number_of_rows_with_missing_data:
errors.append("enter missing data in 1 row")
else:
errors.append("enter missing data in {} rows".format(number_of_rows_with_missing_data))
return errors
def generate_notifications_csv(**kwargs):
from app import notification_api_client
if 'page' not in kwargs:
kwargs['page'] = 1
fieldnames = ['Row number', 'Recipient', 'Template', 'Type', 'Job', 'Status', 'Time']
yield ','.join(fieldnames) + '\n'
while kwargs['page']:
notifications_resp = notification_api_client.get_notifications_for_service(**kwargs)
notifications = notifications_resp['notifications']
for notification in notifications:
values = [
notification['row_number'],
notification['recipient'],
notification['template_name'],
notification['template_type'],
notification['job_name'],
notification['status'],
notification['created_at']
]
line = ','.join(str(i) for i in values) + '\n'
yield line
if notifications_resp['links'].get('next'):
kwargs['page'] += 1
else:
return
raise Exception("Should never reach here")
def get_page_from_request():
if 'page' in request.args:
try:
return int(request.args['page'])
except ValueError:
return None
else:
return 1
def generate_previous_dict(view, service_id, page, url_args=None):
2016-10-10 17:15:57 +01:00
return generate_previous_next_dict(view, service_id, page - 1, 'Previous page', url_args or {})
def generate_next_dict(view, service_id, page, url_args=None):
2016-10-10 17:15:57 +01:00
return generate_previous_next_dict(view, service_id, page + 1, 'Next page', url_args or {})
def generate_previous_next_dict(view, service_id, page, title, url_args):
return {
'url': url_for(view, service_id=service_id, page=page, **url_args),
'title': title,
'label': 'page {}'.format(page)
}
def email_safe(string, whitespace='.'):
# strips accents, diacritics etc
string = ''.join(c for c in unicodedata.normalize('NFD', string) if unicodedata.category(c) != 'Mn')
string = ''.join(
word.lower() if word.isalnum() or word == whitespace else ''
for word in re.sub(r'\s+', whitespace, string.strip())
)
string = re.sub(r'\.{2,}', '.', string)
return string.strip('.')
class Spreadsheet():
allowed_file_extensions = ['csv', 'xlsx', 'xls', 'ods', 'xlsm', 'tsv']
def __init__(self, csv_data, filename=''):
self.filename = filename
self.as_csv_data = csv_data
self.as_dict = {
'file_name': self.filename,
'data': self.as_csv_data
}
@classmethod
def can_handle(cls, filename):
return cls.get_extension(filename) in cls.allowed_file_extensions
@staticmethod
def get_extension(filename):
return path.splitext(filename)[1].lower().lstrip('.')
@staticmethod
def normalise_newlines(file_content):
return '\r\n'.join(file_content.read().decode('utf-8').splitlines())
@classmethod
def from_rows(cls, rows, filename=''):
with StringIO() as converted:
output = csv.writer(converted)
for row in rows:
output.writerow(row)
return cls(converted.getvalue(), filename)
Show one field at a time on send yourself a test The send yourself a test feature is useful for two things: - constructing an email/text message/letter without uploading a CSV file - seeing what the thing your going to send will look like (either by getting it in your inbox or downloading the PDF) - learning the concept of placeholders, ie understanding they’re thing that gets populated with _stuff_ The problem we’re seeing is that the current UI breaks when a template has a lot of placeholders. This is especially apparent with letter templates, which have a minimum of 7 placeholders by virtue of the address. The idea behind having the form fields side-by-side was to help people understand the relationship between their spreadsheet columns and the placeholders. But this means that the page was doing a lot of work, trying to teach: - replacement of placeholders - link between placeholders and spreadsheet columns The latter is better explained by the example spreadsheet shown on the upload page. So it can safely be removed from the send yourself a test page – in other words the fields don’t need to be shown side by side. Showing them one-at-a-time works well because: - it’s really obvious, even on first use, what the page is asking you to do - as your step through each placeholder, you see the message build up with the data you’ve entered – you’re learning how replacement of placeholders works by repetition This also means adding a matching endpoint for viewing each step of making the test letter as a PDF/PNG because we can’t reuse the view of the template without any placeholders filled any more.
2017-05-04 09:30:55 +01:00
@classmethod
def from_dict(cls, dictionary, filename=''):
return cls.from_rows(
zip(
*sorted(dictionary.items(), key=lambda pair: pair[0])
),
filename
)
@classmethod
def from_file(cls, file_content, filename=''):
extension = cls.get_extension(filename)
if extension == 'csv':
return cls(Spreadsheet.normalise_newlines(file_content), filename)
if extension == 'tsv':
file_content = StringIO(
Spreadsheet.normalise_newlines(file_content))
instance = cls.from_rows(
pyexcel.iget_array(
file_type=extension,
file_stream=file_content),
filename)
pyexcel.free_resources()
return instance
def get_help_argument():
return request.args.get('help') if request.args.get('help') in ('1', '2', '3') else None
def is_gov_user(email_address):
valid_domains = current_app.config['EMAIL_DOMAIN_REGEXES']
email_regex = (r"[\.|@]({})$".format("|".join(valid_domains)))
return bool(re.search(email_regex, email_address.lower()))
def get_template(
template,
service,
show_recipient=False,
expand_emails=False,
letter_preview_url=None,
page_count=1,
redact_missing_personalisation=False,
email_reply_to=None,
sms_sender=None,
):
if 'email' == template['template_type']:
return EmailPreviewTemplate(
template,
from_name=service['name'],
from_address='{}@notifications.service.gov.uk'.format(service['email_from']),
expanded=expand_emails,
show_recipient=show_recipient,
redact_missing_personalisation=redact_missing_personalisation,
reply_to=email_reply_to,
)
if 'sms' == template['template_type']:
return SMSPreviewTemplate(
template,
prefix=service['name'],
show_prefix=service['prefix_sms'],
sender=sms_sender,
show_sender=bool(sms_sender),
show_recipient=show_recipient,
redact_missing_personalisation=redact_missing_personalisation,
)
if 'letter' == template['template_type']:
if letter_preview_url:
return LetterImageTemplate(
template,
image_url=letter_preview_url,
page_count=int(page_count),
)
else:
return LetterPreviewTemplate(
template,
contact_block=service['letter_contact_block'],
admin_base_url=current_app.config['ADMIN_BASE_URL'],
redact_missing_personalisation=redact_missing_personalisation,
)
def get_current_financial_year():
now = datetime.utcnow()
current_month = int(now.strftime('%-m'))
current_year = int(now.strftime('%Y'))
return current_year if current_month > 3 else current_year - 1
def get_time_left(created_at):
return ago.human(
(
datetime.now(timezone.utc).replace(hour=23, minute=59, second=59)
) - (
dateutil.parser.parse(created_at) + timedelta(days=8)
),
future_tense='Data available for {}',
past_tense='Data no longer available', # No-one should ever see this
precision=1
)
def email_or_sms_not_enabled(template_type, permissions):
return (template_type in ['email', 'sms']) and (template_type not in permissions)
def get_letter_timings(upload_time):
LetterTimings = namedtuple(
'LetterTimings',
'printed_by, is_printed, earliest_delivery, latest_delivery'
)
# shift anything after 5pm to the next day
processing_day = gmt_timezones(upload_time) + timedelta(hours=(7))
print_day, earliest_delivery, latest_delivery = (
processing_day + timedelta(days=days)
for days in {
'Wednesday': (1, 3, 5),
'Thursday': (1, 4, 5),
'Friday': (3, 5, 6),
'Saturday': (2, 4, 5),
}.get(processing_day.strftime('%A'), (1, 3, 4))
)
printed_by = print_day.astimezone(pytz.timezone('Europe/London')).replace(hour=15, minute=0)
now = datetime.utcnow().replace(tzinfo=pytz.timezone('Europe/London'))
return LetterTimings(
printed_by=printed_by,
is_printed=(now > printed_by),
earliest_delivery=earliest_delivery,
latest_delivery=latest_delivery,
)
def gmt_timezones(date):
date = dateutil.parser.parse(date)
forced_utc = date.replace(tzinfo=pytz.utc)
return forced_utc.astimezone(pytz.timezone('Europe/London'))
2017-07-24 15:20:40 +01:00
def get_cdn_domain():
parsed_uri = urlparse(current_app.config['ADMIN_BASE_URL'])
if parsed_uri.netloc.startswith('localhost'):
return 'static-logos.notify.tools'
subdomain = parsed_uri.hostname.split('.')[0]
domain = parsed_uri.netloc[len(subdomain + '.'):]
return "static-logos.{}".format(domain)