mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-02-10 21:34:10 -05:00
In research we’ve seen people mix up the service ID and API key because they’re both 36 character UUIDs. We can’t get rid of the service ID because it’s used to look up the API key. Instead, we should change API key to be one long string, which contains both the service ID, API key and (optionally) the name of the key. For example: ``` casework_production-8b3aa916-ec82-434e-b0c5-d5d9b371d6a3-dcdc5083-2fee-4fba-8afd-51f3f4bcb7b0 ``` We still need to keep the old, separate, key and service ID for a while until people have updated their clients. But they’re now both on this page, rather than on two separate pages, which should make for less fussing anyway. This shouldn’t be rolled out until the new clients are available. - [ ] https://github.com/alphagov/notifications-python-client/pull/36 - [ ] https://github.com/alphagov/notifications-node-client/pull/10 - [ ] https://github.com/alphagov/notifications-ruby-client/pull/15 - [ ] https://github.com/alphagov/notifications-java-client/pull/38 - [ ] PHP????
198 lines
5.8 KiB
Python
198 lines
5.8 KiB
Python
import re
|
|
import csv
|
|
from io import StringIO
|
|
from os import path
|
|
from functools import wraps
|
|
from flask import (abort, session, request, redirect, url_for)
|
|
import pyexcel
|
|
import pyexcel.ext.io
|
|
import pyexcel.ext.xls
|
|
import pyexcel.ext.xlsx
|
|
import pyexcel.ext.ods3
|
|
|
|
|
|
class BrowsableItem(object):
|
|
"""
|
|
Maps for the template browse-list.
|
|
"""
|
|
|
|
def __init__(self, item, *args, **kwargs):
|
|
self._item = item
|
|
super(BrowsableItem, self).__init__()
|
|
|
|
@property
|
|
def title(self):
|
|
pass
|
|
|
|
@property
|
|
def link(self):
|
|
pass
|
|
|
|
@property
|
|
def hint(self):
|
|
pass
|
|
|
|
@property
|
|
def destructive(self):
|
|
pass
|
|
|
|
|
|
def user_has_permissions(*permissions, admin_override=False, any_=False):
|
|
def wrap(func):
|
|
@wraps(func)
|
|
def wrap_func(*args, **kwargs):
|
|
from flask_login import current_user
|
|
if current_user and current_user.has_permissions(permissions=permissions,
|
|
admin_override=admin_override, any_=any_):
|
|
return func(*args, **kwargs)
|
|
else:
|
|
abort(403)
|
|
return wrap_func
|
|
return wrap
|
|
|
|
|
|
def redirect_to_sign_in(f):
|
|
@wraps(f)
|
|
def wrapped(*args, **kwargs):
|
|
if 'user_details' not in session:
|
|
return redirect(url_for('main.sign_in'))
|
|
else:
|
|
return f(*args, **kwargs)
|
|
return wrapped
|
|
|
|
|
|
def get_errors_for_csv(recipients, template_type):
|
|
|
|
errors = []
|
|
|
|
if recipients.rows_with_bad_recipients:
|
|
number_of_bad_recipients = len(list(recipients.rows_with_bad_recipients))
|
|
if 'sms' == template_type:
|
|
if 1 == number_of_bad_recipients:
|
|
errors.append("fix 1 phone number")
|
|
else:
|
|
errors.append("fix {} phone numbers".format(number_of_bad_recipients))
|
|
elif 'email' == template_type:
|
|
if 1 == number_of_bad_recipients:
|
|
errors.append("fix 1 email address")
|
|
else:
|
|
errors.append("fix {} email addresses".format(number_of_bad_recipients))
|
|
|
|
if recipients.rows_with_missing_data:
|
|
number_of_rows_with_missing_data = len(list(recipients.rows_with_missing_data))
|
|
if 1 == number_of_rows_with_missing_data:
|
|
errors.append("enter missing data in 1 row")
|
|
else:
|
|
errors.append("enter missing data in {} rows".format(number_of_rows_with_missing_data))
|
|
|
|
return errors
|
|
|
|
|
|
def generate_notifications_csv(json_list):
|
|
from app import format_datetime_24h, format_notification_status
|
|
content = StringIO()
|
|
retval = None
|
|
with content as csvfile:
|
|
csvwriter = csv.writer(csvfile)
|
|
csvwriter.writerow(['Row number', 'Recipient', 'Template', 'Type', 'Job', 'Status', 'Time'])
|
|
for x in json_list:
|
|
csvwriter.writerow([
|
|
int(x['job_row_number']) + 2 if 'job_row_number' in x and x['job_row_number'] else '',
|
|
x['to'],
|
|
x['template']['name'],
|
|
x['template']['template_type'],
|
|
x['job']['original_file_name'] if x['job'] else '',
|
|
format_notification_status(x['status'], x['template']['template_type']),
|
|
format_datetime_24h(x['created_at'])])
|
|
retval = content.getvalue()
|
|
return retval
|
|
|
|
|
|
def get_page_from_request():
|
|
if 'page' in request.args:
|
|
try:
|
|
return int(request.args['page'])
|
|
except ValueError:
|
|
return None
|
|
else:
|
|
return 1
|
|
|
|
|
|
def generate_previous_dict(view, service_id, page, url_args=None):
|
|
return generate_previous_next_dict(view, service_id, page - 1, 'Previous page', url_args or {})
|
|
|
|
|
|
def generate_next_dict(view, service_id, page, url_args=None):
|
|
return generate_previous_next_dict(view, service_id, page + 1, 'Next page', url_args or {})
|
|
|
|
|
|
def generate_previous_next_dict(view, service_id, page, title, url_args):
|
|
return {
|
|
'url': url_for(view, service_id=service_id, page=page, **url_args),
|
|
'title': title,
|
|
'label': 'page {}'.format(page)
|
|
}
|
|
|
|
|
|
def email_safe(string, whitespace='.'):
|
|
return "".join([
|
|
character.lower() if character.isalnum() or character == whitespace else ""
|
|
for character in re.sub(r"\s+", whitespace, string.strip())
|
|
])
|
|
|
|
|
|
class Spreadsheet():
|
|
|
|
allowed_file_extensions = ['csv', 'xlsx', 'xls', 'ods', 'xlsm', 'tsv']
|
|
|
|
def __init__(self, csv_data, filename=''):
|
|
self.filename = filename
|
|
self.as_csv_data = csv_data
|
|
self.as_dict = {
|
|
'file_name': self.filename,
|
|
'data': self.as_csv_data
|
|
}
|
|
|
|
@classmethod
|
|
def can_handle(cls, filename):
|
|
return cls.get_extension(filename) in cls.allowed_file_extensions
|
|
|
|
@staticmethod
|
|
def get_extension(filename):
|
|
return path.splitext(filename)[1].lower().lstrip('.')
|
|
|
|
@staticmethod
|
|
def normalise_newlines(file_content):
|
|
return '\r\n'.join(file_content.read().decode('utf-8').splitlines())
|
|
|
|
@classmethod
|
|
def from_rows(cls, rows, filename=''):
|
|
|
|
with StringIO() as converted:
|
|
output = csv.writer(converted)
|
|
|
|
for row in rows:
|
|
output.writerow(row)
|
|
|
|
return cls(converted.getvalue(), filename)
|
|
|
|
@classmethod
|
|
def from_file(cls, file_content, filename=''):
|
|
|
|
extension = cls.get_extension(filename)
|
|
|
|
if extension == 'csv':
|
|
return cls(Spreadsheet.normalise_newlines(file_content), filename)
|
|
|
|
if extension == 'tsv':
|
|
file_content = StringIO(Spreadsheet.normalise_newlines(file_content))
|
|
|
|
return cls.from_rows(pyexcel.get_sheet(
|
|
file_type=extension,
|
|
file_content=file_content.getvalue()
|
|
).to_array(), filename)
|
|
|
|
|
|
def get_help_argument():
|
|
return request.args.get('help') if request.args.get('help') in ('1', '2', '3') else None
|