diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 93ba9e8a5..22cc89708 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta from flask import current_app from notifications_utils.statsd_decorators import statsd -from sqlalchemy import and_ +from sqlalchemy import between from sqlalchemy.exc import SQLAlchemyError from app import cbc_proxy_client, notify_celery, zendesk_client @@ -48,6 +48,7 @@ from app.models import ( EMAIL_TYPE, JOB_STATUS_ERROR, JOB_STATUS_IN_PROGRESS, + JOB_STATUS_PENDING, SMS_TYPE, Job, ) @@ -132,19 +133,31 @@ def check_job_status(): select from jobs where job_status == 'in progress' - and template_type in ('sms', 'email') - and scheduled_at or created_at is older that 30 minutes. + and processing started between 30 and 35 minutes ago + OR where the job_status == 'pending' + and the job scheduled_for timestamp is between 30 and 35 minutes ago. if any results then - raise error + update the job_status to 'error' process the rows in the csv that are missing (in another task) just do the check here. """ thirty_minutes_ago = datetime.utcnow() - timedelta(minutes=30) thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35) - jobs_not_complete_after_30_minutes = Job.query.filter( + incomplete_in_progress_jobs = Job.query.filter( Job.job_status == JOB_STATUS_IN_PROGRESS, - and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago) - ).order_by(Job.processing_started).all() + between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago) + ) + incomplete_pending_jobs = Job.query.filter( + Job.job_status == JOB_STATUS_PENDING, + Job.scheduled_for.isnot(None), + between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago) + ) + + jobs_not_complete_after_30_minutes = incomplete_in_progress_jobs.union( + incomplete_pending_jobs + ).order_by( + Job.processing_started, Job.scheduled_for + ).all() # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks # if they haven't been re-processed in time. diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 573eacf5b..d1da59318 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -28,6 +28,7 @@ from app.models import ( JOB_STATUS_ERROR, JOB_STATUS_FINISHED, JOB_STATUS_IN_PROGRESS, + JOB_STATUS_PENDING, NOTIFICATION_DELIVERED, NOTIFICATION_PENDING_VIRUS_CHECK, ) @@ -170,6 +171,39 @@ def test_check_job_status_task_calls_process_incomplete_jobs_when_scheduled_job_ ) +def test_check_job_status_task_calls_process_incomplete_jobs_for_pending_scheduled_jobs( + mocker, sample_template +): + mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async') + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_PENDING) + + check_job_status() + + mock_celery.assert_called_once_with( + [[str(job.id)]], + queue=QueueNames.JOBS + ) + + +def test_check_job_status_task_does_not_call_process_incomplete_jobs_for_non_scheduled_pending_jobs( + mocker, + sample_template, +): + mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async') + create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + job_status=JOB_STATUS_PENDING + ) + check_job_status() + + assert not mock_celery.called + + def test_check_job_status_task_calls_process_incomplete_jobs_for_multiple_jobs(mocker, sample_template): mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async') job = create_job(template=sample_template, notification_count=3, @@ -207,9 +241,16 @@ def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template): processing_started=datetime.utcnow() - timedelta(minutes=29), job_status=JOB_STATUS_IN_PROGRESS ) + create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=50), + scheduled_for=datetime.utcnow() - timedelta(minutes=29), + job_status=JOB_STATUS_PENDING + ) check_job_status() - # job 2 not in celery task + # jobs 2 and 3 were created less than 30 minutes ago, so are not sent to Celery task mock_celery.assert_called_once_with( [[str(job.id)]], queue=QueueNames.JOBS