mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-31 23:26:23 -05:00
Remove letters-related code (#175)
This deletes a big ol' chunk of code related to letters. It's not everything—there are still a few things that might be tied to sms/email—but it's the the heart of letters function. SMS and email function should be untouched by this. Areas affected: - Things obviously about letters - PDF tasks, used for precompiling letters - Virus scanning, used for those PDFs - FTP, used to send letters to the printer - Postage stuff
This commit is contained in:
@@ -1,570 +0,0 @@
|
||||
from base64 import urlsafe_b64encode
|
||||
from datetime import datetime, timedelta
|
||||
from hashlib import sha512
|
||||
|
||||
from botocore.exceptions import ClientError as BotoClientError
|
||||
from flask import current_app
|
||||
from notifications_utils.letter_timings import LETTER_PROCESSING_DEADLINE
|
||||
from notifications_utils.postal_address import PostalAddress
|
||||
from notifications_utils.timezones import convert_utc_to_local_timezone
|
||||
|
||||
from app import encryption, notify_celery
|
||||
from app.aws import s3
|
||||
from app.config import QueueNames, TaskNames
|
||||
from app.cronitor import cronitor
|
||||
from app.dao.notifications_dao import (
|
||||
dao_get_letters_and_sheets_volume_by_postage,
|
||||
dao_get_letters_to_be_printed,
|
||||
dao_get_notification_by_reference,
|
||||
dao_update_notification,
|
||||
dao_update_notifications_by_reference,
|
||||
get_notification_by_id,
|
||||
update_notification_status_by_id,
|
||||
)
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.errors import VirusScanError
|
||||
from app.exceptions import NotificationTechnicalFailureException
|
||||
from app.letters.utils import (
|
||||
LetterPDFNotFound,
|
||||
ScanErrorType,
|
||||
find_letter_pdf_in_s3,
|
||||
generate_letter_pdf_filename,
|
||||
get_billable_units_for_letter_page_count,
|
||||
get_file_names_from_error_bucket,
|
||||
get_folder_name,
|
||||
get_reference_from_filename,
|
||||
move_error_pdf_to_scan_bucket,
|
||||
move_failed_pdf,
|
||||
move_sanitised_letter_to_test_or_live_pdf_bucket,
|
||||
move_scan_to_invalid_pdf_bucket,
|
||||
)
|
||||
from app.models import (
|
||||
INTERNATIONAL_LETTERS,
|
||||
INTERNATIONAL_POSTAGE_TYPES,
|
||||
KEY_TYPE_NORMAL,
|
||||
KEY_TYPE_TEST,
|
||||
NOTIFICATION_CREATED,
|
||||
NOTIFICATION_DELIVERED,
|
||||
NOTIFICATION_PENDING_VIRUS_CHECK,
|
||||
NOTIFICATION_TECHNICAL_FAILURE,
|
||||
NOTIFICATION_VALIDATION_FAILED,
|
||||
NOTIFICATION_VIRUS_SCAN_FAILED,
|
||||
POSTAGE_TYPES,
|
||||
RESOLVE_POSTAGE_FOR_FILE_NAME,
|
||||
Service,
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="get-pdf-for-templated-letter", max_retries=15, default_retry_delay=300)
|
||||
def get_pdf_for_templated_letter(self, notification_id):
|
||||
try:
|
||||
notification = get_notification_by_id(notification_id, _raise=True)
|
||||
letter_filename = generate_letter_pdf_filename(
|
||||
reference=notification.reference,
|
||||
created_at=notification.created_at,
|
||||
ignore_folder=notification.key_type == KEY_TYPE_TEST,
|
||||
postage=notification.postage
|
||||
)
|
||||
letter_data = {
|
||||
'letter_contact_block': notification.reply_to_text,
|
||||
'template': {
|
||||
"subject": notification.template.subject,
|
||||
"content": notification.template.content,
|
||||
"template_type": notification.template.template_type
|
||||
},
|
||||
'values': notification.personalisation,
|
||||
'logo_filename': notification.service.letter_branding and notification.service.letter_branding.filename,
|
||||
'letter_filename': letter_filename,
|
||||
"notification_id": str(notification_id),
|
||||
'key_type': notification.key_type
|
||||
}
|
||||
|
||||
encrypted_data = encryption.sign(letter_data)
|
||||
|
||||
notify_celery.send_task(
|
||||
name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER,
|
||||
args=(encrypted_data,),
|
||||
queue=QueueNames.SANITISE_LETTERS
|
||||
)
|
||||
except Exception as e:
|
||||
try:
|
||||
current_app.logger.exception(
|
||||
f"RETRY: calling create-letter-pdf task for notification {notification_id} failed"
|
||||
)
|
||||
self.retry(exc=e, queue=QueueNames.RETRY)
|
||||
except self.MaxRetriesExceededError:
|
||||
message = f"RETRY FAILED: Max retries reached. " \
|
||||
f"The task create-letter-pdf failed for notification id {notification_id}. " \
|
||||
f"Notification has been updated to technical-failure"
|
||||
update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE)
|
||||
raise NotificationTechnicalFailureException(message)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="update-billable-units-for-letter", max_retries=15, default_retry_delay=300)
|
||||
def update_billable_units_for_letter(self, notification_id, page_count):
|
||||
notification = get_notification_by_id(notification_id, _raise=True)
|
||||
|
||||
billable_units = get_billable_units_for_letter_page_count(page_count)
|
||||
|
||||
if notification.key_type != KEY_TYPE_TEST:
|
||||
notification.billable_units = billable_units
|
||||
dao_update_notification(notification)
|
||||
|
||||
current_app.logger.info(
|
||||
f"Letter notification id: {notification_id} reference {notification.reference}: "
|
||||
f"billable units set to {billable_units}"
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(
|
||||
bind=True, name="update-validation-failed-for-templated-letter", max_retries=15, default_retry_delay=300
|
||||
)
|
||||
def update_validation_failed_for_templated_letter(self, notification_id, page_count):
|
||||
notification = get_notification_by_id(notification_id, _raise=True)
|
||||
notification.status = NOTIFICATION_VALIDATION_FAILED
|
||||
dao_update_notification(notification)
|
||||
current_app.logger.info(f"Validation failed: letter is too long {page_count} for letter with id: {notification_id}")
|
||||
|
||||
|
||||
@notify_celery.task(name='collate-letter-pdfs-to-be-sent')
|
||||
@cronitor("collate-letter-pdfs-to-be-sent")
|
||||
def collate_letter_pdfs_to_be_sent():
|
||||
"""
|
||||
Finds all letters which are still waiting to be sent to DVLA for printing
|
||||
|
||||
This would usually be run at 5.50pm and collect up letters created between before 5:30pm today
|
||||
that have not yet been sent.
|
||||
If run after midnight, it will collect up letters created before 5:30pm the day before.
|
||||
"""
|
||||
print_run_date = convert_utc_to_local_timezone(datetime.utcnow())
|
||||
if print_run_date.time() < LETTER_PROCESSING_DEADLINE:
|
||||
print_run_date = print_run_date - timedelta(days=1)
|
||||
|
||||
print_run_deadline = print_run_date.replace(
|
||||
hour=17, minute=30, second=0, microsecond=0
|
||||
)
|
||||
_get_letters_and_sheets_volumes_and_send_to_dvla(print_run_deadline)
|
||||
|
||||
for postage in POSTAGE_TYPES:
|
||||
current_app.logger.info(f"starting collate-letter-pdfs-to-be-sent processing for postage class {postage}")
|
||||
letters_to_print = get_key_and_size_of_letters_to_be_sent_to_print(print_run_deadline, postage)
|
||||
|
||||
for i, letters in enumerate(group_letters(letters_to_print)):
|
||||
filenames = [letter['Key'] for letter in letters]
|
||||
|
||||
service_id = letters[0]['ServiceId']
|
||||
organisation_id = letters[0]['OrganisationId']
|
||||
|
||||
hash = urlsafe_b64encode(sha512(''.join(filenames).encode()).digest())[:20].decode()
|
||||
# eg NOTIFY.2018-12-31.001.Wjrui5nAvObjPd-3GEL-.ZIP
|
||||
dvla_filename = 'NOTIFY.{date}.{postage}.{num:03}.{hash}.{service_id}.{organisation_id}.ZIP'.format(
|
||||
date=print_run_deadline.strftime("%Y-%m-%d"),
|
||||
postage=RESOLVE_POSTAGE_FOR_FILE_NAME[postage],
|
||||
num=i + 1,
|
||||
hash=hash,
|
||||
service_id=service_id,
|
||||
organisation_id=organisation_id
|
||||
)
|
||||
|
||||
current_app.logger.info(
|
||||
'Calling task zip-and-send-letter-pdfs for {} pdfs to upload {} with total size {:,} bytes'.format(
|
||||
len(filenames),
|
||||
dvla_filename,
|
||||
sum(letter['Size'] for letter in letters)
|
||||
)
|
||||
)
|
||||
notify_celery.send_task(
|
||||
name=TaskNames.ZIP_AND_SEND_LETTER_PDFS,
|
||||
kwargs={
|
||||
'filenames_to_zip': filenames,
|
||||
'upload_filename': dvla_filename
|
||||
},
|
||||
queue=QueueNames.PROCESS_FTP,
|
||||
compression='zlib'
|
||||
)
|
||||
current_app.logger.info(f"finished collate-letter-pdfs-to-be-sent processing for postage class {postage}")
|
||||
|
||||
current_app.logger.info("finished collate-letter-pdfs-to-be-sent")
|
||||
|
||||
|
||||
def _get_letters_and_sheets_volumes_and_send_to_dvla(print_run_deadline):
|
||||
letters_volumes = dao_get_letters_and_sheets_volume_by_postage(print_run_deadline)
|
||||
send_letters_volume_email_to_dvla(letters_volumes, print_run_deadline.date())
|
||||
|
||||
|
||||
def send_letters_volume_email_to_dvla(letters_volumes, date):
|
||||
personalisation = {
|
||||
'total_volume': 0,
|
||||
'first_class_volume': 0,
|
||||
'second_class_volume': 0,
|
||||
'international_volume': 0,
|
||||
'total_sheets': 0,
|
||||
'first_class_sheets': 0,
|
||||
"second_class_sheets": 0,
|
||||
'international_sheets': 0,
|
||||
'date': date.strftime("%d %B %Y")
|
||||
}
|
||||
for item in letters_volumes:
|
||||
personalisation['total_volume'] += item.letters_count
|
||||
personalisation['total_sheets'] += item.sheets_count
|
||||
if f"{item.postage}_class_volume" in personalisation:
|
||||
personalisation[f"{item.postage}_class_volume"] = item.letters_count
|
||||
personalisation[f"{item.postage}_class_sheets"] = item.sheets_count
|
||||
else:
|
||||
personalisation["international_volume"] += item.letters_count
|
||||
personalisation["international_sheets"] += item.sheets_count
|
||||
|
||||
template = dao_get_template_by_id(current_app.config['LETTERS_VOLUME_EMAIL_TEMPLATE_ID'])
|
||||
recipients = current_app.config['DVLA_EMAIL_ADDRESSES']
|
||||
reply_to = template.service.get_default_reply_to_email_address()
|
||||
service = Service.query.get(current_app.config['NOTIFY_SERVICE_ID'])
|
||||
|
||||
# avoid circular imports:
|
||||
from app.notifications.process_notifications import (
|
||||
persist_notification,
|
||||
send_notification_to_queue,
|
||||
)
|
||||
for recipient in recipients:
|
||||
saved_notification = persist_notification(
|
||||
template_id=template.id,
|
||||
template_version=template.version,
|
||||
recipient=recipient,
|
||||
service=service,
|
||||
personalisation=personalisation,
|
||||
notification_type=template.template_type,
|
||||
api_key_id=None,
|
||||
key_type=KEY_TYPE_NORMAL,
|
||||
reply_to_text=reply_to
|
||||
)
|
||||
|
||||
send_notification_to_queue(saved_notification, False, queue=QueueNames.NOTIFY)
|
||||
|
||||
|
||||
def get_key_and_size_of_letters_to_be_sent_to_print(print_run_deadline, postage):
|
||||
letters_awaiting_sending = dao_get_letters_to_be_printed(print_run_deadline, postage)
|
||||
for letter in letters_awaiting_sending:
|
||||
try:
|
||||
letter_pdf = find_letter_pdf_in_s3(letter)
|
||||
yield {
|
||||
"Key": letter_pdf.key,
|
||||
"Size": letter_pdf.size,
|
||||
"ServiceId": str(letter.service_id),
|
||||
"OrganisationId": str(letter.service.organisation_id)
|
||||
}
|
||||
except (BotoClientError, LetterPDFNotFound) as e:
|
||||
current_app.logger.exception(
|
||||
f"Error getting letter from bucket for notification: {letter.id} with reference: {letter.reference}", e)
|
||||
|
||||
|
||||
def group_letters(letter_pdfs):
|
||||
"""
|
||||
Group letters in chunks of MAX_LETTER_PDF_ZIP_FILESIZE. Will add files to lists, never going over that size.
|
||||
If a single file is (somehow) larger than MAX_LETTER_PDF_ZIP_FILESIZE that'll be in a list on it's own.
|
||||
If there are no files, will just exit (rather than yielding an empty list).
|
||||
"""
|
||||
running_filesize = 0
|
||||
list_of_files = []
|
||||
service_id = None
|
||||
for letter in letter_pdfs:
|
||||
if letter['Key'].lower().endswith('.pdf'):
|
||||
if not service_id:
|
||||
service_id = letter['ServiceId']
|
||||
if (
|
||||
running_filesize + letter['Size'] > current_app.config['MAX_LETTER_PDF_ZIP_FILESIZE']
|
||||
or len(list_of_files) >= current_app.config['MAX_LETTER_PDF_COUNT_PER_ZIP']
|
||||
or letter['ServiceId'] != service_id
|
||||
):
|
||||
yield list_of_files
|
||||
running_filesize = 0
|
||||
list_of_files = []
|
||||
service_id = None
|
||||
|
||||
if not service_id:
|
||||
service_id = letter['ServiceId']
|
||||
running_filesize += letter['Size']
|
||||
list_of_files.append(letter)
|
||||
|
||||
if list_of_files:
|
||||
yield list_of_files
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name='sanitise-letter', max_retries=15, default_retry_delay=300)
|
||||
def sanitise_letter(self, filename):
|
||||
try:
|
||||
reference = get_reference_from_filename(filename)
|
||||
notification = dao_get_notification_by_reference(reference)
|
||||
|
||||
current_app.logger.info('Notification ID {} Virus scan passed: {}'.format(notification.id, filename))
|
||||
|
||||
if notification.status != NOTIFICATION_PENDING_VIRUS_CHECK:
|
||||
current_app.logger.info('Sanitise letter called for notification {} which is in {} state'.format(
|
||||
notification.id, notification.status))
|
||||
return
|
||||
|
||||
notify_celery.send_task(
|
||||
name=TaskNames.SANITISE_LETTER,
|
||||
kwargs={
|
||||
'notification_id': str(notification.id),
|
||||
'filename': filename,
|
||||
'allow_international_letters': notification.service.has_permission(
|
||||
INTERNATIONAL_LETTERS
|
||||
),
|
||||
},
|
||||
queue=QueueNames.SANITISE_LETTERS,
|
||||
)
|
||||
except Exception:
|
||||
try:
|
||||
current_app.logger.exception(
|
||||
"RETRY: calling sanitise_letter task for notification {} failed".format(notification.id)
|
||||
)
|
||||
self.retry(queue=QueueNames.RETRY)
|
||||
except self.MaxRetriesExceededError:
|
||||
message = "RETRY FAILED: Max retries reached. " \
|
||||
"The task sanitise_letter failed for notification {}. " \
|
||||
"Notification has been updated to technical-failure".format(notification.id)
|
||||
update_notification_status_by_id(notification.id, NOTIFICATION_TECHNICAL_FAILURE)
|
||||
raise NotificationTechnicalFailureException(message)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name='process-sanitised-letter', max_retries=15, default_retry_delay=300)
|
||||
def process_sanitised_letter(self, sanitise_data):
|
||||
letter_details = encryption.verify_signature(sanitise_data)
|
||||
|
||||
filename = letter_details['filename']
|
||||
notification_id = letter_details['notification_id']
|
||||
|
||||
current_app.logger.info('Processing sanitised letter with id {}'.format(notification_id))
|
||||
notification = get_notification_by_id(notification_id, _raise=True)
|
||||
|
||||
if notification.status != NOTIFICATION_PENDING_VIRUS_CHECK:
|
||||
current_app.logger.info(
|
||||
'process-sanitised-letter task called for notification {} which is in {} state'.format(
|
||||
notification.id, notification.status)
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
original_pdf_object = s3.get_s3_object(current_app.config['LETTERS_SCAN_BUCKET_NAME'], filename)
|
||||
|
||||
if letter_details['validation_status'] == 'failed':
|
||||
current_app.logger.info('Processing invalid precompiled pdf with id {} (file {})'.format(
|
||||
notification_id, filename))
|
||||
|
||||
_move_invalid_letter_and_update_status(
|
||||
notification=notification,
|
||||
filename=filename,
|
||||
scan_pdf_object=original_pdf_object,
|
||||
message=letter_details['message'],
|
||||
invalid_pages=letter_details['invalid_pages'],
|
||||
page_count=letter_details['page_count'],
|
||||
)
|
||||
return
|
||||
|
||||
current_app.logger.info('Processing valid precompiled pdf with id {} (file {})'.format(
|
||||
notification_id, filename))
|
||||
|
||||
billable_units = get_billable_units_for_letter_page_count(letter_details['page_count'])
|
||||
is_test_key = notification.key_type == KEY_TYPE_TEST
|
||||
|
||||
# Updating the notification needs to happen before the file is moved. This is so that if updating the
|
||||
# notification fails, the task can retry because the file is in the same place.
|
||||
update_letter_pdf_status(
|
||||
reference=notification.reference,
|
||||
status=NOTIFICATION_DELIVERED if is_test_key else NOTIFICATION_CREATED,
|
||||
billable_units=billable_units,
|
||||
recipient_address=letter_details['address']
|
||||
)
|
||||
|
||||
# The original filename could be wrong because we didn't know the postage.
|
||||
# Now we know if the letter is international, we can check what the filename should be.
|
||||
upload_file_name = generate_letter_pdf_filename(
|
||||
reference=notification.reference,
|
||||
created_at=notification.created_at,
|
||||
ignore_folder=True,
|
||||
postage=notification.postage
|
||||
)
|
||||
|
||||
move_sanitised_letter_to_test_or_live_pdf_bucket(
|
||||
filename,
|
||||
is_test_key,
|
||||
notification.created_at,
|
||||
upload_file_name,
|
||||
)
|
||||
# We've moved the sanitised PDF from the sanitise bucket, but still need to delete the original file:
|
||||
original_pdf_object.delete()
|
||||
|
||||
except BotoClientError:
|
||||
# Boto exceptions are likely to be caused by the file(s) being in the wrong place, so retrying won't help -
|
||||
# we'll need to manually investigate
|
||||
current_app.logger.exception(
|
||||
f"Boto error when processing sanitised letter for notification {notification.id} (file {filename})"
|
||||
)
|
||||
update_notification_status_by_id(notification.id, NOTIFICATION_TECHNICAL_FAILURE)
|
||||
raise NotificationTechnicalFailureException
|
||||
except Exception:
|
||||
try:
|
||||
current_app.logger.exception(
|
||||
"RETRY: calling process_sanitised_letter task for notification {} failed".format(notification.id)
|
||||
)
|
||||
self.retry(queue=QueueNames.RETRY)
|
||||
except self.MaxRetriesExceededError:
|
||||
message = "RETRY FAILED: Max retries reached. " \
|
||||
"The task process_sanitised_letter failed for notification {}. " \
|
||||
"Notification has been updated to technical-failure".format(notification.id)
|
||||
update_notification_status_by_id(notification.id, NOTIFICATION_TECHNICAL_FAILURE)
|
||||
raise NotificationTechnicalFailureException(message)
|
||||
|
||||
|
||||
def _move_invalid_letter_and_update_status(
|
||||
*, notification, filename, scan_pdf_object, message=None, invalid_pages=None, page_count=None
|
||||
):
|
||||
try:
|
||||
move_scan_to_invalid_pdf_bucket(
|
||||
source_filename=filename,
|
||||
message=message,
|
||||
invalid_pages=invalid_pages,
|
||||
page_count=page_count
|
||||
)
|
||||
scan_pdf_object.delete()
|
||||
|
||||
update_letter_pdf_status(
|
||||
reference=notification.reference,
|
||||
status=NOTIFICATION_VALIDATION_FAILED,
|
||||
billable_units=0)
|
||||
except BotoClientError:
|
||||
current_app.logger.exception(
|
||||
"Error when moving letter with id {} to invalid PDF bucket".format(notification.id)
|
||||
)
|
||||
update_notification_status_by_id(notification.id, NOTIFICATION_TECHNICAL_FAILURE)
|
||||
raise NotificationTechnicalFailureException
|
||||
|
||||
|
||||
@notify_celery.task(name='process-virus-scan-failed')
|
||||
def process_virus_scan_failed(filename):
|
||||
move_failed_pdf(filename, ScanErrorType.FAILURE)
|
||||
reference = get_reference_from_filename(filename)
|
||||
notification = dao_get_notification_by_reference(reference)
|
||||
updated_count = update_letter_pdf_status(reference, NOTIFICATION_VIRUS_SCAN_FAILED, billable_units=0)
|
||||
|
||||
if updated_count != 1:
|
||||
raise Exception(
|
||||
"There should only be one letter notification for each reference. Found {} notifications".format(
|
||||
updated_count
|
||||
)
|
||||
)
|
||||
|
||||
error = VirusScanError('notification id {} Virus scan failed: {}'.format(notification.id, filename))
|
||||
current_app.logger.exception(error)
|
||||
raise error
|
||||
|
||||
|
||||
@notify_celery.task(name='process-virus-scan-error')
|
||||
def process_virus_scan_error(filename):
|
||||
move_failed_pdf(filename, ScanErrorType.ERROR)
|
||||
reference = get_reference_from_filename(filename)
|
||||
notification = dao_get_notification_by_reference(reference)
|
||||
updated_count = update_letter_pdf_status(reference, NOTIFICATION_TECHNICAL_FAILURE, billable_units=0)
|
||||
|
||||
if updated_count != 1:
|
||||
raise Exception(
|
||||
"There should only be one letter notification for each reference. Found {} notifications".format(
|
||||
updated_count
|
||||
)
|
||||
)
|
||||
error = VirusScanError('notification id {} Virus scan error: {}'.format(notification.id, filename))
|
||||
current_app.logger.exception(error)
|
||||
raise error
|
||||
|
||||
|
||||
def update_letter_pdf_status(reference, status, billable_units, recipient_address=None):
|
||||
postage = None
|
||||
if recipient_address:
|
||||
# fix allow_international_letters
|
||||
postage = PostalAddress(raw_address=recipient_address.replace(',', '\n'),
|
||||
allow_international_letters=True
|
||||
).postage
|
||||
postage = postage if postage in INTERNATIONAL_POSTAGE_TYPES else None
|
||||
update_dict = {'status': status, 'billable_units': billable_units, 'updated_at': datetime.utcnow()}
|
||||
if postage:
|
||||
update_dict.update({'postage': postage, 'international': True})
|
||||
if recipient_address:
|
||||
update_dict['to'] = recipient_address
|
||||
update_dict['normalised_to'] = ''.join(recipient_address.split()).lower()
|
||||
return dao_update_notifications_by_reference(
|
||||
references=[reference],
|
||||
update_dict=update_dict)[0]
|
||||
|
||||
|
||||
def replay_letters_in_error(filename=None):
|
||||
# This method can be used to replay letters that end up in the ERROR directory.
|
||||
# We had an incident where clamAV was not processing the virus scan.
|
||||
if filename:
|
||||
move_error_pdf_to_scan_bucket(filename)
|
||||
# call task to add the filename to anti virus queue
|
||||
current_app.logger.info("Calling scan_file for: {}".format(filename))
|
||||
|
||||
if current_app.config['ANTIVIRUS_ENABLED']:
|
||||
notify_celery.send_task(
|
||||
name=TaskNames.SCAN_FILE,
|
||||
kwargs={'filename': filename},
|
||||
queue=QueueNames.ANTIVIRUS,
|
||||
)
|
||||
else:
|
||||
# stub out antivirus in dev
|
||||
sanitise_letter.apply_async(
|
||||
[filename],
|
||||
queue=QueueNames.LETTERS
|
||||
)
|
||||
else:
|
||||
error_files = get_file_names_from_error_bucket()
|
||||
for item in error_files:
|
||||
moved_file_name = item.key.split('/')[1]
|
||||
current_app.logger.info("Calling scan_file for: {}".format(moved_file_name))
|
||||
move_error_pdf_to_scan_bucket(moved_file_name)
|
||||
# call task to add the filename to anti virus queue
|
||||
if current_app.config['ANTIVIRUS_ENABLED']:
|
||||
notify_celery.send_task(
|
||||
name=TaskNames.SCAN_FILE,
|
||||
kwargs={'filename': moved_file_name},
|
||||
queue=QueueNames.ANTIVIRUS,
|
||||
)
|
||||
else:
|
||||
# stub out antivirus in dev
|
||||
sanitise_letter.apply_async(
|
||||
[filename],
|
||||
queue=QueueNames.LETTERS
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(name='resanitise-pdf')
|
||||
def resanitise_pdf(notification_id):
|
||||
"""
|
||||
`notification_id` is the notification id for a PDF letter which was either uploaded or sent using the API.
|
||||
|
||||
This task calls the `recreate_pdf_for_precompiled_letter` template preview task which recreates the
|
||||
PDF for a letter which is already sanitised and in the letters-pdf bucket. The new file that is generated
|
||||
will then overwrite the existing letter in the letters-pdf bucket.
|
||||
"""
|
||||
notification = get_notification_by_id(notification_id)
|
||||
|
||||
# folder_name is the folder that the letter is in the letters-pdf bucket e.g. '2021-10-10/'
|
||||
folder_name = get_folder_name(notification.created_at)
|
||||
|
||||
filename = generate_letter_pdf_filename(
|
||||
reference=notification.reference,
|
||||
created_at=notification.created_at,
|
||||
ignore_folder=True,
|
||||
postage=notification.postage
|
||||
)
|
||||
|
||||
notify_celery.send_task(
|
||||
name=TaskNames.RECREATE_PDF_FOR_PRECOMPILED_LETTER,
|
||||
kwargs={
|
||||
'notification_id': str(notification.id),
|
||||
'file_location': f'{folder_name}{filename}',
|
||||
'allow_international_letters': notification.service.has_permission(
|
||||
INTERNATIONAL_LETTERS
|
||||
),
|
||||
},
|
||||
queue=QueueNames.SANITISE_LETTERS,
|
||||
)
|
||||
@@ -1,15 +1,10 @@
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import pytz
|
||||
from flask import current_app
|
||||
from notifications_utils.clients.zendesk.zendesk_client import (
|
||||
NotifySupportTicket,
|
||||
)
|
||||
from notifications_utils.timezones import convert_utc_to_local_timezone
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from app import notify_celery, statsd_client, zendesk_client
|
||||
from app import notify_celery, statsd_client
|
||||
from app.aws import s3
|
||||
from app.celery.process_ses_receipts_tasks import check_and_queue_callback_task
|
||||
from app.config import QueueNames
|
||||
@@ -29,15 +24,7 @@ from app.dao.notifications_dao import (
|
||||
from app.dao.service_data_retention_dao import (
|
||||
fetch_service_data_retention_for_all_services_by_notification_type,
|
||||
)
|
||||
from app.models import (
|
||||
EMAIL_TYPE,
|
||||
KEY_TYPE_NORMAL,
|
||||
LETTER_TYPE,
|
||||
NOTIFICATION_SENDING,
|
||||
SMS_TYPE,
|
||||
FactProcessingTime,
|
||||
Notification,
|
||||
)
|
||||
from app.models import EMAIL_TYPE, SMS_TYPE, FactProcessingTime
|
||||
from app.utils import get_local_midnight_in_utc
|
||||
|
||||
|
||||
@@ -47,12 +34,6 @@ def remove_sms_email_csv_files():
|
||||
_remove_csv_files([EMAIL_TYPE, SMS_TYPE])
|
||||
|
||||
|
||||
@notify_celery.task(name="remove_letter_jobs")
|
||||
@cronitor("remove_letter_jobs")
|
||||
def remove_letter_csv_files():
|
||||
_remove_csv_files([LETTER_TYPE])
|
||||
|
||||
|
||||
def _remove_csv_files(job_types):
|
||||
jobs = dao_get_jobs_older_than_data_retention(notification_types=job_types)
|
||||
for job in jobs:
|
||||
@@ -65,7 +46,6 @@ def _remove_csv_files(job_types):
|
||||
def delete_notifications_older_than_retention():
|
||||
delete_email_notifications_older_than_retention.apply_async(queue=QueueNames.REPORTING)
|
||||
delete_sms_notifications_older_than_retention.apply_async(queue=QueueNames.REPORTING)
|
||||
delete_letter_notifications_older_than_retention.apply_async(queue=QueueNames.REPORTING)
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-sms-notifications")
|
||||
@@ -80,12 +60,6 @@ def delete_email_notifications_older_than_retention():
|
||||
_delete_notifications_older_than_retention_by_type('email')
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-letter-notifications")
|
||||
@cronitor("delete-letter-notifications")
|
||||
def delete_letter_notifications_older_than_retention():
|
||||
_delete_notifications_older_than_retention_by_type('letter')
|
||||
|
||||
|
||||
def _delete_notifications_older_than_retention_by_type(notification_type):
|
||||
flexible_data_retention = fetch_service_data_retention_for_all_services_by_notification_type(notification_type)
|
||||
|
||||
@@ -185,110 +159,6 @@ def delete_inbound_sms():
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="raise-alert-if-letter-notifications-still-sending")
|
||||
@cronitor("raise-alert-if-letter-notifications-still-sending")
|
||||
def raise_alert_if_letter_notifications_still_sending():
|
||||
still_sending_count, sent_date = get_letter_notifications_still_sending_when_they_shouldnt_be()
|
||||
|
||||
if still_sending_count:
|
||||
message = "There are {} letters in the 'sending' state from {}".format(
|
||||
still_sending_count,
|
||||
sent_date.strftime('%A %d %B')
|
||||
)
|
||||
# Only send alerts in production
|
||||
if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
|
||||
message += ". Resolve using https://github.com/alphagov/notifications-manuals/wiki/Support-Runbook#deal-with-letters-still-in-sending" # noqa
|
||||
|
||||
ticket = NotifySupportTicket(
|
||||
subject=f"[{current_app.config['NOTIFY_ENVIRONMENT']}] Letters still sending",
|
||||
email_ccs=current_app.config['DVLA_EMAIL_ADDRESSES'],
|
||||
message=message,
|
||||
ticket_type=NotifySupportTicket.TYPE_INCIDENT,
|
||||
technical_ticket=True,
|
||||
ticket_categories=['notify_letters']
|
||||
)
|
||||
zendesk_client.send_ticket_to_zendesk(ticket)
|
||||
else:
|
||||
current_app.logger.info(message)
|
||||
|
||||
|
||||
def get_letter_notifications_still_sending_when_they_shouldnt_be():
|
||||
today = datetime.utcnow().date()
|
||||
|
||||
# Do nothing on the weekend
|
||||
if today.isoweekday() in {6, 7}: # sat, sun
|
||||
return 0, None
|
||||
|
||||
if today.isoweekday() in {1, 2}: # mon, tues. look for files from before the weekend
|
||||
offset_days = 4
|
||||
else:
|
||||
offset_days = 2
|
||||
|
||||
expected_sent_date = today - timedelta(days=offset_days)
|
||||
|
||||
q = Notification.query.filter(
|
||||
Notification.notification_type == LETTER_TYPE,
|
||||
Notification.status == NOTIFICATION_SENDING,
|
||||
Notification.key_type == KEY_TYPE_NORMAL,
|
||||
func.date(Notification.sent_at) <= expected_sent_date
|
||||
)
|
||||
|
||||
return q.count(), expected_sent_date
|
||||
|
||||
|
||||
@notify_celery.task(name='raise-alert-if-no-letter-ack-file')
|
||||
@cronitor('raise-alert-if-no-letter-ack-file')
|
||||
def letter_raise_alert_if_no_ack_file_for_zip():
|
||||
# get a list of zip files since yesterday
|
||||
zip_file_set = set()
|
||||
today_str = datetime.utcnow().strftime('%Y-%m-%d')
|
||||
yesterday = datetime.now(tz=pytz.utc) - timedelta(days=1) # AWS datetime format
|
||||
|
||||
for key in s3.get_list_of_files_by_suffix(bucket_name=current_app.config['LETTERS_PDF_BUCKET_NAME'],
|
||||
subfolder=today_str + '/zips_sent',
|
||||
suffix='.TXT'):
|
||||
subname = key.split('/')[-1] # strip subfolder in name
|
||||
zip_file_set.add(subname.upper().replace('.ZIP.TXT', ''))
|
||||
|
||||
# get acknowledgement file
|
||||
ack_file_set = set()
|
||||
|
||||
for key in s3.get_list_of_files_by_suffix(bucket_name=current_app.config['DVLA_RESPONSE_BUCKET_NAME'],
|
||||
subfolder='root/dispatch', suffix='.ACK.txt', last_modified=yesterday):
|
||||
ack_file_set.add(key.lstrip('root/dispatch').upper().replace('.ACK.TXT', '')) # noqa
|
||||
|
||||
message = '\n'.join([
|
||||
"Letter ack file does not contain all zip files sent."
|
||||
"",
|
||||
f"See runbook at https://github.com/alphagov/notifications-manuals/wiki/Support-Runbook#letter-ack-file-does-not-contain-all-zip-files-sent\n", # noqa
|
||||
f"pdf bucket: {current_app.config['LETTERS_PDF_BUCKET_NAME']}, subfolder: {datetime.utcnow().strftime('%Y-%m-%d')}/zips_sent", # noqa
|
||||
f"ack bucket: {current_app.config['DVLA_RESPONSE_BUCKET_NAME']}",
|
||||
"",
|
||||
f"Missing ack for zip files: {str(sorted(zip_file_set - ack_file_set))}",
|
||||
])
|
||||
|
||||
# strip empty element before comparison
|
||||
ack_file_set.discard('')
|
||||
zip_file_set.discard('')
|
||||
|
||||
if len(zip_file_set - ack_file_set) > 0:
|
||||
if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
|
||||
ticket = NotifySupportTicket(
|
||||
subject="Letter acknowledge error",
|
||||
message=message,
|
||||
ticket_type=NotifySupportTicket.TYPE_INCIDENT,
|
||||
technical_ticket=True,
|
||||
ticket_categories=['notify_letters']
|
||||
)
|
||||
zendesk_client.send_ticket_to_zendesk(ticket)
|
||||
current_app.logger.error(message)
|
||||
|
||||
if len(ack_file_set - zip_file_set) > 0:
|
||||
current_app.logger.info(
|
||||
"letter ack contains zip that is not for today: {}".format(ack_file_set - zip_file_set)
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(name='save-daily-notification-processing-time')
|
||||
@cronitor("save-daily-notification-processing-time")
|
||||
def save_daily_notification_processing_time(local_date=None):
|
||||
|
||||
@@ -12,7 +12,7 @@ from app.dao.fact_billing_dao import (
|
||||
)
|
||||
from app.dao.fact_notification_status_dao import update_fact_notification_status
|
||||
from app.dao.notifications_dao import get_service_ids_with_notifications_on_date
|
||||
from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE
|
||||
from app.models import EMAIL_TYPE, SMS_TYPE
|
||||
|
||||
|
||||
@notify_celery.task(name="create-nightly-billing")
|
||||
@@ -72,10 +72,6 @@ def create_nightly_notification_status():
|
||||
because all outstanding email / SMS are "timed out" after 3 days, and
|
||||
we reject delivery receipts after this point.
|
||||
|
||||
- Letter statuses don't change after 9 days. There's no "timeout" for
|
||||
letters but this is the longest we've had to cope with in the past - due
|
||||
to major issues with our print provider.
|
||||
|
||||
Because the time range of the task exceeds the minimum possible retention
|
||||
period (3 days), we need to choose which table to query for each service.
|
||||
|
||||
@@ -89,8 +85,8 @@ def create_nightly_notification_status():
|
||||
|
||||
yesterday = convert_utc_to_local_timezone(datetime.utcnow()).date() - timedelta(days=1)
|
||||
|
||||
for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]:
|
||||
days = 10 if notification_type == LETTER_TYPE else 4
|
||||
for notification_type in [SMS_TYPE, EMAIL_TYPE]:
|
||||
days = 4
|
||||
|
||||
for i in range(days):
|
||||
process_day = yesterday - timedelta(days=i)
|
||||
|
||||
@@ -1,13 +1,8 @@
|
||||
import json
|
||||
import random
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from flask import current_app
|
||||
from notifications_utils.s3 import s3upload
|
||||
from requests import HTTPError, request
|
||||
|
||||
from app import notify_celery
|
||||
from app.aws.s3 import file_exists
|
||||
from app.celery.process_ses_receipts_tasks import process_ses_results
|
||||
from app.config import QueueNames
|
||||
from app.models import SMS_TYPE
|
||||
@@ -85,46 +80,6 @@ def sns_callback(notification_id, to):
|
||||
# "deliverytime": "2016-04-05 16:01:07"})
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="create-fake-letter-response-file", max_retries=5, default_retry_delay=300)
|
||||
def create_fake_letter_response_file(self, reference):
|
||||
now = datetime.utcnow()
|
||||
dvla_response_data = '{}|Sent|0|Sorted'.format(reference)
|
||||
|
||||
# try and find a filename that hasn't been taken yet - from a random time within the last 30 seconds
|
||||
for i in sorted(range(30), key=lambda _: random.random()): # nosec B311 - not security related
|
||||
upload_file_name = 'NOTIFY-{}-RSP.TXT'.format((now - timedelta(seconds=i)).strftime('%Y%m%d%H%M%S'))
|
||||
if not file_exists(current_app.config['DVLA_RESPONSE_BUCKET_NAME'], upload_file_name):
|
||||
break
|
||||
else:
|
||||
raise ValueError(
|
||||
'cant create fake letter response file for {} - too many files for that time already exist on s3'.format(
|
||||
reference
|
||||
)
|
||||
)
|
||||
|
||||
s3upload(
|
||||
filedata=dvla_response_data,
|
||||
region=current_app.config['AWS_REGION'],
|
||||
bucket_name=current_app.config['DVLA_RESPONSE_BUCKET_NAME'],
|
||||
file_location=upload_file_name
|
||||
)
|
||||
current_app.logger.info("Fake DVLA response file {}, content [{}], uploaded to {}, created at {}".format(
|
||||
upload_file_name, dvla_response_data, current_app.config['DVLA_RESPONSE_BUCKET_NAME'], now))
|
||||
|
||||
# on development we can't trigger SNS callbacks so we need to manually hit the DVLA callback endpoint
|
||||
if current_app.config['NOTIFY_ENVIRONMENT'] == 'development':
|
||||
make_request('letter', 'dvla', _fake_sns_s3_callback(upload_file_name), None)
|
||||
|
||||
|
||||
def _fake_sns_s3_callback(filename):
|
||||
message_contents = '{"Records":[{"s3":{"object":{"key":"%s"}}}]}' % (filename) # noqa
|
||||
return json.dumps({
|
||||
"Type": "Notification",
|
||||
"MessageId": "some-message-id",
|
||||
"Message": message_contents
|
||||
})
|
||||
|
||||
|
||||
def ses_notification_callback(reference):
|
||||
ses_message_body = {
|
||||
'delivery': {
|
||||
|
||||
@@ -8,15 +8,13 @@ from sqlalchemy import between
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from app import notify_celery, zendesk_client
|
||||
from app.aws import s3
|
||||
from app.celery.letters_pdf_tasks import get_pdf_for_templated_letter
|
||||
from app.celery.tasks import (
|
||||
get_recipient_csv_and_template_and_sender_id,
|
||||
process_incomplete_jobs,
|
||||
process_job,
|
||||
process_row,
|
||||
)
|
||||
from app.config import QueueNames, TaskNames
|
||||
from app.config import QueueNames
|
||||
from app.dao.invited_org_user_dao import (
|
||||
delete_org_invitations_created_more_than_two_days_ago,
|
||||
)
|
||||
@@ -29,12 +27,7 @@ from app.dao.jobs_dao import (
|
||||
find_jobs_with_missing_rows,
|
||||
find_missing_row_for_job,
|
||||
)
|
||||
from app.dao.notifications_dao import (
|
||||
dao_old_letters_with_created_status,
|
||||
dao_precompiled_letters_still_pending_virus_check,
|
||||
letters_missing_from_sending_bucket,
|
||||
notifications_not_yet_sent,
|
||||
)
|
||||
from app.dao.notifications_dao import notifications_not_yet_sent
|
||||
from app.dao.provider_details_dao import (
|
||||
dao_adjust_provider_priority_back_to_resting_points,
|
||||
)
|
||||
@@ -43,7 +36,6 @@ from app.dao.services_dao import (
|
||||
dao_find_services_with_high_failure_rates,
|
||||
)
|
||||
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
||||
from app.letters.utils import generate_letter_pdf_filename
|
||||
from app.models import (
|
||||
EMAIL_TYPE,
|
||||
JOB_STATUS_ERROR,
|
||||
@@ -165,88 +157,6 @@ def replay_created_notifications():
|
||||
for n in notifications_to_resend:
|
||||
send_notification_to_queue(notification=n, research_mode=n.service.research_mode)
|
||||
|
||||
# if the letter has not be send after an hour, then create a zendesk ticket
|
||||
letters = letters_missing_from_sending_bucket(resend_created_notifications_older_than)
|
||||
|
||||
if len(letters) > 0:
|
||||
msg = "{} letters were created over an hour ago, " \
|
||||
"but do not have an updated_at timestamp or billable units. " \
|
||||
"\n Creating app.celery.letters_pdf_tasks.create_letters tasks to upload letter to S3 " \
|
||||
"and update notifications for the following notification ids: " \
|
||||
"\n {}".format(len(letters), [x.id for x in letters])
|
||||
|
||||
current_app.logger.info(msg)
|
||||
for letter in letters:
|
||||
get_pdf_for_templated_letter.apply_async([str(letter.id)], queue=QueueNames.CREATE_LETTERS_PDF)
|
||||
|
||||
|
||||
@notify_celery.task(name='check-if-letters-still-pending-virus-check')
|
||||
def check_if_letters_still_pending_virus_check():
|
||||
letters = []
|
||||
|
||||
for letter in dao_precompiled_letters_still_pending_virus_check():
|
||||
# find letter in the scan bucket
|
||||
filename = generate_letter_pdf_filename(
|
||||
letter.reference,
|
||||
letter.created_at,
|
||||
ignore_folder=True,
|
||||
postage=letter.postage
|
||||
)
|
||||
|
||||
if s3.file_exists(current_app.config['LETTERS_SCAN_BUCKET_NAME'], filename):
|
||||
current_app.logger.warning(
|
||||
f'Letter id {letter.id} got stuck in pending-virus-check. Sending off for scan again.'
|
||||
)
|
||||
notify_celery.send_task(
|
||||
name=TaskNames.SCAN_FILE,
|
||||
kwargs={'filename': filename},
|
||||
queue=QueueNames.ANTIVIRUS,
|
||||
)
|
||||
else:
|
||||
letters.append(letter)
|
||||
|
||||
if len(letters) > 0:
|
||||
letter_ids = [(str(letter.id), letter.reference) for letter in letters]
|
||||
|
||||
msg = f"""{len(letters)} precompiled letters have been pending-virus-check for over 90 minutes.
|
||||
We couldn't find them in the scan bucket. We'll need to find out where the files are and kick them off
|
||||
again or move them to technical failure.
|
||||
|
||||
Notifications: {sorted(letter_ids)}"""
|
||||
|
||||
if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
|
||||
ticket = NotifySupportTicket(
|
||||
subject=f"[{current_app.config['NOTIFY_ENVIRONMENT']}] Letters still pending virus check",
|
||||
message=msg,
|
||||
ticket_type=NotifySupportTicket.TYPE_INCIDENT,
|
||||
technical_ticket=True,
|
||||
ticket_categories=['notify_letters']
|
||||
)
|
||||
zendesk_client.send_ticket_to_zendesk(ticket)
|
||||
current_app.logger.error(msg)
|
||||
|
||||
|
||||
@notify_celery.task(name='check-if-letters-still-in-created')
|
||||
def check_if_letters_still_in_created():
|
||||
letters = dao_old_letters_with_created_status()
|
||||
|
||||
if len(letters) > 0:
|
||||
msg = "{} letters were created before 17.30 yesterday and still have 'created' status. " \
|
||||
"Follow runbook to resolve: " \
|
||||
"https://github.com/alphagov/notifications-manuals/wiki/Support-Runbook" \
|
||||
"#deal-with-Letters-still-in-created.".format(len(letters))
|
||||
|
||||
if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
|
||||
ticket = NotifySupportTicket(
|
||||
subject=f"[{current_app.config['NOTIFY_ENVIRONMENT']}] Letters still in 'created' status",
|
||||
message=msg,
|
||||
ticket_type=NotifySupportTicket.TYPE_INCIDENT,
|
||||
technical_ticket=True,
|
||||
ticket_categories=['notify_letters']
|
||||
)
|
||||
zendesk_client.send_ticket_to_zendesk(ticket)
|
||||
current_app.logger.error(msg)
|
||||
|
||||
|
||||
@notify_celery.task(name='check-for-missing-rows-in-completed-jobs')
|
||||
def check_for_missing_rows_in_completed_jobs():
|
||||
|
||||
@@ -1,63 +1,39 @@
|
||||
import json
|
||||
from collections import defaultdict, namedtuple
|
||||
from datetime import datetime
|
||||
|
||||
from flask import current_app
|
||||
from notifications_utils.insensitive_dict import InsensitiveDict
|
||||
from notifications_utils.postal_address import PostalAddress
|
||||
from notifications_utils.recipients import RecipientCSV
|
||||
from notifications_utils.timezones import convert_utc_to_local_timezone
|
||||
from requests import HTTPError, RequestException, request
|
||||
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
|
||||
|
||||
from app import create_random_identifier, create_uuid, encryption, notify_celery
|
||||
from app import create_uuid, encryption, notify_celery
|
||||
from app.aws import s3
|
||||
from app.celery import letters_pdf_tasks, provider_tasks, research_mode_tasks
|
||||
from app.celery import provider_tasks
|
||||
from app.config import QueueNames
|
||||
from app.dao.daily_sorted_letter_dao import (
|
||||
dao_create_or_update_daily_sorted_letter,
|
||||
)
|
||||
from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id
|
||||
from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job
|
||||
from app.dao.notifications_dao import (
|
||||
dao_get_last_notification_added_for_job_id,
|
||||
dao_get_notification_history_by_reference,
|
||||
dao_update_notifications_by_reference,
|
||||
get_notification_by_id,
|
||||
update_notification_status_by_reference,
|
||||
)
|
||||
from app.dao.provider_details_dao import (
|
||||
get_provider_details_by_notification_type,
|
||||
)
|
||||
from app.dao.returned_letters_dao import insert_or_update_returned_letters
|
||||
from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
|
||||
from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
|
||||
from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.exceptions import DVLAException, NotificationTechnicalFailureException
|
||||
from app.models import (
|
||||
DVLA_RESPONSE_STATUS_SENT,
|
||||
EMAIL_TYPE,
|
||||
JOB_STATUS_CANCELLED,
|
||||
JOB_STATUS_FINISHED,
|
||||
JOB_STATUS_IN_PROGRESS,
|
||||
JOB_STATUS_PENDING,
|
||||
KEY_TYPE_NORMAL,
|
||||
LETTER_TYPE,
|
||||
NOTIFICATION_CREATED,
|
||||
NOTIFICATION_DELIVERED,
|
||||
NOTIFICATION_RETURNED_LETTER,
|
||||
NOTIFICATION_SENDING,
|
||||
NOTIFICATION_TECHNICAL_FAILURE,
|
||||
NOTIFICATION_TEMPORARY_FAILURE,
|
||||
SMS_TYPE,
|
||||
DailySortedLetter,
|
||||
)
|
||||
from app.notifications.process_notifications import persist_notification
|
||||
from app.notifications.validators import check_service_over_daily_message_limit
|
||||
from app.serialised_models import SerialisedService, SerialisedTemplate
|
||||
from app.service.utils import service_allowed_to_send_to
|
||||
from app.utils import DATETIME_FORMAT, get_reference_from_personalisation
|
||||
from app.utils import DATETIME_FORMAT
|
||||
from app.v2.errors import TooManyRequestsError
|
||||
|
||||
|
||||
@@ -136,8 +112,7 @@ def process_row(row, template, job, service, sender_id=None):
|
||||
|
||||
send_fns = {
|
||||
SMS_TYPE: save_sms,
|
||||
EMAIL_TYPE: save_email,
|
||||
LETTER_TYPE: save_letter
|
||||
EMAIL_TYPE: save_email
|
||||
}
|
||||
|
||||
send_fn = send_fns[template_type]
|
||||
@@ -341,103 +316,6 @@ def save_api_email_or_sms(self, encrypted_notification):
|
||||
current_app.logger.error(f"Max retry failed Failed to persist notification {notification['id']}")
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="save-letter", max_retries=5, default_retry_delay=300)
|
||||
def save_letter(
|
||||
self,
|
||||
service_id,
|
||||
notification_id,
|
||||
encrypted_notification,
|
||||
):
|
||||
notification = encryption.decrypt(encrypted_notification)
|
||||
|
||||
postal_address = PostalAddress.from_personalisation(
|
||||
InsensitiveDict(notification['personalisation'])
|
||||
)
|
||||
|
||||
service = SerialisedService.from_id(service_id)
|
||||
template = SerialisedTemplate.from_id_and_service_id(
|
||||
notification['template'],
|
||||
service_id=service.id,
|
||||
version=notification['template_version'],
|
||||
)
|
||||
|
||||
try:
|
||||
# if we don't want to actually send the letter, then start it off in SENDING so we don't pick it up
|
||||
status = NOTIFICATION_CREATED if not service.research_mode else NOTIFICATION_SENDING
|
||||
|
||||
saved_notification = persist_notification(
|
||||
template_id=notification['template'],
|
||||
template_version=notification['template_version'],
|
||||
postage=postal_address.postage if postal_address.international else template.postage,
|
||||
recipient=postal_address.normalised,
|
||||
service=service,
|
||||
personalisation=notification['personalisation'],
|
||||
notification_type=LETTER_TYPE,
|
||||
api_key_id=None,
|
||||
key_type=KEY_TYPE_NORMAL,
|
||||
created_at=datetime.utcnow(),
|
||||
job_id=notification['job'],
|
||||
job_row_number=notification['row_number'],
|
||||
notification_id=notification_id,
|
||||
reference=create_random_identifier(),
|
||||
client_reference=get_reference_from_personalisation(notification['personalisation']),
|
||||
reply_to_text=template.reply_to_text,
|
||||
status=status
|
||||
)
|
||||
|
||||
if not service.research_mode:
|
||||
letters_pdf_tasks.get_pdf_for_templated_letter.apply_async(
|
||||
[str(saved_notification.id)],
|
||||
queue=QueueNames.CREATE_LETTERS_PDF
|
||||
)
|
||||
elif current_app.config['NOTIFY_ENVIRONMENT'] in ['preview', 'development']:
|
||||
research_mode_tasks.create_fake_letter_response_file.apply_async(
|
||||
(saved_notification.reference,),
|
||||
queue=QueueNames.RESEARCH_MODE
|
||||
)
|
||||
else:
|
||||
update_notification_status_by_reference(saved_notification.reference, 'delivered')
|
||||
|
||||
current_app.logger.debug("Letter {} created at {}".format(saved_notification.id, saved_notification.created_at))
|
||||
except SQLAlchemyError as e:
|
||||
handle_exception(self, notification, notification_id, e)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name='update-letter-notifications-to-sent')
|
||||
def update_letter_notifications_to_sent_to_dvla(self, notification_references):
|
||||
# This task will be called by the FTP app to update notifications as sent to DVLA
|
||||
provider = get_provider_details_by_notification_type(LETTER_TYPE)[0]
|
||||
|
||||
updated_count, _ = dao_update_notifications_by_reference(
|
||||
notification_references,
|
||||
{
|
||||
'status': NOTIFICATION_SENDING,
|
||||
'sent_by': provider.identifier,
|
||||
'sent_at': datetime.utcnow(),
|
||||
'updated_at': datetime.utcnow()
|
||||
}
|
||||
)
|
||||
|
||||
current_app.logger.info("Updated {} letter notifications to sending".format(updated_count))
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name='update-letter-notifications-to-error')
|
||||
def update_letter_notifications_to_error(self, notification_references):
|
||||
# This task will be called by the FTP app to update notifications as sent to DVLA
|
||||
|
||||
updated_count, _ = dao_update_notifications_by_reference(
|
||||
notification_references,
|
||||
{
|
||||
'status': NOTIFICATION_TECHNICAL_FAILURE,
|
||||
'updated_at': datetime.utcnow()
|
||||
}
|
||||
)
|
||||
message = "Updated {} letter notifications to technical-failure with references {}".format(
|
||||
updated_count, notification_references
|
||||
)
|
||||
raise NotificationTechnicalFailureException(message)
|
||||
|
||||
|
||||
def handle_exception(task, notification, notification_id, exc):
|
||||
if not get_notification_by_id(notification_id):
|
||||
retry_msg = '{task} notification for job {job} row number {row} and notification id {noti}'.format(
|
||||
@@ -457,108 +335,6 @@ def handle_exception(task, notification, notification_id, exc):
|
||||
current_app.logger.error('Max retry failed' + retry_msg)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name='update-letter-notifications-statuses')
|
||||
def update_letter_notifications_statuses(self, filename):
|
||||
notification_updates = parse_dvla_file(filename)
|
||||
|
||||
temporary_failures = []
|
||||
|
||||
for update in notification_updates:
|
||||
check_billable_units(update)
|
||||
update_letter_notification(filename, temporary_failures, update)
|
||||
if temporary_failures:
|
||||
# This will alert Notify that DVLA was unable to deliver the letters, we need to investigate
|
||||
message = "DVLA response file: {filename} has failed letters with notification.reference {failures}" \
|
||||
.format(filename=filename, failures=temporary_failures)
|
||||
raise DVLAException(message)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="record-daily-sorted-counts")
|
||||
def record_daily_sorted_counts(self, filename):
|
||||
sorted_letter_counts = defaultdict(int)
|
||||
notification_updates = parse_dvla_file(filename)
|
||||
for update in notification_updates:
|
||||
sorted_letter_counts[update.cost_threshold.lower()] += 1
|
||||
|
||||
unknown_status = sorted_letter_counts.keys() - {'unsorted', 'sorted'}
|
||||
if unknown_status:
|
||||
message = 'DVLA response file: {} contains unknown Sorted status {}'.format(
|
||||
filename, unknown_status.__repr__()
|
||||
)
|
||||
raise DVLAException(message)
|
||||
|
||||
billing_date = get_local_billing_date_from_filename(filename)
|
||||
persist_daily_sorted_letter_counts(day=billing_date,
|
||||
file_name=filename,
|
||||
sorted_letter_counts=sorted_letter_counts)
|
||||
|
||||
|
||||
def parse_dvla_file(filename):
|
||||
bucket_location = '{}-ftp'.format(current_app.config['NOTIFY_EMAIL_DOMAIN'])
|
||||
response_file_content = s3.get_s3_file(bucket_location, filename)
|
||||
|
||||
try:
|
||||
return process_updates_from_file(response_file_content)
|
||||
except TypeError:
|
||||
raise DVLAException('DVLA response file: {} has an invalid format'.format(filename))
|
||||
|
||||
|
||||
def get_local_billing_date_from_filename(filename):
|
||||
# exclude seconds from the date since we don't need it. We got a date ending in 60 second - which is not valid.
|
||||
datetime_string = filename.split('-')[1][:-2]
|
||||
datetime_obj = datetime.strptime(datetime_string, '%Y%m%d%H%M')
|
||||
return convert_utc_to_local_timezone(datetime_obj).date()
|
||||
|
||||
|
||||
def persist_daily_sorted_letter_counts(day, file_name, sorted_letter_counts):
|
||||
daily_letter_count = DailySortedLetter(
|
||||
billing_day=day,
|
||||
file_name=file_name,
|
||||
unsorted_count=sorted_letter_counts['unsorted'],
|
||||
sorted_count=sorted_letter_counts['sorted']
|
||||
)
|
||||
dao_create_or_update_daily_sorted_letter(daily_letter_count)
|
||||
|
||||
|
||||
def process_updates_from_file(response_file):
|
||||
NotificationUpdate = namedtuple('NotificationUpdate', ['reference', 'status', 'page_count', 'cost_threshold'])
|
||||
notification_updates = [NotificationUpdate(*line.split('|')) for line in response_file.splitlines()]
|
||||
return notification_updates
|
||||
|
||||
|
||||
def update_letter_notification(filename, temporary_failures, update):
|
||||
if update.status == DVLA_RESPONSE_STATUS_SENT:
|
||||
status = NOTIFICATION_DELIVERED
|
||||
else:
|
||||
status = NOTIFICATION_TEMPORARY_FAILURE
|
||||
temporary_failures.append(update.reference)
|
||||
|
||||
updated_count, _ = dao_update_notifications_by_reference(
|
||||
references=[update.reference],
|
||||
update_dict={"status": status,
|
||||
"updated_at": datetime.utcnow()
|
||||
}
|
||||
)
|
||||
|
||||
if not updated_count:
|
||||
msg = "Update letter notification file {filename} failed: notification either not found " \
|
||||
"or already updated from delivered. Status {status} for notification reference {reference}".format(
|
||||
filename=filename, status=status, reference=update.reference)
|
||||
current_app.logger.info(msg)
|
||||
|
||||
|
||||
def check_billable_units(notification_update):
|
||||
notification = dao_get_notification_history_by_reference(notification_update.reference)
|
||||
|
||||
if int(notification_update.page_count) != notification.billable_units:
|
||||
msg = 'Notification with id {} has {} billable_units but DVLA says page count is {}'.format(
|
||||
notification.id, notification.billable_units, notification_update.page_count)
|
||||
try:
|
||||
raise DVLAException(msg)
|
||||
except DVLAException:
|
||||
current_app.logger.exception(msg)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="send-inbound-sms", max_retries=5, default_retry_delay=300)
|
||||
def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
|
||||
inbound_api = get_service_inbound_api_for_service(service_id=service_id)
|
||||
@@ -647,19 +423,3 @@ def process_incomplete_job(job_id):
|
||||
process_row(row, template, job, job.service, sender_id=sender_id)
|
||||
|
||||
job_complete(job, resumed=True)
|
||||
|
||||
|
||||
@notify_celery.task(name='process-returned-letters-list')
|
||||
def process_returned_letters_list(notification_references):
|
||||
updated, updated_history = dao_update_notifications_by_reference(
|
||||
notification_references,
|
||||
{"status": NOTIFICATION_RETURNED_LETTER}
|
||||
)
|
||||
|
||||
insert_or_update_returned_letters(notification_references)
|
||||
|
||||
current_app.logger.info(
|
||||
"Updated {} letter notifications ({} history notifications, from {} references) to returned-letter".format(
|
||||
updated, updated_history, len(notification_references)
|
||||
)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user