Check for incomplete pending jobs

We have a scheduled task that was checking for jobs still in progress.
We saw a case where a scheduled job was stuck in a `pending` status as a
result of an app shutting down. This changes the `check_job_status` task
so that it also checks for scheduled jobs which are still pending after
30 minutes.
This commit is contained in:
Katie Smith
2021-03-17 14:53:34 +00:00
parent c1b56aa752
commit 3b78f863d5
2 changed files with 62 additions and 8 deletions

View File

@@ -3,7 +3,7 @@ from datetime import datetime, timedelta
from flask import current_app from flask import current_app
from notifications_utils.statsd_decorators import statsd from notifications_utils.statsd_decorators import statsd
from sqlalchemy import and_ from sqlalchemy import between
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from app import cbc_proxy_client, notify_celery, zendesk_client from app import cbc_proxy_client, notify_celery, zendesk_client
@@ -48,6 +48,7 @@ from app.models import (
EMAIL_TYPE, EMAIL_TYPE,
JOB_STATUS_ERROR, JOB_STATUS_ERROR,
JOB_STATUS_IN_PROGRESS, JOB_STATUS_IN_PROGRESS,
JOB_STATUS_PENDING,
SMS_TYPE, SMS_TYPE,
Job, Job,
) )
@@ -132,19 +133,31 @@ def check_job_status():
select select
from jobs from jobs
where job_status == 'in progress' where job_status == 'in progress'
and template_type in ('sms', 'email') and processing started between 30 and 35 minutes ago
and scheduled_at or created_at is older that 30 minutes. OR where the job_status == 'pending'
and the job scheduled_for timestamp is between 30 and 35 minutes ago.
if any results then if any results then
raise error update the job_status to 'error'
process the rows in the csv that are missing (in another task) just do the check here. process the rows in the csv that are missing (in another task) just do the check here.
""" """
thirty_minutes_ago = datetime.utcnow() - timedelta(minutes=30) thirty_minutes_ago = datetime.utcnow() - timedelta(minutes=30)
thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35) thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35)
jobs_not_complete_after_30_minutes = Job.query.filter( incomplete_in_progress_jobs = Job.query.filter(
Job.job_status == JOB_STATUS_IN_PROGRESS, Job.job_status == JOB_STATUS_IN_PROGRESS,
and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago) between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago)
).order_by(Job.processing_started).all() )
incomplete_pending_jobs = Job.query.filter(
Job.job_status == JOB_STATUS_PENDING,
Job.scheduled_for.isnot(None),
between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago)
)
jobs_not_complete_after_30_minutes = incomplete_in_progress_jobs.union(
incomplete_pending_jobs
).order_by(
Job.processing_started, Job.scheduled_for
).all()
# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
# if they haven't been re-processed in time. # if they haven't been re-processed in time.

View File

@@ -28,6 +28,7 @@ from app.models import (
JOB_STATUS_ERROR, JOB_STATUS_ERROR,
JOB_STATUS_FINISHED, JOB_STATUS_FINISHED,
JOB_STATUS_IN_PROGRESS, JOB_STATUS_IN_PROGRESS,
JOB_STATUS_PENDING,
NOTIFICATION_DELIVERED, NOTIFICATION_DELIVERED,
NOTIFICATION_PENDING_VIRUS_CHECK, NOTIFICATION_PENDING_VIRUS_CHECK,
) )
@@ -170,6 +171,39 @@ def test_check_job_status_task_calls_process_incomplete_jobs_when_scheduled_job_
) )
def test_check_job_status_task_calls_process_incomplete_jobs_for_pending_scheduled_jobs(
mocker, sample_template
):
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
job = create_job(template=sample_template, notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_PENDING)
check_job_status()
mock_celery.assert_called_once_with(
[[str(job.id)]],
queue=QueueNames.JOBS
)
def test_check_job_status_task_does_not_call_process_incomplete_jobs_for_non_scheduled_pending_jobs(
mocker,
sample_template,
):
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
job_status=JOB_STATUS_PENDING
)
check_job_status()
assert not mock_celery.called
def test_check_job_status_task_calls_process_incomplete_jobs_for_multiple_jobs(mocker, sample_template): def test_check_job_status_task_calls_process_incomplete_jobs_for_multiple_jobs(mocker, sample_template):
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async') mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
job = create_job(template=sample_template, notification_count=3, job = create_job(template=sample_template, notification_count=3,
@@ -207,9 +241,16 @@ def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template):
processing_started=datetime.utcnow() - timedelta(minutes=29), processing_started=datetime.utcnow() - timedelta(minutes=29),
job_status=JOB_STATUS_IN_PROGRESS job_status=JOB_STATUS_IN_PROGRESS
) )
create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(minutes=50),
scheduled_for=datetime.utcnow() - timedelta(minutes=29),
job_status=JOB_STATUS_PENDING
)
check_job_status() check_job_status()
# job 2 not in celery task # jobs 2 and 3 were created less than 30 minutes ago, so are not sent to Celery task
mock_celery.assert_called_once_with( mock_celery.assert_called_once_with(
[[str(job.id)]], [[str(job.id)]],
queue=QueueNames.JOBS queue=QueueNames.JOBS