mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-31 23:26:23 -05:00
Merge pull request #1313 from alphagov/rc_process_missing_rows_in_csv
Process Incomplete Jobs, if a job have been discovered to be incomplete.
This commit is contained in:
@@ -5,7 +5,7 @@ from datetime import (
|
||||
|
||||
from celery.signals import worker_process_shutdown
|
||||
from flask import current_app
|
||||
from sqlalchemy import or_, and_
|
||||
from sqlalchemy import and_
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from notifications_utils.s3 import s3upload
|
||||
|
||||
@@ -19,8 +19,8 @@ from app.dao.invited_user_dao import delete_invitations_created_more_than_two_da
|
||||
from app.dao.jobs_dao import (
|
||||
dao_get_letter_job_ids_by_status,
|
||||
dao_set_scheduled_jobs_to_pending,
|
||||
dao_get_jobs_older_than_limited_by,
|
||||
dao_get_job_by_id)
|
||||
dao_get_jobs_older_than_limited_by
|
||||
)
|
||||
from app.dao.monthly_billing_dao import (
|
||||
get_service_ids_that_need_billing_populated,
|
||||
create_or_update_monthly_billing
|
||||
@@ -39,11 +39,18 @@ from app.dao.provider_details_dao import (
|
||||
dao_toggle_sms_provider
|
||||
)
|
||||
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
||||
from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND, JOB_STATUS_SENT_TO_DVLA, JOB_STATUS_FINISHED, Job, \
|
||||
EMAIL_TYPE, SMS_TYPE, JOB_STATUS_IN_PROGRESS
|
||||
from app.models import (
|
||||
Job,
|
||||
LETTER_TYPE,
|
||||
JOB_STATUS_IN_PROGRESS,
|
||||
JOB_STATUS_READY_TO_SEND
|
||||
)
|
||||
from app.notifications.process_notifications import send_notification_to_queue
|
||||
from app.statsd_decorators import statsd
|
||||
from app.celery.tasks import process_job, create_dvla_file_contents_for_notifications
|
||||
from app.celery.tasks import (
|
||||
create_dvla_file_contents_for_notifications,
|
||||
process_job
|
||||
)
|
||||
from app.config import QueueNames, TaskNames
|
||||
from app.utils import convert_utc_to_bst
|
||||
from app.v2.errors import JobIncompleteError
|
||||
@@ -389,4 +396,9 @@ def check_job_status():
|
||||
|
||||
job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes]
|
||||
if job_ids:
|
||||
notify_celery.send_task(
|
||||
name=TaskNames.PROCESS_INCOMPLETE_JOBS,
|
||||
args=(job_ids,),
|
||||
queue=QueueNames.JOBS
|
||||
)
|
||||
raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids))
|
||||
|
||||
@@ -2,7 +2,7 @@ import json
|
||||
from datetime import datetime
|
||||
from collections import namedtuple
|
||||
|
||||
from celery.signals import worker_process_init, worker_process_shutdown
|
||||
from celery.signals import worker_process_shutdown
|
||||
from flask import current_app
|
||||
from notifications_utils.recipients import (
|
||||
RecipientCSV
|
||||
@@ -37,25 +37,27 @@ from app.dao.jobs_dao import (
|
||||
from app.dao.notifications_dao import (
|
||||
get_notification_by_id,
|
||||
dao_update_notifications_for_job_to_sent_to_dvla,
|
||||
dao_update_notifications_by_reference
|
||||
)
|
||||
dao_update_notifications_by_reference,
|
||||
dao_get_last_notification_added_for_job_id)
|
||||
from app.dao.provider_details_dao import get_current_provider
|
||||
from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
|
||||
from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.models import (
|
||||
Job,
|
||||
Notification,
|
||||
EMAIL_TYPE,
|
||||
SMS_TYPE,
|
||||
LETTER_TYPE,
|
||||
KEY_TYPE_NORMAL,
|
||||
JOB_STATUS_CANCELLED,
|
||||
JOB_STATUS_PENDING,
|
||||
JOB_STATUS_IN_PROGRESS,
|
||||
JOB_STATUS_FINISHED,
|
||||
JOB_STATUS_IN_PROGRESS,
|
||||
JOB_STATUS_PENDING,
|
||||
JOB_STATUS_READY_TO_SEND,
|
||||
JOB_STATUS_SENT_TO_DVLA, JOB_STATUS_ERROR,
|
||||
KEY_TYPE_NORMAL,
|
||||
LETTER_TYPE,
|
||||
NOTIFICATION_SENDING,
|
||||
NOTIFICATION_TECHNICAL_FAILURE
|
||||
NOTIFICATION_TECHNICAL_FAILURE,
|
||||
SMS_TYPE,
|
||||
)
|
||||
from app.notifications.process_notifications import persist_notification
|
||||
from app.service.utils import service_allowed_to_send_to
|
||||
@@ -107,21 +109,31 @@ def process_job(job_id):
|
||||
).enumerated_recipients_and_personalisation:
|
||||
process_row(row_number, recipient, personalisation, template, job, service)
|
||||
|
||||
if template.template_type == LETTER_TYPE:
|
||||
job_complete(job, service, template.template_type, start=start)
|
||||
|
||||
|
||||
def job_complete(job, service, template_type, resumed=False, start=None):
|
||||
if template_type == LETTER_TYPE:
|
||||
if service.research_mode:
|
||||
update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE)
|
||||
else:
|
||||
build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS)
|
||||
current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS))
|
||||
current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job.id, QueueNames.JOBS))
|
||||
else:
|
||||
job.job_status = JOB_STATUS_FINISHED
|
||||
|
||||
finished = datetime.utcnow()
|
||||
job.processing_finished = finished
|
||||
dao_update_job(job)
|
||||
current_app.logger.info(
|
||||
"Job {} created at {} started at {} finished at {}".format(job_id, job.created_at, start, finished)
|
||||
)
|
||||
|
||||
if resumed:
|
||||
current_app.logger.info(
|
||||
"Resumed Job {} completed at {}".format(job.id, job.created_at, start, finished)
|
||||
)
|
||||
else:
|
||||
current_app.logger.info(
|
||||
"Job {} created at {} started at {} finished at {}".format(job.id, job.created_at, start, finished)
|
||||
)
|
||||
|
||||
|
||||
def process_row(row_number, recipient, personalisation, template, job, service):
|
||||
@@ -482,3 +494,40 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
|
||||
service_id, inbound_api.url, e))
|
||||
except self.MaxRetriesExceededError:
|
||||
current_app.logger.exception('Retry: send_inbound_sms_to_service has retried the max number of times')
|
||||
|
||||
|
||||
@notify_celery.task(name='process-incomplete-jobs')
|
||||
@statsd(namespace="tasks")
|
||||
def process_incomplete_jobs(job_ids):
|
||||
current_app.logger.info("Resuming Job(s) {}".format(job_ids))
|
||||
for job_id in job_ids:
|
||||
process_incomplete_job(job_id)
|
||||
|
||||
|
||||
def process_incomplete_job(job_id):
|
||||
|
||||
job = dao_get_job_by_id(job_id)
|
||||
|
||||
last_notification_added = dao_get_last_notification_added_for_job_id(job_id)
|
||||
|
||||
if last_notification_added:
|
||||
resume_from_row = last_notification_added.job_row_number
|
||||
else:
|
||||
resume_from_row = -1 # The first row in the csv with a number is row 0
|
||||
|
||||
current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
|
||||
|
||||
db_template = dao_get_template_by_id(job.template_id, job.template_version)
|
||||
|
||||
TemplateClass = get_template_class(db_template.template_type)
|
||||
template = TemplateClass(db_template.__dict__)
|
||||
|
||||
for row_number, recipient, personalisation in RecipientCSV(
|
||||
s3.get_job_from_s3(str(job.service_id), str(job.id)),
|
||||
template_type=template.template_type,
|
||||
placeholders=template.placeholders
|
||||
).enumerated_recipients_and_personalisation:
|
||||
if row_number > resume_from_row:
|
||||
process_row(row_number, recipient, personalisation, template, job, job.service)
|
||||
|
||||
job_complete(job, job.service, template, resumed=True)
|
||||
|
||||
@@ -50,6 +50,7 @@ class QueueNames(object):
|
||||
class TaskNames(object):
|
||||
DVLA_JOBS = 'send-jobs-to-dvla'
|
||||
DVLA_NOTIFICATIONS = 'send-api-notifications-to-dvla'
|
||||
PROCESS_INCOMPLETE_JOBS = 'process-incomplete-jobs'
|
||||
|
||||
|
||||
class Config(object):
|
||||
|
||||
@@ -640,3 +640,14 @@ def dao_get_notification_email_reply_for_notification(notification_id):
|
||||
|
||||
if email_reply_to:
|
||||
return email_reply_to.email_address
|
||||
|
||||
|
||||
@statsd(namespace="dao")
|
||||
def dao_get_last_notification_added_for_job_id(job_id):
|
||||
last_notification_added = Notification.query.filter(
|
||||
Notification.job_id == job_id
|
||||
).order_by(
|
||||
Notification.job_row_number.desc()
|
||||
).first()
|
||||
|
||||
return last_notification_added
|
||||
|
||||
Reference in New Issue
Block a user