mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-02-05 19:03:30 -05:00
Extract Spreadsheet model from app/utils.py
`app/utils.py` is a bit of a dumping ground for things we don’t have a better place for. We now have a place and structure for storing ‘model’ code (‘model’ in the model, view, controller (MVC) sense of the word). This commit moves the spreadsheet model to that place.
This commit is contained in:
@@ -11,7 +11,8 @@ from notifications_utils.template import BroadcastMessageTemplate
|
||||
from wtforms import ValidationError
|
||||
|
||||
from app.main._commonly_used_passwords import commonly_used_passwords
|
||||
from app.utils import Spreadsheet, is_gov_user
|
||||
from app.models.spreadsheet import Spreadsheet
|
||||
from app.utils import is_gov_user
|
||||
|
||||
|
||||
class CommonlyUsedPassword:
|
||||
|
||||
@@ -4,7 +4,8 @@ from flask import render_template
|
||||
|
||||
from app import current_service, service_api_client
|
||||
from app.main import main
|
||||
from app.utils import Spreadsheet, user_has_permissions
|
||||
from app.models.spreadsheet import Spreadsheet
|
||||
from app.utils import user_has_permissions
|
||||
|
||||
|
||||
@main.route("/services/<uuid:service_id>/returned-letters")
|
||||
|
||||
105
app/models/spreadsheet.py
Normal file
105
app/models/spreadsheet.py
Normal file
@@ -0,0 +1,105 @@
|
||||
import csv
|
||||
from io import BytesIO, StringIO
|
||||
from os import path
|
||||
|
||||
import pyexcel
|
||||
import pyexcel_xlsx
|
||||
|
||||
|
||||
class Spreadsheet():
|
||||
|
||||
ALLOWED_FILE_EXTENSIONS = ('csv', 'xlsx', 'xls', 'ods', 'xlsm', 'tsv')
|
||||
|
||||
def __init__(self, csv_data=None, rows=None, filename=''):
|
||||
|
||||
self.filename = filename
|
||||
|
||||
if csv_data and rows:
|
||||
raise TypeError('Spreadsheet must be created from either rows or CSV data')
|
||||
|
||||
self._csv_data = csv_data or ''
|
||||
self._rows = rows or []
|
||||
|
||||
@property
|
||||
def as_dict(self):
|
||||
return {
|
||||
'file_name': self.filename,
|
||||
'data': self.as_csv_data
|
||||
}
|
||||
|
||||
@property
|
||||
def as_csv_data(self):
|
||||
if not self._csv_data:
|
||||
with StringIO() as converted:
|
||||
output = csv.writer(converted)
|
||||
for row in self._rows:
|
||||
output.writerow(row)
|
||||
self._csv_data = converted.getvalue()
|
||||
return self._csv_data
|
||||
|
||||
@classmethod
|
||||
def can_handle(cls, filename):
|
||||
return cls.get_extension(filename) in cls.ALLOWED_FILE_EXTENSIONS
|
||||
|
||||
@staticmethod
|
||||
def get_extension(filename):
|
||||
return path.splitext(filename)[1].lower().lstrip('.')
|
||||
|
||||
@staticmethod
|
||||
def normalise_newlines(file_content):
|
||||
return '\r\n'.join(file_content.read().decode('utf-8').splitlines())
|
||||
|
||||
@classmethod
|
||||
def from_rows(cls, rows, filename=''):
|
||||
return cls(rows=rows, filename=filename)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, dictionary, filename=''):
|
||||
return cls.from_rows(
|
||||
zip(
|
||||
*sorted(dictionary.items(), key=lambda pair: pair[0])
|
||||
),
|
||||
filename=filename,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, file_content, filename=''):
|
||||
extension = cls.get_extension(filename)
|
||||
|
||||
if extension == 'csv':
|
||||
return cls(csv_data=Spreadsheet.normalise_newlines(file_content), filename=filename)
|
||||
|
||||
if extension == 'tsv':
|
||||
file_content = StringIO(
|
||||
Spreadsheet.normalise_newlines(file_content))
|
||||
|
||||
instance = cls.from_rows(
|
||||
pyexcel.iget_array(
|
||||
file_type=extension,
|
||||
file_stream=file_content),
|
||||
filename)
|
||||
pyexcel.free_resources()
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
def from_file_form(cls, form):
|
||||
return cls.from_file(
|
||||
form.file.data,
|
||||
filename=form.file.data.filename,
|
||||
)
|
||||
|
||||
@property
|
||||
def as_rows(self):
|
||||
if not self._rows:
|
||||
self._rows = list(csv.reader(
|
||||
StringIO(self._csv_data),
|
||||
quoting=csv.QUOTE_MINIMAL,
|
||||
skipinitialspace=True,
|
||||
))
|
||||
return self._rows
|
||||
|
||||
@property
|
||||
def as_excel_file(self):
|
||||
io = BytesIO()
|
||||
pyexcel_xlsx.save_data(io, {'Sheet 1': self.as_rows})
|
||||
return io.getvalue()
|
||||
105
app/utils.py
105
app/utils.py
@@ -1,20 +1,15 @@
|
||||
import csv
|
||||
import os
|
||||
import re
|
||||
import unicodedata
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from functools import wraps
|
||||
from io import BytesIO, StringIO
|
||||
from itertools import chain
|
||||
from math import floor, log10
|
||||
from numbers import Number
|
||||
from os import path
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import ago
|
||||
import dateutil
|
||||
import pyexcel
|
||||
import pyexcel_xlsx
|
||||
import pytz
|
||||
from dateutil import parser
|
||||
from flask import (
|
||||
@@ -53,6 +48,7 @@ from orderedset._orderedset import OrderedSet
|
||||
from werkzeug.datastructures import MultiDict
|
||||
from werkzeug.routing import RequestRedirect
|
||||
|
||||
from app.models.spreadsheet import Spreadsheet
|
||||
from app.notify_client.organisations_api_client import organisations_client
|
||||
|
||||
SENDING_STATUSES = ['created', 'pending', 'sending', 'pending-virus-check']
|
||||
@@ -289,105 +285,6 @@ def id_safe(string):
|
||||
return email_safe(string, whitespace='-')
|
||||
|
||||
|
||||
class Spreadsheet():
|
||||
|
||||
ALLOWED_FILE_EXTENSIONS = ('csv', 'xlsx', 'xls', 'ods', 'xlsm', 'tsv')
|
||||
|
||||
def __init__(self, csv_data=None, rows=None, filename=''):
|
||||
|
||||
self.filename = filename
|
||||
|
||||
if csv_data and rows:
|
||||
raise TypeError('Spreadsheet must be created from either rows or CSV data')
|
||||
|
||||
self._csv_data = csv_data or ''
|
||||
self._rows = rows or []
|
||||
|
||||
@property
|
||||
def as_dict(self):
|
||||
return {
|
||||
'file_name': self.filename,
|
||||
'data': self.as_csv_data
|
||||
}
|
||||
|
||||
@property
|
||||
def as_csv_data(self):
|
||||
if not self._csv_data:
|
||||
with StringIO() as converted:
|
||||
output = csv.writer(converted)
|
||||
for row in self._rows:
|
||||
output.writerow(row)
|
||||
self._csv_data = converted.getvalue()
|
||||
return self._csv_data
|
||||
|
||||
@classmethod
|
||||
def can_handle(cls, filename):
|
||||
return cls.get_extension(filename) in cls.ALLOWED_FILE_EXTENSIONS
|
||||
|
||||
@staticmethod
|
||||
def get_extension(filename):
|
||||
return path.splitext(filename)[1].lower().lstrip('.')
|
||||
|
||||
@staticmethod
|
||||
def normalise_newlines(file_content):
|
||||
return '\r\n'.join(file_content.read().decode('utf-8').splitlines())
|
||||
|
||||
@classmethod
|
||||
def from_rows(cls, rows, filename=''):
|
||||
return cls(rows=rows, filename=filename)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, dictionary, filename=''):
|
||||
return cls.from_rows(
|
||||
zip(
|
||||
*sorted(dictionary.items(), key=lambda pair: pair[0])
|
||||
),
|
||||
filename=filename,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, file_content, filename=''):
|
||||
extension = cls.get_extension(filename)
|
||||
|
||||
if extension == 'csv':
|
||||
return cls(csv_data=Spreadsheet.normalise_newlines(file_content), filename=filename)
|
||||
|
||||
if extension == 'tsv':
|
||||
file_content = StringIO(
|
||||
Spreadsheet.normalise_newlines(file_content))
|
||||
|
||||
instance = cls.from_rows(
|
||||
pyexcel.iget_array(
|
||||
file_type=extension,
|
||||
file_stream=file_content),
|
||||
filename)
|
||||
pyexcel.free_resources()
|
||||
return instance
|
||||
|
||||
@classmethod
|
||||
def from_file_form(cls, form):
|
||||
return cls.from_file(
|
||||
form.file.data,
|
||||
filename=form.file.data.filename,
|
||||
)
|
||||
|
||||
@property
|
||||
def as_rows(self):
|
||||
if not self._rows:
|
||||
self._rows = list(csv.reader(
|
||||
StringIO(self._csv_data),
|
||||
quoting=csv.QUOTE_MINIMAL,
|
||||
skipinitialspace=True,
|
||||
))
|
||||
return self._rows
|
||||
|
||||
@property
|
||||
def as_excel_file(self):
|
||||
io = BytesIO()
|
||||
pyexcel_xlsx.save_data(io, {'Sheet 1': self.as_rows})
|
||||
return io.getvalue()
|
||||
|
||||
|
||||
def get_help_argument():
|
||||
return request.args.get('help') if request.args.get('help') in ('1', '2', '3') else None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user