set processing_started in before earlier jobs are processed

process_incomplete_jobs loops through jobs processing them in a single
task. This means that if the job statuses are all 'error', and then the
process_incomplete_jobs task fails, the later jobs in the list that
never got picked up won't have their status set back to in progress, or
their processing_started time - so will be stuck in 'error' forever.

Instead, we set the job statuses to in progress and the start time to
now before we process any - so if the incomplete_jobs task fails later,
the jobs will be picked up (again) by the check_job_statuses task in
half an hour's time
This commit is contained in:
Leo Hemsted
2018-03-09 17:16:48 +00:00
parent 64bb94af9e
commit ea2b0dfbc9
2 changed files with 43 additions and 32 deletions

View File

@@ -1,7 +1,7 @@
import json
import uuid
from datetime import datetime, timedelta
from unittest.mock import Mock
from unittest.mock import Mock, call
import pytest
import requests_mock
@@ -38,6 +38,7 @@ from app.models import (
KEY_TYPE_TEAM,
KEY_TYPE_TEST,
JOB_STATUS_FINISHED,
JOB_STATUS_ERROR,
JOB_STATUS_IN_PROGRESS,
LETTER_TYPE,
SMS_TYPE
@@ -1346,7 +1347,7 @@ def test_process_incomplete_job_sms(mocker, sample_template):
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
job_status=JOB_STATUS_ERROR)
create_notification(sample_template, job, 0)
create_notification(sample_template, job, 1)
@@ -1362,25 +1363,6 @@ def test_process_incomplete_job_sms(mocker, sample_template):
assert save_sms.call_count == 8 # There are 10 in the file and we've added two already
@freeze_time('2017-01-01')
def test_process_incomplete_job_resets_start_time(mocker, sample_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
mocker.patch('app.celery.tasks.save_sms.apply_async')
job = create_job(template=sample_template, notification_count=10,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
process_incomplete_job(str(job.id))
completed_job = Job.query.get(job.id)
assert completed_job.processing_started == datetime.utcnow()
assert completed_job.job_status == JOB_STATUS_FINISHED
def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
@@ -1390,7 +1372,7 @@ def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_templ
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
job_status=JOB_STATUS_ERROR)
create_notification(sample_template, job, 0)
create_notification(sample_template, job, 1)
@@ -1423,7 +1405,7 @@ def test_process_incomplete_jobs_sms(mocker, sample_template):
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
job_status=JOB_STATUS_ERROR)
create_notification(sample_template, job, 0)
create_notification(sample_template, job, 1)
create_notification(sample_template, job, 2)
@@ -1434,7 +1416,7 @@ def test_process_incomplete_jobs_sms(mocker, sample_template):
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
job_status=JOB_STATUS_ERROR)
create_notification(sample_template, job2, 0)
create_notification(sample_template, job2, 1)
@@ -1465,7 +1447,7 @@ def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template)
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
job_status=JOB_STATUS_ERROR)
assert Notification.query.filter(Notification.job_id == job.id).count() == 0
@@ -1509,7 +1491,7 @@ def test_process_incomplete_job_email(mocker, sample_email_template):
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
job_status=JOB_STATUS_ERROR)
create_notification(sample_email_template, job, 0)
create_notification(sample_email_template, job, 1)
@@ -1533,7 +1515,7 @@ def test_process_incomplete_job_letter(mocker, sample_letter_template):
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
job_status=JOB_STATUS_ERROR)
create_notification(sample_letter_template, job, 0)
create_notification(sample_letter_template, job, 1)
@@ -1543,3 +1525,29 @@ def test_process_incomplete_job_letter(mocker, sample_letter_template):
process_incomplete_job(str(job.id))
assert mock_letter_saver.call_count == 8
@freeze_time('2017-01-01')
def test_process_incomplete_jobs_sets_status_to_in_progress_and_resets_processing_started_time(mocker, sample_template):
mock_process_incomplete_job = mocker.patch('app.celery.tasks.process_incomplete_job')
job1 = create_job(
sample_template,
processing_started=datetime.utcnow() - timedelta(minutes=30),
job_status=JOB_STATUS_ERROR
)
job2 = create_job(
sample_template,
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_ERROR
)
process_incomplete_jobs([str(job1.id), str(job2.id)])
assert job1.job_status == JOB_STATUS_IN_PROGRESS
assert job1.processing_started == datetime.utcnow()
assert job2.job_status == JOB_STATUS_IN_PROGRESS
assert job2.processing_started == datetime.utcnow()
assert mock_process_incomplete_job.mock_calls == [call(str(job1.id)), call(str(job2.id))]