Adding a scheduled task to process missing rows from a job

Sometimes a job finishes but has missed a row in the middle. It is a mystery why this happens; it could be that the task to save the notifications was dropped.
So until we solve the mystery, let's find the missing rows and process them.

A new scheduled task has been added to find any "finished" jobs that do not have enough notifications created. If there are missing notifications, the task processes those rows for the job.
Adding the new task to the beat schedule will be done in the next commit; a rough sketch of what that entry could look like is below.
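
This is a sketch only, not the real entry: the task name matches the new task, but the ten-minute interval and the queue name are assumptions.

    from celery.schedules import crontab

    # Sketch only: the real beat entry lands in the next commit; the
    # interval and queue name below are assumptions.
    CELERYBEAT_SCHEDULE = {
        'check-for-missing-rows-in-completed-jobs': {
            'task': 'check-for-missing-rows-in-completed-jobs',
            'schedule': crontab(minute='*/10'),
            'options': {'queue': 'periodic-tasks'}
        }
    }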

A unique key constraint has been added to Notifications to ensure that the same row is not added twice. Any index or constraint can affect performance, but this unique constraint should not affect it enough for us to notice.
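
That constraint ships in a migration, one of the changed files not shown in this excerpt. A minimal sketch of such a migration, assuming Alembic and a unique pair of (job_id, job_row_number) on the notifications table; the constraint name and columns are assumptions:

    from alembic import op

    # Sketch only: the real migration is part of this commit but is not
    # shown here; the constraint name and columns are assumptions.

    def upgrade():
        op.create_unique_constraint(
            'uq_notifications_job_id_job_row_number',
            'notifications',
            ['job_id', 'job_row_number']
        )

    def downgrade():
        op.drop_constraint(
            'uq_notifications_job_id_job_row_number',
            'notifications',
            type_='unique'
        )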
Rebecca Law
2019-11-05 16:47:00 +00:00
parent 975af113e4
commit db5a50c5a7
7 changed files with 160 additions and 56 deletions


@@ -9,11 +9,19 @@ from sqlalchemy import and_
 from sqlalchemy.exc import SQLAlchemyError
 
 from app import notify_celery, zendesk_client
-from app.celery.tasks import process_job
+from app.celery.tasks import (
+    process_job,
+    get_recipient_csv_and_template,
+    process_row
+)
 from app.config import QueueNames, TaskNames
 from app.dao.invited_org_user_dao import delete_org_invitations_created_more_than_two_days_ago
 from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
-from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, find_jobs_with_missing_rows, find_missing_row_for_job
+from app.dao.jobs_dao import (
+    dao_set_scheduled_jobs_to_pending,
+    find_jobs_with_missing_rows,
+    find_missing_row_for_job
+)
 from app.dao.jobs_dao import dao_update_job
 from app.dao.notifications_dao import (
     is_delivery_slow_for_provider,
@@ -223,26 +231,19 @@ def check_templated_letter_state():
         ticket_type=zendesk_client.TYPE_INCIDENT
     )
 
 
-@notify_celery.task(name='find-missing-rows-from-completed-jobs')
-def find_missing_rows_from_completed_jobs():
+@notify_celery.task(name='check-for-missing-rows-in-completed-jobs')
+def check_for_missing_rows_in_completed_jobs():
     jobs_and_job_size = find_jobs_with_missing_rows()
     for x in jobs_and_job_size:
-        missing_rows = find_missing_row_for_job(jobs_and_job_size.job_id, jobs_and_job_size.notification_count)
-        # job = dao_get_job_by_id(job_id)
-        # db_template = dao_get_template_by_id(job.template_id, job.template_version)
-        #
-        # TemplateClass = get_template_class(db_template.template_type)
-        # template = TemplateClass(db_template.__dict__)
-        #
-        # for row in RecipientCSV(
-        #     s3.get_job_from_s3(str(job.service_id), str(job.id)),
-        #     template_type=template.template_type,
-        #     placeholders=template.placeholders
-        # ).get_rows():
-        #     if row.index == job_row_number:
-        #         notification_id = process_row(row, template, job, job.service)
-        #         current_app.logger.info("Process row {} for job {} created notification_id: {}".format(
-        #             job_row_number, job_id, notification_id))
-        pass
+        job = x[1]
+        missing_rows = find_missing_row_for_job(job.id, job.notification_count)
+        for row_to_process in missing_rows:
+            # The sender_id is passed in with the job, but at this point we no longer have the sender that was passed in.
+            # The notification will be created with the default sender.
+            # There is a bug to fix this https://www.pivotaltracker.com/story/show/169569144
+            recipient_csv, template = get_recipient_csv_and_template(job)
+            for row in recipient_csv.get_rows():
+                if row.index == row_to_process.missing_row:
+                    current_app.logger.info("Processing row: {} for job: {}".format(row.index, job.id))
+                    process_row(row, template, job, job.service)
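
For context, find_jobs_with_missing_rows and find_missing_row_for_job live in app/dao/jobs_dao.py, which this commit changes but which is not shown in this excerpt. A rough sketch of the shape they might take, assuming the usual Job and Notification models; the column names and exact queries are assumptions based on how the task above uses the results:

    from sqlalchemy import and_, func

    from app import db
    from app.models import Job, Notification


    def find_jobs_with_missing_rows():
        # (notification count, Job) tuples for finished jobs where fewer
        # notifications exist than rows in the job; the task above unpacks
        # the Job from x[1].
        return db.session.query(
            func.count(Notification.id), Job
        ).outerjoin(
            Notification, Notification.job_id == Job.id
        ).filter(
            Job.job_status == 'finished'
        ).group_by(
            Job.id
        ).having(
            func.count(Notification.id) != Job.notification_count
        ).all()


    def find_missing_row_for_job(job_id, job_size):
        # Expected row numbers are 0..job_size-1; keep the ones that have
        # no notification. Each result exposes a `missing_row` attribute,
        # matching the task's `row_to_process.missing_row` access.
        expected_rows = db.session.query(
            func.generate_series(0, job_size - 1).label('missing_row')
        ).subquery()

        return db.session.query(
            expected_rows.c.missing_row
        ).outerjoin(
            Notification,
            and_(
                expected_rows.c.missing_row == Notification.job_row_number,
                Notification.job_id == job_id
            )
        ).filter(
            Notification.id.is_(None)
        ).all()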


@@ -97,18 +97,11 @@ def process_job(job_id, sender_id=None):
     job.processing_started = start
     dao_update_job(job)
 
-    db_template = dao_get_template_by_id(job.template_id, job.template_version)
-
-    TemplateClass = get_template_class(db_template.template_type)
-    template = TemplateClass(db_template.__dict__)
+    recipient_csv, template = get_recipient_csv_and_template(job)
 
     current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))
 
-    for row in RecipientCSV(
-        s3.get_job_from_s3(str(service.id), str(job_id)),
-        template_type=template.template_type,
-        placeholders=template.placeholders
-    ).get_rows():
+    for row in recipient_csv.get_rows():
         process_row(row, template, job, service, sender_id=sender_id)
 
     job_complete(job, start=start)
@@ -131,6 +124,18 @@ def job_complete(job, resumed=False, start=None):
     )
 
 
+def get_recipient_csv_and_template(job):
+    db_template = dao_get_template_by_id(job.template_id, job.template_version)
+
+    TemplateClass = get_template_class(db_template.template_type)
+    template = TemplateClass(db_template.__dict__)
+
+    recipient_csv = RecipientCSV(file_data=s3.get_job_from_s3(str(job.service_id), str(job.id)),
+                                 template_type=template.template_type,
+                                 placeholders=template.placeholders)
+    return recipient_csv, template
+
+
 def process_row(row, template, job, service, sender_id=None):
     template_type = template.template_type
     encrypted = encryption.encrypt({
@@ -596,16 +601,9 @@ def process_incomplete_job(job_id):
     current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
 
-    db_template = dao_get_template_by_id(job.template_id, job.template_version)
-
-    TemplateClass = get_template_class(db_template.template_type)
-    template = TemplateClass(db_template.__dict__)
+    recipient_csv, template = get_recipient_csv_and_template(job)
 
-    for row in RecipientCSV(
-        s3.get_job_from_s3(str(job.service_id), str(job.id)),
-        template_type=template.template_type,
-        placeholders=template.placeholders
-    ).get_rows():
+    for row in recipient_csv.get_rows():
         if row.index > resume_from_row:
             process_row(row, template, job, job.service)