diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 92070f1fb..4fcac0bbe 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -12,10 +12,10 @@ from app import notify_celery, zendesk_client from app.celery.tasks import ( process_job, get_recipient_csv_and_template_and_sender_id, - process_row -) + process_row, + process_incomplete_jobs) from app.celery.letters_pdf_tasks import get_pdf_for_templated_letter -from app.config import QueueNames, TaskNames +from app.config import QueueNames from app.dao.invited_org_user_dao import delete_org_invitations_created_more_than_two_days_ago from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago from app.dao.jobs_dao import ( @@ -45,7 +45,6 @@ from app.models import ( EMAIL_TYPE, ) from app.notifications.process_notifications import send_notification_to_queue -from app.v2.errors import JobIncompleteError @notify_celery.task(name="run-scheduled-jobs") @@ -149,12 +148,11 @@ def check_job_status(): job_ids.append(str(job.id)) if job_ids: - notify_celery.send_task( - name=TaskNames.PROCESS_INCOMPLETE_JOBS, - args=(job_ids,), + current_app.logger.info("Job(s) {} have not completed.".format(job_ids)) + process_incomplete_jobs.apply_async( + [job_ids], queue=QueueNames.JOBS ) - raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids)) @notify_celery.task(name='replay-created-notifications') diff --git a/app/v2/errors.py b/app/v2/errors.py index 2b726e23d..9a6927071 100644 --- a/app/v2/errors.py +++ b/app/v2/errors.py @@ -10,23 +10,6 @@ from app.authentication.auth import AuthError from app.errors import InvalidRequest -class JobIncompleteError(Exception): - def __init__(self, message): - self.message = message - self.status_code = 500 - - def to_dict_v2(self): - return { - 'status_code': self.status_code, - "errors": [ - { - "error": 'JobIncompleteError', - "message": self.message - } - ] - } - - class TooManyRequestsError(InvalidRequest): status_code = 429 message_template = 'Exceeded send limits ({}) for today' @@ -91,10 +74,6 @@ def register_errors(blueprint): current_app.logger.info(error) return jsonify(json.loads(error.message)), 400 - @blueprint.errorhandler(JobIncompleteError) - def job_incomplete_error(error): - return jsonify(error.to_dict_v2()), 500 - @blueprint.errorhandler(NoResultFound) @blueprint.errorhandler(DataError) def no_result_found(e): diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 25d122f7d..655d6e699 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -19,7 +19,7 @@ from app.celery.scheduled_tasks import ( check_for_services_with_high_failure_rates_or_sending_to_tv_numbers, switch_current_sms_provider_on_slow_delivery, ) -from app.config import QueueNames, TaskNames, Config +from app.config import QueueNames, Config from app.dao.jobs_dao import dao_get_job_by_id from app.dao.provider_details_dao import get_provider_details_by_identifier from app.models import ( @@ -29,7 +29,6 @@ from app.models import ( NOTIFICATION_DELIVERED, NOTIFICATION_PENDING_VIRUS_CHECK, ) -from app.v2.errors import JobIncompleteError from tests.app import load_example_csv from tests.app.db import ( @@ -141,44 +140,40 @@ def test_switch_current_sms_provider_on_slow_delivery_does_nothing_if_no_need( assert mock_reduce.called is False -def test_check_job_status_task_raises_job_incomplete_error(mocker, sample_template): - mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') +def test_check_job_status_task_calls_process_incomplete_jobs(mocker, sample_template): + mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async') job = create_job(template=sample_template, notification_count=3, created_at=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_IN_PROGRESS) create_notification(template=sample_template, job=job) - with pytest.raises(expected_exception=JobIncompleteError) as e: - check_job_status() - assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id)) + check_job_status() mock_celery.assert_called_once_with( - name=TaskNames.PROCESS_INCOMPLETE_JOBS, - args=([str(job.id)],), + [[str(job.id)]], queue=QueueNames.JOBS ) -def test_check_job_status_task_raises_job_incomplete_error_when_scheduled_job_is_not_complete(mocker, sample_template): - mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') +def test_check_job_status_task_calls_process_incomplete_jobs_when_scheduled_job_is_not_complete( + mocker, sample_template +): + mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async') job = create_job(template=sample_template, notification_count=3, created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_IN_PROGRESS) - with pytest.raises(expected_exception=JobIncompleteError) as e: - check_job_status() - assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id)) + check_job_status() mock_celery.assert_called_once_with( - name=TaskNames.PROCESS_INCOMPLETE_JOBS, - args=([str(job.id)],), + [[str(job.id)]], queue=QueueNames.JOBS ) -def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(mocker, sample_template): - mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') +def test_check_job_status_task_calls_process_incomplete_jobs_for_multiple_jobs(mocker, sample_template): + mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async') job = create_job(template=sample_template, notification_count=3, created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), @@ -189,20 +184,16 @@ def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(moc scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_IN_PROGRESS) - with pytest.raises(expected_exception=JobIncompleteError) as e: - check_job_status() - assert str(job.id) in e.value.message - assert str(job_2.id) in e.value.message + check_job_status() mock_celery.assert_called_once_with( - name=TaskNames.PROCESS_INCOMPLETE_JOBS, - args=([str(job.id), str(job_2.id)],), + [[str(job.id), str(job_2.id)]], queue=QueueNames.JOBS ) def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template): - mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') + mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async') job = create_job( template=sample_template, notification_count=3, @@ -211,28 +202,24 @@ def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template): processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_IN_PROGRESS ) - job_2 = create_job( + create_job( template=sample_template, notification_count=3, created_at=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=29), job_status=JOB_STATUS_IN_PROGRESS ) - with pytest.raises(expected_exception=JobIncompleteError) as e: - check_job_status() - assert str(job.id) in e.value.message - assert str(job_2.id) not in e.value.message + check_job_status() # job 2 not in celery task mock_celery.assert_called_once_with( - name=TaskNames.PROCESS_INCOMPLETE_JOBS, - args=([str(job.id)],), + [[str(job.id)]], queue=QueueNames.JOBS ) def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template): - mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') + mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async') job = create_job( template=sample_template, notification_count=3, @@ -248,15 +235,11 @@ def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template): processing_started=datetime.utcnow() - timedelta(minutes=29), job_status=JOB_STATUS_IN_PROGRESS ) - with pytest.raises(expected_exception=JobIncompleteError) as e: - check_job_status() - assert str(job.id) in e.value.message - assert str(job_2.id) not in e.value.message + check_job_status() # job 2 not in celery task mock_celery.assert_called_once_with( - name=TaskNames.PROCESS_INCOMPLETE_JOBS, - args=([str(job.id)],), + [[str(job.id)]], queue=QueueNames.JOBS ) assert job.job_status == JOB_STATUS_ERROR diff --git a/tests/app/v2/test_errors.py b/tests/app/v2/test_errors.py index 390afb605..04a488ed6 100644 --- a/tests/app/v2/test_errors.py +++ b/tests/app/v2/test_errors.py @@ -8,7 +8,7 @@ def app_for_test(): import flask from flask import Blueprint from app.authentication.auth import AuthError - from app.v2.errors import BadRequestError, TooManyRequestsError, JobIncompleteError + from app.v2.errors import BadRequestError, TooManyRequestsError from app import init_app app = flask.Flask(__name__) @@ -42,10 +42,6 @@ def app_for_test(): def raising_data_error(): raise DataError("There was a db problem", "params", "orig") - @blue.route("raise_job_incomplete_error", methods=["GET"]) - def raising_job_incomplete_error(): - raise JobIncompleteError("Raising job incomplete error") - @blue.route("raise_exception", methods=["GET"]) def raising_exception(): raise AssertionError("Raising any old exception") @@ -114,16 +110,6 @@ def test_data_errors(app_for_test): "errors": [{"error": "DataError", "message": "No result found"}]} -def test_job_incomplete_errors(app_for_test): - with app_for_test.test_request_context(): - with app_for_test.test_client() as client: - response = client.get(url_for('v2_under_test.raising_job_incomplete_error')) - assert response.status_code == 500 - error = response.json - assert error == {"status_code": 500, - "errors": [{"error": "JobIncompleteError", "message": "Raising job incomplete error"}]} - - def test_internal_server_error_handler(app_for_test): with app_for_test.test_request_context(): with app_for_test.test_client() as client: