From f0ca3d40de7dd03f6afed212409a1d5f2d4dd4e9 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 9 Mar 2018 16:30:50 +0000 Subject: [PATCH 1/3] reset job processing time when re-processing incomplete jobs we might stop processing jobs mid-way through if, for example, a deploy or downscale kills the box working on it. We have a scheduled task that identifies any job that we started processing more than half an hour ago that is still processing. However, we encountered a bug where we triggered the process_incomplete_job multiple times, because the processing_started of the job was still set to half an hour ago. If we reset the processing_started to the current time, then it won't get picked up by future runs of the check_job_status scheduled task. --- app/celery/tasks.py | 4 ++++ tests/app/celery/test_scheduled_tasks.py | 30 ++++++++++++++++++++++++ tests/app/celery/test_tasks.py | 19 +++++++++++++++ 3 files changed, 53 insertions(+) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 942a0ec5f..0763d02e2 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -559,6 +559,10 @@ def process_incomplete_job(job_id): job = dao_get_job_by_id(job_id) + # reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again + job.processing_started = datetime.utcnow() + dao_update_job(job) + last_notification_added = dao_get_last_notification_added_for_job_id(job_id) if last_notification_added: diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 8e3a3a33e..72a25aa19 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -896,6 +896,36 @@ def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(moc ) +def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template): + mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') + job = create_job( + template=sample_template, + 
notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS + ) + job_2 = create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=29), + job_status=JOB_STATUS_IN_PROGRESS + ) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert str(job.id) in e.value.message + assert str(job_2.id) not in e.value.message + + # job 2 not in celery task + mock_celery.assert_called_once_with( + name=TaskNames.PROCESS_INCOMPLETE_JOBS, + args=([str(job.id)],), + queue=QueueNames.JOBS + ) + + def test_daily_stats_template_usage_by_month(notify_db, notify_db_session): notification_history = functools.partial( create_notification_history, diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index e996ed62b..3f48da1e6 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1362,6 +1362,25 @@ def test_process_incomplete_job_sms(mocker, sample_template): assert save_sms.call_count == 8 # There are 10 in the file and we've added two already +@freeze_time('2017-01-01') +def test_process_incomplete_job_resets_start_time(mocker, sample_template): + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') + + job = create_job(template=sample_template, notification_count=10, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + + process_incomplete_job(str(job.id)) + + completed_job = Job.query.get(job.id) + + assert completed_job.processing_started 
== datetime.utcnow() + assert completed_job.job_status == JOB_STATUS_FINISHED + + + def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) From 64bb94af9e1a51fc14cc93736192d525ee4cb2f6 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 9 Mar 2018 16:34:47 +0000 Subject: [PATCH 2/3] set job status to error in the check_job_status scheduled task the process_incomplete_jobs task runs through all incomplete jobs in a loop, so it might not get a chance to update the processing_started time of the last job before check_job_status runs again (every minute). So before we even trigger the process_incomplete_jobs task, let's set the status of the jobs to error, so that we don't identify them for re-processing again. --- app/celery/scheduled_tasks.py | 13 ++++++++-- app/celery/tasks.py | 1 + tests/app/celery/test_scheduled_tasks.py | 33 ++++++++++++++++++++++++ tests/app/celery/test_tasks.py | 2 +- 4 files changed, 46 insertions(+), 3 deletions(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index b5f904f2b..2e3e725d4 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -45,13 +45,15 @@ from app.dao.provider_details_dao import ( dao_toggle_sms_provider ) from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago +from app.dao.jobs_dao import dao_update_job from app.models import ( Job, Notification, NOTIFICATION_SENDING, LETTER_TYPE, JOB_STATUS_IN_PROGRESS, - JOB_STATUS_READY_TO_SEND + JOB_STATUS_READY_TO_SEND, + JOB_STATUS_ERROR ) from app.notifications.process_notifications import send_notification_to_queue from app.celery.tasks import ( @@ -442,7 +444,14 @@ def check_job_status(): and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago) ).order_by(Job.processing_started).all() - job_ids = [str(x.id) for x in 
jobs_not_complete_after_30_minutes] + # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks + # if they haven't been re-processed in time. + job_ids = [] + for job in jobs_not_complete_after_30_minutes: + job.job_status = JOB_STATUS_ERROR + dao_update_job(job) + job_ids.append(str(job.id)) + if job_ids: notify_celery.send_task( name=TaskNames.PROCESS_INCOMPLETE_JOBS, diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 0763d02e2..ab9389685 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -560,6 +560,7 @@ def process_incomplete_job(job_id): job = dao_get_job_by_id(job_id) # reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again + job.job_status = JOB_STATUS_PENDING job.processing_started = datetime.utcnow() dao_update_job(job) diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 72a25aa19..40023c92d 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -53,6 +53,7 @@ from app.models import ( JOB_STATUS_READY_TO_SEND, JOB_STATUS_IN_PROGRESS, JOB_STATUS_SENT_TO_DVLA, + JOB_STATUS_ERROR, LETTER_TYPE, SMS_TYPE ) @@ -926,6 +927,38 @@ def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template): ) +def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template): + mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') + job = create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS + ) + job_2 = create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=29), + 
job_status=JOB_STATUS_IN_PROGRESS + ) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert str(job.id) in e.value.message + assert str(job_2.id) not in e.value.message + + # job 2 not in celery task + mock_celery.assert_called_once_with( + name=TaskNames.PROCESS_INCOMPLETE_JOBS, + args=([str(job.id)],), + queue=QueueNames.JOBS + ) + assert job.job_status == JOB_STATUS_ERROR + assert job_2.job_status == JOB_STATUS_IN_PROGRESS + + def test_daily_stats_template_usage_by_month(notify_db, notify_db_session): notification_history = functools.partial( create_notification_history, diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 3f48da1e6..9a2f5502c 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1365,7 +1365,7 @@ def test_process_incomplete_job_sms(mocker, sample_template): @freeze_time('2017-01-01') def test_process_incomplete_job_resets_start_time(mocker, sample_template): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) - save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') + mocker.patch('app.celery.tasks.save_sms.apply_async') job = create_job(template=sample_template, notification_count=10, created_at=datetime.utcnow() - timedelta(hours=2), From ea2b0dfbc9c713433bc74d9a275fa500d7e1fed0 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 9 Mar 2018 17:16:48 +0000 Subject: [PATCH 3/3] set processing_started before earlier jobs are processed process_incomplete_jobs loops through jobs, processing them in a single task. This means that if the job statuses are all 'error', and then the process_incomplete_jobs task fails, the later jobs in the list that never got picked up won't have their status set back to in progress or their processing_started time reset - so they will be stuck in 'error' forever. 
Instead, we set the job statuses to in progress and the start time to now before we process any - so if the process_incomplete_jobs task fails later, the jobs will be picked up (again) by the check_job_status task in half an hour's time --- app/celery/tasks.py | 13 ++++--- tests/app/celery/test_tasks.py | 62 +++++++++++++++++++--------------- 2 files changed, 43 insertions(+), 32 deletions(-) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index ab9389685..d7f3d7a15 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -550,6 +550,14 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id): @notify_celery.task(name='process-incomplete-jobs') @statsd(namespace="tasks") def process_incomplete_jobs(job_ids): + jobs = [dao_get_job_by_id(job_id) for job_id in job_ids] + + # reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again + for job in jobs: + job.job_status = JOB_STATUS_IN_PROGRESS + job.processing_started = datetime.utcnow() + dao_update_job(job) + current_app.logger.info("Resuming Job(s) {}".format(job_ids)) for job_id in job_ids: process_incomplete_job(job_id) @@ -559,11 +567,6 @@ def process_incomplete_job(job_id): job = dao_get_job_by_id(job_id) - # reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again - job.job_status = JOB_STATUS_PENDING - job.processing_started = datetime.utcnow() - dao_update_job(job) - last_notification_added = dao_get_last_notification_added_for_job_id(job_id) if last_notification_added: diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 9a2f5502c..873285e00 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1,7 +1,7 @@ import json import uuid from datetime import datetime, timedelta -from unittest.mock import Mock +from unittest.mock import Mock, call import pytest import requests_mock @@ -38,6 +38,7 @@ from app.models import ( KEY_TYPE_TEAM, 
KEY_TYPE_TEST, JOB_STATUS_FINISHED, + JOB_STATUS_ERROR, JOB_STATUS_IN_PROGRESS, LETTER_TYPE, SMS_TYPE @@ -1346,7 +1347,7 @@ def test_process_incomplete_job_sms(mocker, sample_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) @@ -1362,25 +1363,6 @@ def test_process_incomplete_job_sms(mocker, sample_template): assert save_sms.call_count == 8 # There are 10 in the file and we've added two already -@freeze_time('2017-01-01') -def test_process_incomplete_job_resets_start_time(mocker, sample_template): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) - mocker.patch('app.celery.tasks.save_sms.apply_async') - - job = create_job(template=sample_template, notification_count=10, - created_at=datetime.utcnow() - timedelta(hours=2), - scheduled_for=datetime.utcnow() - timedelta(minutes=31), - processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) - - process_incomplete_job(str(job.id)) - - completed_job = Job.query.get(job.id) - - assert completed_job.processing_started == datetime.utcnow() - assert completed_job.job_status == JOB_STATUS_FINISHED - - def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) @@ -1390,7 +1372,7 @@ def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_templ created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) 
create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) @@ -1423,7 +1405,7 @@ def test_process_incomplete_jobs_sms(mocker, sample_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) create_notification(sample_template, job, 2) @@ -1434,7 +1416,7 @@ def test_process_incomplete_jobs_sms(mocker, sample_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_template, job2, 0) create_notification(sample_template, job2, 1) @@ -1465,7 +1447,7 @@ def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template) created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) assert Notification.query.filter(Notification.job_id == job.id).count() == 0 @@ -1509,7 +1491,7 @@ def test_process_incomplete_job_email(mocker, sample_email_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_email_template, job, 0) create_notification(sample_email_template, job, 1) @@ -1533,7 +1515,7 @@ def test_process_incomplete_job_letter(mocker, sample_letter_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - 
timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_letter_template, job, 0) create_notification(sample_letter_template, job, 1) @@ -1543,3 +1525,29 @@ def test_process_incomplete_job_letter(mocker, sample_letter_template): process_incomplete_job(str(job.id)) assert mock_letter_saver.call_count == 8 + + +@freeze_time('2017-01-01') +def test_process_incomplete_jobs_sets_status_to_in_progress_and_resets_processing_started_time(mocker, sample_template): + mock_process_incomplete_job = mocker.patch('app.celery.tasks.process_incomplete_job') + + job1 = create_job( + sample_template, + processing_started=datetime.utcnow() - timedelta(minutes=30), + job_status=JOB_STATUS_ERROR + ) + job2 = create_job( + sample_template, + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_ERROR + ) + + process_incomplete_jobs([str(job1.id), str(job2.id)]) + + assert job1.job_status == JOB_STATUS_IN_PROGRESS + assert job1.processing_started == datetime.utcnow() + + assert job2.job_status == JOB_STATUS_IN_PROGRESS + assert job2.processing_started == datetime.utcnow() + + assert mock_process_incomplete_job.mock_calls == [call(str(job1.id)), call(str(job2.id))]