reset job processing time when re-processing incomplete jobs

we might stop processing jobs mid-way through if, for example, a
deploy or downscale kills the box working on it. We have a scheduled
task that identifies any job that we started processing more than half
an hour ago that is still processing.

However, we encountered a bug where we triggered the
process_incomplete_job multiple times, because the processing_started
of the job was still set to half an hour ago. If we reset the
processing_started to the current time, then it won't get picked up by
future runs of the check_job_status scheduled task.
This commit is contained in:
Leo Hemsted
2018-03-09 16:30:50 +00:00
parent c7cc7822f7
commit f0ca3d40de
3 changed files with 53 additions and 0 deletions

View File

@@ -896,6 +896,36 @@ def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(moc
)
def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template):
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task')
job = create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS
)
job_2 = create_job(
template=sample_template,
notification_count=3,
created_at=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=29),
job_status=JOB_STATUS_IN_PROGRESS
)
with pytest.raises(expected_exception=JobIncompleteError) as e:
check_job_status()
assert str(job.id) in e.value.message
assert str(job_2.id) not in e.value.message
# job 2 not in celery task
mock_celery.assert_called_once_with(
name=TaskNames.PROCESS_INCOMPLETE_JOBS,
args=([str(job.id)],),
queue=QueueNames.JOBS
)
def test_daily_stats_template_usage_by_month(notify_db, notify_db_session):
notification_history = functools.partial(
create_notification_history,

View File

@@ -1362,6 +1362,25 @@ def test_process_incomplete_job_sms(mocker, sample_template):
assert save_sms.call_count == 8 # There are 10 in the file and we've added two already
@freeze_time('2017-01-01')
def test_process_incomplete_job_resets_start_time(mocker, sample_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async')
job = create_job(template=sample_template, notification_count=10,
created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS)
process_incomplete_job(str(job.id))
completed_job = Job.query.get(job.id)
assert completed_job.processing_started == datetime.utcnow()
assert completed_job.job_status == JOB_STATUS_FINISHED
def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))