diff --git a/app/main/validators.py b/app/main/validators.py index 8e091b0f7..081556986 100644 --- a/app/main/validators.py +++ b/app/main/validators.py @@ -11,7 +11,8 @@ from notifications_utils.template import BroadcastMessageTemplate from wtforms import ValidationError from app.main._commonly_used_passwords import commonly_used_passwords -from app.utils import Spreadsheet, is_gov_user +from app.models.spreadsheet import Spreadsheet +from app.utils import is_gov_user class CommonlyUsedPassword: diff --git a/app/main/views/returned_letters.py b/app/main/views/returned_letters.py index bb81f3449..d4861733a 100644 --- a/app/main/views/returned_letters.py +++ b/app/main/views/returned_letters.py @@ -4,7 +4,8 @@ from flask import render_template from app import current_service, service_api_client from app.main import main -from app.utils import Spreadsheet, user_has_permissions +from app.models.spreadsheet import Spreadsheet +from app.utils import user_has_permissions @main.route("/services//returned-letters") diff --git a/app/models/spreadsheet.py b/app/models/spreadsheet.py new file mode 100644 index 000000000..3621d4e93 --- /dev/null +++ b/app/models/spreadsheet.py @@ -0,0 +1,105 @@ +import csv +from io import BytesIO, StringIO +from os import path + +import pyexcel +import pyexcel_xlsx + + +class Spreadsheet(): + + ALLOWED_FILE_EXTENSIONS = ('csv', 'xlsx', 'xls', 'ods', 'xlsm', 'tsv') + + def __init__(self, csv_data=None, rows=None, filename=''): + + self.filename = filename + + if csv_data and rows: + raise TypeError('Spreadsheet must be created from either rows or CSV data') + + self._csv_data = csv_data or '' + self._rows = rows or [] + + @property + def as_dict(self): + return { + 'file_name': self.filename, + 'data': self.as_csv_data + } + + @property + def as_csv_data(self): + if not self._csv_data: + with StringIO() as converted: + output = csv.writer(converted) + for row in self._rows: + output.writerow(row) + self._csv_data = converted.getvalue() + return self._csv_data + + @classmethod + def can_handle(cls, filename): + return cls.get_extension(filename) in cls.ALLOWED_FILE_EXTENSIONS + + @staticmethod + def get_extension(filename): + return path.splitext(filename)[1].lower().lstrip('.') + + @staticmethod + def normalise_newlines(file_content): + return '\r\n'.join(file_content.read().decode('utf-8').splitlines()) + + @classmethod + def from_rows(cls, rows, filename=''): + return cls(rows=rows, filename=filename) + + @classmethod + def from_dict(cls, dictionary, filename=''): + return cls.from_rows( + zip( + *sorted(dictionary.items(), key=lambda pair: pair[0]) + ), + filename=filename, + ) + + @classmethod + def from_file(cls, file_content, filename=''): + extension = cls.get_extension(filename) + + if extension == 'csv': + return cls(csv_data=Spreadsheet.normalise_newlines(file_content), filename=filename) + + if extension == 'tsv': + file_content = StringIO( + Spreadsheet.normalise_newlines(file_content)) + + instance = cls.from_rows( + pyexcel.iget_array( + file_type=extension, + file_stream=file_content), + filename) + pyexcel.free_resources() + return instance + + @classmethod + def from_file_form(cls, form): + return cls.from_file( + form.file.data, + filename=form.file.data.filename, + ) + + @property + def as_rows(self): + if not self._rows: + self._rows = list(csv.reader( + StringIO(self._csv_data), + quoting=csv.QUOTE_MINIMAL, + skipinitialspace=True, + )) + return self._rows + + @property + def as_excel_file(self): + io = BytesIO() + pyexcel_xlsx.save_data(io, {'Sheet 1': self.as_rows}) + return io.getvalue() diff --git a/app/utils.py b/app/utils.py index a983fe97a..92b48fd2a 100644 --- a/app/utils.py +++ b/app/utils.py @@ -1,20 +1,15 @@ -import csv import os import re import unicodedata from datetime import datetime, timedelta, timezone from functools import wraps -from io import BytesIO, StringIO from itertools import chain from math import floor, log10 from numbers import Number -from os import path from urllib.parse import urlparse import ago import dateutil -import pyexcel -import pyexcel_xlsx import pytz from dateutil import parser from flask import ( @@ -53,6 +48,7 @@ from orderedset._orderedset import OrderedSet from werkzeug.datastructures import MultiDict from werkzeug.routing import RequestRedirect +from app.models.spreadsheet import Spreadsheet from app.notify_client.organisations_api_client import organisations_client SENDING_STATUSES = ['created', 'pending', 'sending', 'pending-virus-check'] @@ -289,105 +285,6 @@ def id_safe(string): return email_safe(string, whitespace='-') -class Spreadsheet(): - - ALLOWED_FILE_EXTENSIONS = ('csv', 'xlsx', 'xls', 'ods', 'xlsm', 'tsv') - - def __init__(self, csv_data=None, rows=None, filename=''): - - self.filename = filename - - if csv_data and rows: - raise TypeError('Spreadsheet must be created from either rows or CSV data') - - self._csv_data = csv_data or '' - self._rows = rows or [] - - @property - def as_dict(self): - return { - 'file_name': self.filename, - 'data': self.as_csv_data - } - - @property - def as_csv_data(self): - if not self._csv_data: - with StringIO() as converted: - output = csv.writer(converted) - for row in self._rows: - output.writerow(row) - self._csv_data = converted.getvalue() - return self._csv_data - - @classmethod - def can_handle(cls, filename): - return cls.get_extension(filename) in cls.ALLOWED_FILE_EXTENSIONS - - @staticmethod - def get_extension(filename): - return path.splitext(filename)[1].lower().lstrip('.') - - @staticmethod - def normalise_newlines(file_content): - return '\r\n'.join(file_content.read().decode('utf-8').splitlines()) - - @classmethod - def from_rows(cls, rows, filename=''): - return cls(rows=rows, filename=filename) - - @classmethod - def from_dict(cls, dictionary, filename=''): - return cls.from_rows( - zip( - *sorted(dictionary.items(), key=lambda pair: pair[0]) - ), - filename=filename, - ) - - @classmethod - def from_file(cls, file_content, filename=''): - extension = cls.get_extension(filename) - - if extension == 'csv': - return cls(csv_data=Spreadsheet.normalise_newlines(file_content), filename=filename) - - if extension == 'tsv': - file_content = StringIO( - Spreadsheet.normalise_newlines(file_content)) - - instance = cls.from_rows( - pyexcel.iget_array( - file_type=extension, - file_stream=file_content), - filename) - pyexcel.free_resources() - return instance - - @classmethod - def from_file_form(cls, form): - return cls.from_file( - form.file.data, - filename=form.file.data.filename, - ) - - @property - def as_rows(self): - if not self._rows: - self._rows = list(csv.reader( - StringIO(self._csv_data), - quoting=csv.QUOTE_MINIMAL, - skipinitialspace=True, - )) - return self._rows - - @property - def as_excel_file(self): - io = BytesIO() - pyexcel_xlsx.save_data(io, {'Sheet 1': self.as_rows}) - return io.getvalue() - - def get_help_argument(): return request.args.get('help') if request.args.get('help') in ('1', '2', '3') else None