mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-17 18:52:30 -05:00
Create new task to build dvla file.
This will transform each notification in a job to a row in a file. The file is then uploaded to S3. The files will later be aggregated by the notifications-ftp app to send to dvla. The method to upload the file to S3 should be pulled into notifications-utils package. It is the same method used in notifications-admin.
This commit is contained in:
@@ -1,10 +1,12 @@
|
||||
import botocore
|
||||
from boto3 import resource
|
||||
from datetime import (datetime)
|
||||
|
||||
from flask import current_app
|
||||
from notifications_utils.recipients import (
|
||||
RecipientCSV
|
||||
)
|
||||
from notifications_utils.template import SMSMessageTemplate, WithSubjectTemplate
|
||||
from notifications_utils.template import SMSMessageTemplate, WithSubjectTemplate, LetterDVLATemplate
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from app import (
|
||||
create_uuid,
|
||||
@@ -16,8 +18,9 @@ from app.aws import s3
|
||||
from app.celery import provider_tasks
|
||||
from app.dao.jobs_dao import (
|
||||
dao_update_job,
|
||||
dao_get_job_by_id
|
||||
)
|
||||
dao_get_job_by_id,
|
||||
all_notifications_are_created_for_job,
|
||||
dao_get_all_notifications_for_job)
|
||||
from app.dao.notifications_dao import get_notification_by_id
|
||||
from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
@@ -76,6 +79,8 @@ def process_job(job_id):
|
||||
current_app.logger.info(
|
||||
"Job {} created at {} started at {} finished at {}".format(job_id, job.created_at, start, finished)
|
||||
)
|
||||
if template.template_type == LETTER_TYPE:
|
||||
build_dvla_file.apply_async([str(job.id)], queue='process-job')
|
||||
|
||||
|
||||
def process_row(row_number, recipient, personalisation, template, job, service):
|
||||
@@ -248,13 +253,61 @@ def persist_letter(
|
||||
notification_id=notification_id
|
||||
)
|
||||
|
||||
# TODO: deliver letters
|
||||
|
||||
current_app.logger.info("Letter {} created at {}".format(saved_notification.id, created_at))
|
||||
except SQLAlchemyError as e:
|
||||
handle_exception(self, notification, notification_id, e)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="build-dvla-file", max_retries=5, default_retry_delay=300)
|
||||
@statsd(namespace="tasks")
|
||||
def build_dvla_file(self, job_id):
|
||||
if all_notifications_are_created_for_job(job_id):
|
||||
notifications = dao_get_all_notifications_for_job(job_id)
|
||||
file = ""
|
||||
for n in notifications:
|
||||
t = {"content": n.template.content, "subject": n.template.subject}
|
||||
template = LetterDVLATemplate(t, n.personalisation, 1)
|
||||
# print(str(template))
|
||||
file = file + str(template) + "\n"
|
||||
s3upload(filedata=file,
|
||||
region=current_app.config['AWS_REGION'],
|
||||
bucket_name=current_app.config['DVLA_UPLOAD_BUCKET_NAME'],
|
||||
file_location="{}-dvla-job.text".format(job_id))
|
||||
else:
|
||||
self.retry(queue="retry", exc="All notifications for job {} are not persisted".format(job_id))
|
||||
|
||||
|
||||
def s3upload(filedata, region, bucket_name, file_location):
|
||||
# TODO: move this method to utils. Will need to change the filedata from here to send contents in filedata['data']
|
||||
_s3 = resource('s3')
|
||||
# contents = filedata['data']
|
||||
contents = filedata
|
||||
|
||||
exists = True
|
||||
try:
|
||||
_s3.meta.client.head_bucket(
|
||||
Bucket=bucket_name)
|
||||
except botocore.exceptions.ClientError as e:
|
||||
error_code = int(e.response['Error']['Code'])
|
||||
if error_code == 404:
|
||||
exists = False
|
||||
else:
|
||||
current_app.logger.error(
|
||||
"Unable to create s3 bucket {}".format(bucket_name))
|
||||
raise e
|
||||
|
||||
if not exists:
|
||||
_s3.create_bucket(Bucket=bucket_name,
|
||||
CreateBucketConfiguration={'LocationConstraint': region})
|
||||
|
||||
upload_id = create_uuid()
|
||||
upload_file_name = file_location
|
||||
key = _s3.Object(bucket_name, upload_file_name)
|
||||
key.put(Body=contents, ServerSideEncryption='AES256')
|
||||
|
||||
return upload_id
|
||||
|
||||
|
||||
def handle_exception(task, notification, notification_id, exc):
|
||||
if not get_notification_by_id(notification_id):
|
||||
retry_msg = '{task} notification for job {job} row number {row} and notification id {noti}'.format(
|
||||
|
||||
Reference in New Issue
Block a user