From ea2b0dfbc9c713433bc74d9a275fa500d7e1fed0 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 9 Mar 2018 17:16:48 +0000 Subject: [PATCH] set processing_started in before earlier jobs are processed process_incomplete_jobs loops through jobs processing them in a single task. This means that if the job statuses are all 'error', and then the process_incomplete_jobs task fails, the later jobs in the list that never got picked up won't have their status set back to in progress, or their processing_started time - so will be stuck in 'error' forever. Instead, we set the job statuses to in progress and the start time to now before we process any - so if the incomplete_jobs task fails later, the jobs will be picked up (again) by the check_job_statuses task in half an hour's time --- app/celery/tasks.py | 13 ++++--- tests/app/celery/test_tasks.py | 62 +++++++++++++++++++--------------- 2 files changed, 43 insertions(+), 32 deletions(-) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index ab9389685..d7f3d7a15 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -550,6 +550,14 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id): @notify_celery.task(name='process-incomplete-jobs') @statsd(namespace="tasks") def process_incomplete_jobs(job_ids): + jobs = [dao_get_job_by_id(job_id) for job_id in job_ids] + + # reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again + for job in jobs: + job.job_status = JOB_STATUS_IN_PROGRESS + job.processing_started = datetime.utcnow() + dao_update_job(job) + current_app.logger.info("Resuming Job(s) {}".format(job_ids)) for job_id in job_ids: process_incomplete_job(job_id) @@ -559,11 +567,6 @@ def process_incomplete_job(job_id): job = dao_get_job_by_id(job_id) - # reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again - job.job_status = JOB_STATUS_PENDING - job.processing_started = datetime.utcnow() - dao_update_job(job) - last_notification_added = dao_get_last_notification_added_for_job_id(job_id) if last_notification_added: diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 9a2f5502c..873285e00 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1,7 +1,7 @@ import json import uuid from datetime import datetime, timedelta -from unittest.mock import Mock +from unittest.mock import Mock, call import pytest import requests_mock @@ -38,6 +38,7 @@ from app.models import ( KEY_TYPE_TEAM, KEY_TYPE_TEST, JOB_STATUS_FINISHED, + JOB_STATUS_ERROR, JOB_STATUS_IN_PROGRESS, LETTER_TYPE, SMS_TYPE @@ -1346,7 +1347,7 @@ def test_process_incomplete_job_sms(mocker, sample_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) @@ -1362,25 +1363,6 @@ def test_process_incomplete_job_sms(mocker, sample_template): assert save_sms.call_count == 8 # There are 10 in the file and we've added two already -@freeze_time('2017-01-01') -def test_process_incomplete_job_resets_start_time(mocker, sample_template): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) - mocker.patch('app.celery.tasks.save_sms.apply_async') - - job = create_job(template=sample_template, notification_count=10, - created_at=datetime.utcnow() - timedelta(hours=2), - scheduled_for=datetime.utcnow() - timedelta(minutes=31), - processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) - - process_incomplete_job(str(job.id)) - - completed_job = Job.query.get(job.id) - - assert completed_job.processing_started == datetime.utcnow() - assert completed_job.job_status == JOB_STATUS_FINISHED - - def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) @@ -1390,7 +1372,7 @@ def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_templ created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) @@ -1423,7 +1405,7 @@ def test_process_incomplete_jobs_sms(mocker, sample_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) create_notification(sample_template, job, 2) @@ -1434,7 +1416,7 @@ def test_process_incomplete_jobs_sms(mocker, sample_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_template, job2, 0) create_notification(sample_template, job2, 1) @@ -1465,7 +1447,7 @@ def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template) created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) assert Notification.query.filter(Notification.job_id == job.id).count() == 0 @@ -1509,7 +1491,7 @@ def test_process_incomplete_job_email(mocker, sample_email_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_email_template, job, 0) create_notification(sample_email_template, job, 1) @@ -1533,7 +1515,7 @@ def test_process_incomplete_job_letter(mocker, sample_letter_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_letter_template, job, 0) create_notification(sample_letter_template, job, 1) @@ -1543,3 +1525,29 @@ def test_process_incomplete_job_letter(mocker, sample_letter_template): process_incomplete_job(str(job.id)) assert mock_letter_saver.call_count == 8 + + +@freeze_time('2017-01-01') +def test_process_incomplete_jobs_sets_status_to_in_progress_and_resets_processing_started_time(mocker, sample_template): + mock_process_incomplete_job = mocker.patch('app.celery.tasks.process_incomplete_job') + + job1 = create_job( + sample_template, + processing_started=datetime.utcnow() - timedelta(minutes=30), + job_status=JOB_STATUS_ERROR + ) + job2 = create_job( + sample_template, + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_ERROR + ) + + process_incomplete_jobs([str(job1.id), str(job2.id)]) + + assert job1.job_status == JOB_STATUS_IN_PROGRESS + assert job1.processing_started == datetime.utcnow() + + assert job2.job_status == JOB_STATUS_IN_PROGRESS + assert job2.processing_started == datetime.utcnow() + + assert mock_process_incomplete_job.mock_calls == [call(str(job1.id)), call(str(job2.id))]