diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 942a0ec5f..0763d02e2 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -559,6 +559,10 @@ def process_incomplete_job(job_id): job = dao_get_job_by_id(job_id) + # reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again + job.processing_started = datetime.utcnow() + dao_update_job(job) + last_notification_added = dao_get_last_notification_added_for_job_id(job_id) if last_notification_added: diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 8e3a3a33e..72a25aa19 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -896,6 +896,36 @@ def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(moc ) +def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template): + mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') + job = create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS + ) + job_2 = create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=29), + job_status=JOB_STATUS_IN_PROGRESS + ) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert str(job.id) in e.value.message + assert str(job_2.id) not in e.value.message + + # job 2 not in celery task + mock_celery.assert_called_once_with( + name=TaskNames.PROCESS_INCOMPLETE_JOBS, + args=([str(job.id)],), + queue=QueueNames.JOBS + ) + + def test_daily_stats_template_usage_by_month(notify_db, notify_db_session): notification_history = functools.partial( create_notification_history, diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index e996ed62b..3f48da1e6 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1362,6 +1362,25 @@ def test_process_incomplete_job_sms(mocker, sample_template): assert save_sms.call_count == 8 # There are 10 in the file and we've added two already +@freeze_time('2017-01-01') +def test_process_incomplete_job_resets_start_time(mocker, sample_template): + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') + + job = create_job(template=sample_template, notification_count=10, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + + process_incomplete_job(str(job.id)) + + completed_job = Job.query.get(job.id) + + assert completed_job.processing_started == datetime.utcnow() + assert completed_job.job_status == JOB_STATUS_FINISHED + + def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))