From 516190262ab8fcc151dffd9f933dc89cf8ef1ed0 Mon Sep 17 00:00:00 2001
From: Rebecca Law
Date: Fri, 8 Nov 2019 10:30:26 +0000
Subject: [PATCH] [WIP]

---
 app/aws/s3.py                 |  5 +++++
 app/celery/scheduled_tasks.py |  6 +++---
 app/celery/tasks.py           | 17 ++++++++++-------
 3 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/app/aws/s3.py b/app/aws/s3.py
index a926bb101..cb4fef679 100644
--- a/app/aws/s3.py
+++ b/app/aws/s3.py
@@ -37,6 +37,11 @@ def get_job_location(service_id, job_id):
     )
 
 
+def get_job_and_metadata_from_s3(service_id, job_id):
+    obj = get_s3_object(*get_job_location(service_id, job_id)).get()
+    return obj['Body'].read().decode('utf-8'), obj['Metadata']
+
+
 def get_job_from_s3(service_id, job_id):
     obj = get_s3_object(*get_job_location(service_id, job_id))
     return obj.get()['Body'].read().decode('utf-8')
diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py
index 383f80462..12949a436 100644
--- a/app/celery/scheduled_tasks.py
+++ b/app/celery/scheduled_tasks.py
@@ -11,7 +11,7 @@ from sqlalchemy.exc import SQLAlchemyError
 from app import notify_celery, zendesk_client
 from app.celery.tasks import (
     process_job,
-    get_recipient_csv_and_template,
+    get_recipient_csv_and_template_and_sender_id,
     process_row
 )
 from app.config import QueueNames, TaskNames
@@ -242,9 +242,9 @@ def check_for_missing_rows_in_completed_jobs():
             # The sender_id is passed in with job, at this point we no longer have the sender that is passed in.
             # The notification will be created with the default sender.
             # There is a bug to fix this https://www.pivotaltracker.com/story/show/169569144
-            recipient_csv, template = get_recipient_csv_and_template(job)
+            recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
             for row in recipient_csv.get_rows():
                 if row.index == row_to_process.missing_row:
                     current_app.logger.info(
                         "Processing missing row: {} for job: {}".format(row_to_process.missing_row, job.id))
-                    process_row(row, template, job, job.service)
+                    process_row(row, template, job, job.service, sender_id=sender_id)
diff --git a/app/celery/tasks.py b/app/celery/tasks.py
index f489f281c..1b30093e2 100644
--- a/app/celery/tasks.py
+++ b/app/celery/tasks.py
@@ -27,6 +27,7 @@ from app import (
     notify_celery,
 )
 from app.aws import s3
+from app.aws.s3 import get_job_and_metadata_from_s3
 from app.celery import provider_tasks, letters_pdf_tasks, research_mode_tasks
 from app.config import QueueNames
 from app.dao.daily_sorted_letter_dao import dao_create_or_update_daily_sorted_letter
@@ -97,7 +98,8 @@ def process_job(job_id, sender_id=None):
     job.processing_started = start
     dao_update_job(job)
 
-    recipient_csv, template = get_recipient_csv_and_template(job)
+    # TODO: rename this variable? It overwrites the sender_id argument passed to the task.
+    recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
 
     current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))
 
@@ -124,16 +126,17 @@ def job_complete(job, resumed=False, start=None):
     )
 
 
-def get_recipient_csv_and_template(job):
+def get_recipient_csv_and_template_and_sender_id(job):
     db_template = dao_get_template_by_id(job.template_id, job.template_version)
     TemplateClass = get_template_class(db_template.template_type)
     template = TemplateClass(db_template.__dict__)
-
-    recipient_csv = RecipientCSV(file_data=s3.get_job_from_s3(str(job.service_id), str(job.id)),
+    contents, meta_data = get_job_and_metadata_from_s3(service_id=str(job.service_id), job_id=str(job.id))
+    recipient_csv = RecipientCSV(file_data=contents,
                                  template_type=template.template_type,
                                  placeholders=template.placeholders)
-    return recipient_csv, template
+
+    return recipient_csv, template, meta_data.get("sender_id")
 
 
 def process_row(row, template, job, service, sender_id=None):
@@ -601,11 +604,11 @@ def process_incomplete_job(job_id):
 
     current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
 
-    recipient_csv, template = get_recipient_csv_and_template(job)
+    recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
 
     for row in recipient_csv.get_rows():
         if row.index > resume_from_row:
-            process_row(row, template, job, job.service)
+            process_row(row, template, job, job.service, sender_id=sender_id)
 
     job_complete(job, resumed=True)
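
For reviewers: a small standalone sketch (not part of this patch) of the S3 metadata round trip that get_job_and_metadata_from_s3 relies on. The upload side has to store sender_id as user-defined object metadata for the read side to recover it; the bucket name, key, CSV body and sender_id value below are invented for illustration only.

import boto3

# Illustrative values only - the real bucket and key come from get_job_location()
s3_resource = boto3.resource('s3')
obj = s3_resource.Object('example-job-upload-bucket', 'service-id/job-id')

# Upload side: write the CSV body and the sender_id together,
# the sender_id as user-defined object metadata.
obj.put(
    Body='phone number\n+447700900001\n'.encode('utf-8'),
    Metadata={'sender_id': 'example-sender-id'},
)

# Read side (what the new helper does): a single GET returns both.
response = obj.get()
contents = response['Body'].read().decode('utf-8')
sender_id = response['Metadata'].get('sender_id')  # user metadata keys come back lower-cased

The sketch assumes the job upload code already writes sender_id into the object metadata; if it does not, meta_data.get("sender_id") in the new helper will simply return None and the default sender is used, as before.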