From 64bb94af9e1a51fc14cc93736192d525ee4cb2f6 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 9 Mar 2018 16:34:47 +0000 Subject: [PATCH] set job status to error in the check_job_status scheduled task the process_incomplete_jobs task runs through all incomplete jobs in a loop, so it might not get a chance to update the processing_started time of the last job before check_job_status runs again (every minute). So before we even trigger the process_incomplete_jobs task, lets set the status of the jobs to error, so that we don't identify them for re-processing again. --- app/celery/scheduled_tasks.py | 13 ++++++++-- app/celery/tasks.py | 1 + tests/app/celery/test_scheduled_tasks.py | 33 ++++++++++++++++++++++++ tests/app/celery/test_tasks.py | 2 +- 4 files changed, 46 insertions(+), 3 deletions(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index b5f904f2b..2e3e725d4 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -45,13 +45,15 @@ from app.dao.provider_details_dao import ( dao_toggle_sms_provider ) from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago +from app.dao.jobs_dao import dao_update_job from app.models import ( Job, Notification, NOTIFICATION_SENDING, LETTER_TYPE, JOB_STATUS_IN_PROGRESS, - JOB_STATUS_READY_TO_SEND + JOB_STATUS_READY_TO_SEND, + JOB_STATUS_ERROR ) from app.notifications.process_notifications import send_notification_to_queue from app.celery.tasks import ( @@ -442,7 +444,14 @@ def check_job_status(): and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago) ).order_by(Job.processing_started).all() - job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes] + # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks + # if they haven't been re-processed in time. + job_ids = [] + for job in jobs_not_complete_after_30_minutes: + job.job_status = JOB_STATUS_ERROR + dao_update_job(job) + job_ids.append(str(job.id)) + if job_ids: notify_celery.send_task( name=TaskNames.PROCESS_INCOMPLETE_JOBS, diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 0763d02e2..ab9389685 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -560,6 +560,7 @@ def process_incomplete_job(job_id): job = dao_get_job_by_id(job_id) # reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again + job.job_status = JOB_STATUS_PENDING job.processing_started = datetime.utcnow() dao_update_job(job) diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 72a25aa19..40023c92d 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -53,6 +53,7 @@ from app.models import ( JOB_STATUS_READY_TO_SEND, JOB_STATUS_IN_PROGRESS, JOB_STATUS_SENT_TO_DVLA, + JOB_STATUS_ERROR, LETTER_TYPE, SMS_TYPE ) @@ -926,6 +927,38 @@ def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template): ) +def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template): + mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') + job = create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS + ) + job_2 = create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=29), + job_status=JOB_STATUS_IN_PROGRESS + ) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert str(job.id) in e.value.message + assert str(job_2.id) not in e.value.message + + # job 2 not in celery task + mock_celery.assert_called_once_with( + name=TaskNames.PROCESS_INCOMPLETE_JOBS, + args=([str(job.id)],), + queue=QueueNames.JOBS + ) + assert job.job_status == JOB_STATUS_ERROR + assert job_2.job_status == JOB_STATUS_IN_PROGRESS + + def test_daily_stats_template_usage_by_month(notify_db, notify_db_session): notification_history = functools.partial( create_notification_history, diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 3f48da1e6..9a2f5502c 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1365,7 +1365,7 @@ def test_process_incomplete_job_sms(mocker, sample_template): @freeze_time('2017-01-01') def test_process_incomplete_job_resets_start_time(mocker, sample_template): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) - save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') + mocker.patch('app.celery.tasks.save_sms.apply_async') job = create_job(template=sample_template, notification_count=10, created_at=datetime.utcnow() - timedelta(hours=2),