diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index b5f904f2b..2e3e725d4 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -45,13 +45,15 @@ from app.dao.provider_details_dao import ( dao_toggle_sms_provider ) from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago +from app.dao.jobs_dao import dao_update_job from app.models import ( Job, Notification, NOTIFICATION_SENDING, LETTER_TYPE, JOB_STATUS_IN_PROGRESS, - JOB_STATUS_READY_TO_SEND + JOB_STATUS_READY_TO_SEND, + JOB_STATUS_ERROR ) from app.notifications.process_notifications import send_notification_to_queue from app.celery.tasks import ( @@ -442,7 +444,14 @@ def check_job_status(): and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago) ).order_by(Job.processing_started).all() - job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes] + # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks + # if they haven't been re-processed in time. + job_ids = [] + for job in jobs_not_complete_after_30_minutes: + job.job_status = JOB_STATUS_ERROR + dao_update_job(job) + job_ids.append(str(job.id)) + if job_ids: notify_celery.send_task( name=TaskNames.PROCESS_INCOMPLETE_JOBS, diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 942a0ec5f..d7f3d7a15 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -550,6 +550,14 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id): @notify_celery.task(name='process-incomplete-jobs') @statsd(namespace="tasks") def process_incomplete_jobs(job_ids): + jobs = [dao_get_job_by_id(job_id) for job_id in job_ids] + + # reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again + for job in jobs: + job.job_status = JOB_STATUS_IN_PROGRESS + job.processing_started = datetime.utcnow() + dao_update_job(job) + current_app.logger.info("Resuming Job(s) {}".format(job_ids)) for job_id in job_ids: process_incomplete_job(job_id) diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 8e3a3a33e..40023c92d 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -53,6 +53,7 @@ from app.models import ( JOB_STATUS_READY_TO_SEND, JOB_STATUS_IN_PROGRESS, JOB_STATUS_SENT_TO_DVLA, + JOB_STATUS_ERROR, LETTER_TYPE, SMS_TYPE ) @@ -896,6 +897,68 @@ def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(moc ) +def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template): + mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') + job = create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS + ) + job_2 = create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=29), + job_status=JOB_STATUS_IN_PROGRESS + ) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert str(job.id) in e.value.message + assert str(job_2.id) not in e.value.message + + # job 2 not in celery task + mock_celery.assert_called_once_with( + name=TaskNames.PROCESS_INCOMPLETE_JOBS, + args=([str(job.id)],), + queue=QueueNames.JOBS + ) + + +def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template): + mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') + job = create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS + ) + job_2 = create_job( + template=sample_template, + notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=29), + job_status=JOB_STATUS_IN_PROGRESS + ) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert str(job.id) in e.value.message + assert str(job_2.id) not in e.value.message + + # job 2 not in celery task + mock_celery.assert_called_once_with( + name=TaskNames.PROCESS_INCOMPLETE_JOBS, + args=([str(job.id)],), + queue=QueueNames.JOBS + ) + assert job.job_status == JOB_STATUS_ERROR + assert job_2.job_status == JOB_STATUS_IN_PROGRESS + + def test_daily_stats_template_usage_by_month(notify_db, notify_db_session): notification_history = functools.partial( create_notification_history, diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index e996ed62b..873285e00 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1,7 +1,7 @@ import json import uuid from datetime import datetime, timedelta -from unittest.mock import Mock +from unittest.mock import Mock, call import pytest import requests_mock @@ -38,6 +38,7 @@ from app.models import ( KEY_TYPE_TEAM, KEY_TYPE_TEST, JOB_STATUS_FINISHED, + JOB_STATUS_ERROR, JOB_STATUS_IN_PROGRESS, LETTER_TYPE, SMS_TYPE @@ -1346,7 +1347,7 @@ def test_process_incomplete_job_sms(mocker, sample_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) @@ -1371,7 +1372,7 @@ def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_templ created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) @@ -1404,7 +1405,7 @@ def test_process_incomplete_jobs_sms(mocker, sample_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) create_notification(sample_template, job, 2) @@ -1415,7 +1416,7 @@ def test_process_incomplete_jobs_sms(mocker, sample_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_template, job2, 0) create_notification(sample_template, job2, 1) @@ -1446,7 +1447,7 @@ def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template) created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) assert Notification.query.filter(Notification.job_id == job.id).count() == 0 @@ -1490,7 +1491,7 @@ def test_process_incomplete_job_email(mocker, sample_email_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_email_template, job, 0) create_notification(sample_email_template, job, 1) @@ -1514,7 +1515,7 @@ def test_process_incomplete_job_letter(mocker, sample_letter_template): created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) + job_status=JOB_STATUS_ERROR) create_notification(sample_letter_template, job, 0) create_notification(sample_letter_template, job, 1) @@ -1524,3 +1525,29 @@ def test_process_incomplete_job_letter(mocker, sample_letter_template): process_incomplete_job(str(job.id)) assert mock_letter_saver.call_count == 8 + + +@freeze_time('2017-01-01') +def test_process_incomplete_jobs_sets_status_to_in_progress_and_resets_processing_started_time(mocker, sample_template): + mock_process_incomplete_job = mocker.patch('app.celery.tasks.process_incomplete_job') + + job1 = create_job( + sample_template, + processing_started=datetime.utcnow() - timedelta(minutes=30), + job_status=JOB_STATUS_ERROR + ) + job2 = create_job( + sample_template, + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_ERROR + ) + + process_incomplete_jobs([str(job1.id), str(job2.id)]) + + assert job1.job_status == JOB_STATUS_IN_PROGRESS + assert job1.processing_started == datetime.utcnow() + + assert job2.job_status == JOB_STATUS_IN_PROGRESS + assert job2.processing_started == datetime.utcnow() + + assert mock_process_incomplete_job.mock_calls == [call(str(job1.id)), call(str(job2.id))]