commit 516190262a
parent d25fcc8748
Author: Rebecca Law
Date:   2019-11-08 10:30:26 +00:00

3 changed files with 18 additions and 10 deletions

app/aws/s3.py

@@ -37,6 +37,11 @@ def get_job_location(service_id, job_id):
     )


+def get_job_and_metadata_from_s3(service_id, job_id):
+    obj = get_s3_object(*get_job_location(service_id, job_id))
+    return obj.get()['Body'].read().decode('utf-8'), obj.get()['Metadata']
+
+
 def get_job_from_s3(service_id, job_id):
     obj = get_s3_object(*get_job_location(service_id, job_id))
     return obj.get()['Body'].read().decode('utf-8')
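
Worth noting: as committed, the new helper calls obj.get() twice, which issues two S3 GET requests per job. A minimal single-fetch sketch (assuming, as the existing get_job_from_s3 does, that get_s3_object returns a boto3 s3.Object):

def get_job_and_metadata_from_s3(service_id, job_id):
    obj = get_s3_object(*get_job_location(service_id, job_id))
    # Fetch once and reuse the response: get() returns a dict whose 'Body'
    # is a StreamingBody and whose 'Metadata' is the user-metadata dict.
    response = obj.get()
    return response['Body'].read().decode('utf-8'), response['Metadata']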

app/celery/scheduled_tasks.py

@@ -11,7 +11,7 @@ from sqlalchemy.exc import SQLAlchemyError
 from app import notify_celery, zendesk_client
 from app.celery.tasks import (
     process_job,
-    get_recipient_csv_and_template,
+    get_recipient_csv_and_template_and_sender_id,
     process_row
 )
 from app.config import QueueNames, TaskNames
@@ -242,9 +242,9 @@ def check_for_missing_rows_in_completed_jobs():
             # The sender_id is passed in with job, at this point we no longer have the sender that is passed in.
             # The notification will be created with the default sender.
             # There is a bug to fix this https://www.pivotaltracker.com/story/show/169569144
-            recipient_csv, template = get_recipient_csv_and_template(job)
+            recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
             for row in recipient_csv.get_rows():
                 if row.index == row_to_process.missing_row:
                     current_app.logger.info(
                         "Processing missing row: {} for job: {}".format(row_to_process.missing_row, job.id))
-                    process_row(row, template, job, job.service)
+                    process_row(row, template, job, job.service, sender_id=sender_id)
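
For context: the fix works because sender_id is persisted alongside the job CSV as S3 user metadata when the job is first uploaded, so a later re-run can recover it after the original request is gone. A rough sketch of that producer side (the bucket name and helper name are illustrative assumptions, not this repo's code):

import boto3

s3_resource = boto3.resource('s3')

def upload_job_to_s3(service_id, job_id, file_data, sender_id=None):
    # Storing sender_id as S3 user metadata is what lets
    # get_job_and_metadata_from_s3 recover it later.
    metadata = {'sender_id': sender_id} if sender_id else {}
    s3_resource.Object('example-notify-jobs', 'service-{}-notify/{}.csv'.format(service_id, job_id)).put(
        Body=file_data.encode('utf-8'),
        Metadata=metadata,
    )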

app/celery/tasks.py

@@ -27,6 +27,7 @@ from app import (
     notify_celery,
 )
 from app.aws import s3
+from app.aws.s3 import get_job_and_metadata_from_s3
 from app.celery import provider_tasks, letters_pdf_tasks, research_mode_tasks
 from app.config import QueueNames
 from app.dao.daily_sorted_letter_dao import dao_create_or_update_daily_sorted_letter
@@ -97,7 +98,8 @@ def process_job(job_id, sender_id=None):
     job.processing_started = start
     dao_update_job(job)
-    recipient_csv, template = get_recipient_csv_and_template(job)
+    # should I rename the variable?
+    recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
     current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))
@@ -124,16 +126,17 @@ def job_complete(job, resumed=False, start=None):
     )


-def get_recipient_csv_and_template(job):
+def get_recipient_csv_and_template_and_sender_id(job):
     db_template = dao_get_template_by_id(job.template_id, job.template_version)
     TemplateClass = get_template_class(db_template.template_type)
     template = TemplateClass(db_template.__dict__)
-    recipient_csv = RecipientCSV(file_data=s3.get_job_from_s3(str(job.service_id), str(job.id)),
+    contents, meta_data = get_job_and_metadata_from_s3(service_id=str(job.service_id), job_id=str(job.id))
+    recipient_csv = RecipientCSV(file_data=contents,
                                  template_type=template.template_type,
                                  placeholders=template.placeholders)
-    return recipient_csv, template
+    return recipient_csv, template, meta_data.get("sender_id")


 def process_row(row, template, job, service, sender_id=None):
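
One detail worth calling out: S3 returns an empty user-metadata dict when none was set, so meta_data.get("sender_id") yields None for jobs uploaded without a sender, and process_row's sender_id=None default behaviour is preserved. A minimal usage sketch of the renamed helper:

    recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
    # sender_id is a string when the upload stored one, otherwise None and the
    # notification falls back to the service's default sender.
    if sender_id is None:
        current_app.logger.info("Job {} has no stored sender_id".format(job.id))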
@@ -601,11 +604,11 @@ def process_incomplete_job(job_id):
     current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
-    recipient_csv, template = get_recipient_csv_and_template(job)
+    recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
     for row in recipient_csv.get_rows():
         if row.index > resume_from_row:
-            process_row(row, template, job, job.service)
+            process_row(row, template, job, job.service, sender_id=sender_id)
     job_complete(job, resumed=True)
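
To make the resume semantics concrete: only rows strictly after resume_from_row are reprocessed, each now carrying the recovered sender_id. A self-contained toy illustration (toy types, not repo code):

from types import SimpleNamespace

rows = [SimpleNamespace(index=i) for i in range(5)]
resume_from_row = 2
sender_id = 'sender-123'  # as recovered from the job's S3 metadata

reprocessed = [(row.index, sender_id) for row in rows if row.index > resume_from_row]
assert reprocessed == [(3, 'sender-123'), (4, 'sender-123')]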