Process Incomplete Jobs

- Added a new task, process_incomplete_jobs, to process incomplete jobs
- Added tests for the new task
- Updated check_job_status to start the new task when incomplete jobs are found

This effectively resumes jobs that were interrupted, for whatever reason,
part-way through processing. In cases where only some of the CSV was
processed, it finds the last row that was handled and continues processing
from that point.
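
A minimal sketch of the resume logic described above (the standalone
find_resume_row helper is hypothetical, for illustration only;
Notification.job_row_number is the field the diff below actually uses):

    # Hypothetical helper mirroring the resume logic in process_incomplete_job.
    # 'notifications' are the rows already created for this job; each one
    # records the zero-indexed CSV row it came from.
    def find_resume_row(notifications):
        row_numbers = [n.job_row_number for n in notifications]
        # -1 means nothing was processed yet, so processing restarts at row 0
        return max(row_numbers) if row_numbers else -1

    # Rows at or below the returned value are skipped; everything after it is
    # re-processed, so an interrupted job picks up where it stopped.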
Richard Chapman
2017-10-13 16:46:17 +01:00
parent 83585a4fdb
commit 0509669969
4 changed files with 251 additions and 12 deletions


@@ -5,12 +5,15 @@ from datetime import (
 from celery.signals import worker_process_shutdown
 from flask import current_app
+from notifications_utils.recipients import RecipientCSV
 from sqlalchemy import or_, and_
 from sqlalchemy.exc import SQLAlchemyError
 from notifications_utils.s3 import s3upload
+from app.aws import s3
 from app import notify_celery
 from app.celery import celery
+from app.dao.templates_dao import dao_get_template_by_id
 from app.performance_platform import total_sent_notifications, processing_time
 from app import performance_platform_client
 from app.dao.date_util import get_month_start_and_end_date_in_utc
@@ -39,11 +42,22 @@ from app.dao.provider_details_dao import (
     dao_toggle_sms_provider
 )
 from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
-from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND, JOB_STATUS_SENT_TO_DVLA, JOB_STATUS_FINISHED, Job, \
-    EMAIL_TYPE, SMS_TYPE, JOB_STATUS_IN_PROGRESS
+from app.models import (
+    Job,
+    Notification,
+    LETTER_TYPE,
+    JOB_STATUS_READY_TO_SEND,
+    JOB_STATUS_IN_PROGRESS
+)
 from app.notifications.process_notifications import send_notification_to_queue
 from app.statsd_decorators import statsd
-from app.celery.tasks import process_job, create_dvla_file_contents_for_notifications
+from app.celery.tasks import (
+    create_dvla_file_contents_for_notifications,
+    get_template_class,
+    process_job,
+    process_row,
+    job_complete
+)
 from app.config import QueueNames, TaskNames
 from app.utils import convert_utc_to_bst
 from app.v2.errors import JobIncompleteError
@@ -389,4 +403,50 @@ def check_job_status():
     job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes]
     if job_ids:
+        notify_celery.send_task(
+            name=TaskNames.PROCESS_INCOMPLETE_JOBS,
+            args=(job_ids,),
+            queue=QueueNames.JOBS
+        )
         raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids))
+
+
+@notify_celery.task(name='process-incomplete-jobs')
+@statsd(namespace="tasks")
+def process_incomplete_jobs(job_ids):
+    current_app.logger.info("Resuming Job(s) {}".format(job_ids))
+    for job_id in job_ids:
+        process_incomplete_job(job_id)
+
+
+def process_incomplete_job(job_id):
+    job = Job.query.filter(Job.id == job_id).one()
+
+    last_notification_added = Notification.query.filter(
+        Notification.job_id == job_id
+    ).order_by(
+        Notification.job_row_number.desc()
+    ).first()
+
+    if last_notification_added:
+        resume_from_row = last_notification_added.job_row_number
+    else:
+        resume_from_row = -1  # The first row in the csv with a number is row 0
+
+    current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
+
+    db_template = dao_get_template_by_id(job.template_id, job.template_version)
+
+    TemplateClass = get_template_class(db_template.template_type)
+    template = TemplateClass(db_template.__dict__)
+
+    for row_number, recipient, personalisation in RecipientCSV(
+            s3.get_job_from_s3(str(job.service_id), str(job.id)),
+            template_type=template.template_type,
+            placeholders=template.placeholders
+    ).enumerated_recipients_and_personalisation:
+        if row_number > resume_from_row:
+            process_row(row_number, recipient, personalisation, template, job, job.service)
+
+    job_complete(job, job.service, template, True)


@@ -107,21 +107,31 @@ def process_job(job_id):
     ).enumerated_recipients_and_personalisation:
         process_row(row_number, recipient, personalisation, template, job, service)

+    job_complete(job, service, template, False, start)
+
+
+def job_complete(job, service, template, resumed, start=None):
     if template.template_type == LETTER_TYPE:
         if service.research_mode:
             update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE)
         else:
             build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS)
-            current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS))
+            current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job.id, QueueNames.JOBS))
     else:
         job.job_status = JOB_STATUS_FINISHED
         finished = datetime.utcnow()
         job.processing_finished = finished
         dao_update_job(job)
-        current_app.logger.info(
-            "Job {} created at {} started at {} finished at {}".format(job_id, job.created_at, start, finished)
-        )
+        if resumed:
+            current_app.logger.info(
+                "Resumed Job {} completed at {}".format(job.id, finished)
+            )
+        else:
+            current_app.logger.info(
+                "Job {} created at {} started at {} finished at {}".format(job.id, job.created_at, start, finished)
+            )


 def process_row(row_number, recipient, personalisation, template, job, service):


@@ -50,6 +50,7 @@ class QueueNames(object):
 class TaskNames(object):
     DVLA_JOBS = 'send-jobs-to-dvla'
     DVLA_NOTIFICATIONS = 'send-api-notifications-to-dvla'
+    PROCESS_INCOMPLETE_JOBS = 'process-incomplete-jobs'


 class Config(object):
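
Note: Celery's send_task dispatches by registered name, so the string added
to TaskNames here has to match the name passed to the task decorator in
scheduled_tasks. A minimal sketch of that pairing, with names taken from the
diffs above (the snippet itself is illustrative, not part of the commit):

    # Producer side (check_job_status): dispatch by registered task name.
    notify_celery.send_task(
        name=TaskNames.PROCESS_INCOMPLETE_JOBS,  # 'process-incomplete-jobs'
        args=(job_ids,),
        queue=QueueNames.JOBS
    )

    # Consumer side: the decorator registers the task under that same name.
    @notify_celery.task(name='process-incomplete-jobs')
    def process_incomplete_jobs(job_ids):
        ...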