Process Incomplete Jobs Refactor

- Moved the process_incomplete_jobs to tasks.py
- Moved the process_incomplete_jobs test to test_tasks.py
- Cleaned up imports and other code style issues.

As the new tasks is not a scheduled one, moved the the tasks.py file.
This makes it more consisted with other tasks. Updated a few code style
issues to make it more consistent with other coe and hence more
maintainable in future.
This commit is contained in:
Richard Chapman
2017-10-16 12:32:44 +01:00
parent 9e6deddbd4
commit 0ff80bcb76
4 changed files with 229 additions and 227 deletions

View File

@@ -5,15 +5,12 @@ from datetime import (
from celery.signals import worker_process_shutdown
from flask import current_app
from notifications_utils.recipients import RecipientCSV
from sqlalchemy import or_, and_
from sqlalchemy import and_
from sqlalchemy.exc import SQLAlchemyError
from notifications_utils.s3 import s3upload
from app.aws import s3
from app import notify_celery
from app.celery import celery
from app.dao.templates_dao import dao_get_template_by_id
from app.performance_platform import total_sent_notifications, processing_time
from app import performance_platform_client
from app.dao.date_util import get_month_start_and_end_date_in_utc
@@ -22,8 +19,8 @@ from app.dao.invited_user_dao import delete_invitations_created_more_than_two_da
from app.dao.jobs_dao import (
dao_get_letter_job_ids_by_status,
dao_set_scheduled_jobs_to_pending,
dao_get_jobs_older_than_limited_by,
dao_get_job_by_id)
dao_get_jobs_older_than_limited_by
)
from app.dao.monthly_billing_dao import (
get_service_ids_that_need_billing_populated,
create_or_update_monthly_billing
@@ -44,7 +41,6 @@ from app.dao.provider_details_dao import (
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.models import (
Job,
Notification,
LETTER_TYPE,
JOB_STATUS_READY_TO_SEND,
JOB_STATUS_IN_PROGRESS
@@ -53,10 +49,7 @@ from app.notifications.process_notifications import send_notification_to_queue
from app.statsd_decorators import statsd
from app.celery.tasks import (
create_dvla_file_contents_for_notifications,
get_template_class,
process_job,
process_row,
job_complete
process_job
)
from app.config import QueueNames, TaskNames
from app.utils import convert_utc_to_bst
@@ -409,44 +402,3 @@ def check_job_status():
queue=QueueNames.JOBS
)
raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids))
@notify_celery.task(name='process-incomplete-jobs')
@statsd(namespace="tasks")
def process_incomplete_jobs(job_ids):
current_app.logger.info("Resuming Job(s) {}".format(job_ids))
for job_id in job_ids:
process_incomplete_job(job_id)
def process_incomplete_job(job_id):
job = Job.query.filter(Job.id == job_id).one()
last_notification_added = Notification.query.filter(
Notification.job_id == job_id
).order_by(
Notification.job_row_number.desc()
).first()
if last_notification_added:
resume_from_row = last_notification_added.job_row_number
else:
resume_from_row = -1 # The first row in the csv with a number is row 0
current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
db_template = dao_get_template_by_id(job.template_id, job.template_version)
TemplateClass = get_template_class(db_template.template_type)
template = TemplateClass(db_template.__dict__)
for row_number, recipient, personalisation in RecipientCSV(
s3.get_job_from_s3(str(job.service_id), str(job.id)),
template_type=template.template_type,
placeholders=template.placeholders
).enumerated_recipients_and_personalisation:
if row_number > resume_from_row:
process_row(row_number, recipient, personalisation, template, job, job.service)
job_complete(job, job.service, template, True)

View File

@@ -2,7 +2,7 @@ import json
from datetime import datetime
from collections import namedtuple
from celery.signals import worker_process_init, worker_process_shutdown
from celery.signals import worker_process_shutdown
from flask import current_app
from notifications_utils.recipients import (
RecipientCSV
@@ -44,9 +44,9 @@ from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count
from app.dao.templates_dao import dao_get_template_by_id
from app.models import (
Job,
Notification,
EMAIL_TYPE,
SMS_TYPE,
LETTER_TYPE,
KEY_TYPE_NORMAL,
JOB_STATUS_CANCELLED,
JOB_STATUS_PENDING,
@@ -54,8 +54,10 @@ from app.models import (
JOB_STATUS_FINISHED,
JOB_STATUS_READY_TO_SEND,
JOB_STATUS_SENT_TO_DVLA, JOB_STATUS_ERROR,
LETTER_TYPE,
NOTIFICATION_SENDING,
NOTIFICATION_TECHNICAL_FAILURE
NOTIFICATION_TECHNICAL_FAILURE,
SMS_TYPE,
)
from app.notifications.process_notifications import persist_notification
from app.service.utils import service_allowed_to_send_to
@@ -492,3 +494,44 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
service_id, inbound_api.url, e))
except self.MaxRetriesExceededError:
current_app.logger.exception('Retry: send_inbound_sms_to_service has retried the max number of times')
@notify_celery.task(name='process-incomplete-jobs')
@statsd(namespace="tasks")
def process_incomplete_jobs(job_ids):
current_app.logger.info("Resuming Job(s) {}".format(job_ids))
for job_id in job_ids:
process_incomplete_job(job_id)
def process_incomplete_job(job_id):
job = Job.query.filter(Job.id == job_id).one()
last_notification_added = Notification.query.filter(
Notification.job_id == job_id
).order_by(
Notification.job_row_number.desc()
).first()
if last_notification_added:
resume_from_row = last_notification_added.job_row_number
else:
resume_from_row = -1 # The first row in the csv with a number is row 0
current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
db_template = dao_get_template_by_id(job.template_id, job.template_version)
TemplateClass = get_template_class(db_template.template_type)
template = TemplateClass(db_template.__dict__)
for row_number, recipient, personalisation in RecipientCSV(
s3.get_job_from_s3(str(job.service_id), str(job.id)),
template_type=template.template_type,
placeholders=template.placeholders
).enumerated_recipients_and_personalisation:
if row_number > resume_from_row:
process_row(row_number, recipient, personalisation, template, job, job.service)
job_complete(job, job.service, template, True)