We have a scheduled task to check that all the jobs have completed, this will catch if an app is shut down and the job is complete yet, we only wait 10 seconds before forcing the app to shut down.

The task was raising a JobIncompleteError, yet it's not an error the task is performing it's task correctly and calling the appropriate task to restart the job.
Also used apply_sync to create the task instead of send_task.
This commit is contained in:
Rebecca Law
2020-07-22 17:00:20 +01:00
parent ec5eeac0aa
commit dd126df122
4 changed files with 29 additions and 83 deletions

View File

@@ -12,10 +12,10 @@ from app import notify_celery, zendesk_client
from app.celery.tasks import ( from app.celery.tasks import (
process_job, process_job,
get_recipient_csv_and_template_and_sender_id, get_recipient_csv_and_template_and_sender_id,
process_row process_row,
) process_incomplete_jobs)
from app.celery.letters_pdf_tasks import get_pdf_for_templated_letter from app.celery.letters_pdf_tasks import get_pdf_for_templated_letter
from app.config import QueueNames, TaskNames from app.config import QueueNames
from app.dao.invited_org_user_dao import delete_org_invitations_created_more_than_two_days_ago from app.dao.invited_org_user_dao import delete_org_invitations_created_more_than_two_days_ago
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
from app.dao.jobs_dao import ( from app.dao.jobs_dao import (
@@ -45,7 +45,6 @@ from app.models import (
EMAIL_TYPE, EMAIL_TYPE,
) )
from app.notifications.process_notifications import send_notification_to_queue from app.notifications.process_notifications import send_notification_to_queue
from app.v2.errors import JobIncompleteError
@notify_celery.task(name="run-scheduled-jobs") @notify_celery.task(name="run-scheduled-jobs")
@@ -149,12 +148,11 @@ def check_job_status():
job_ids.append(str(job.id)) job_ids.append(str(job.id))
if job_ids: if job_ids:
notify_celery.send_task( current_app.logger.info("Job(s) {} have not completed.".format(job_ids))
name=TaskNames.PROCESS_INCOMPLETE_JOBS, process_incomplete_jobs.apply_async(
args=(job_ids,), [job_ids],
queue=QueueNames.JOBS queue=QueueNames.JOBS
) )
raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids))
@notify_celery.task(name='replay-created-notifications') @notify_celery.task(name='replay-created-notifications')

View File

@@ -10,23 +10,6 @@ from app.authentication.auth import AuthError
from app.errors import InvalidRequest from app.errors import InvalidRequest
class JobIncompleteError(Exception):
def __init__(self, message):
self.message = message
self.status_code = 500
def to_dict_v2(self):
return {
'status_code': self.status_code,
"errors": [
{
"error": 'JobIncompleteError',
"message": self.message
}
]
}
class TooManyRequestsError(InvalidRequest): class TooManyRequestsError(InvalidRequest):
status_code = 429 status_code = 429
message_template = 'Exceeded send limits ({}) for today' message_template = 'Exceeded send limits ({}) for today'
@@ -91,10 +74,6 @@ def register_errors(blueprint):
current_app.logger.info(error) current_app.logger.info(error)
return jsonify(json.loads(error.message)), 400 return jsonify(json.loads(error.message)), 400
@blueprint.errorhandler(JobIncompleteError)
def job_incomplete_error(error):
return jsonify(error.to_dict_v2()), 500
@blueprint.errorhandler(NoResultFound) @blueprint.errorhandler(NoResultFound)
@blueprint.errorhandler(DataError) @blueprint.errorhandler(DataError)
def no_result_found(e): def no_result_found(e):

View File

@@ -19,7 +19,7 @@ from app.celery.scheduled_tasks import (
check_for_services_with_high_failure_rates_or_sending_to_tv_numbers, check_for_services_with_high_failure_rates_or_sending_to_tv_numbers,
switch_current_sms_provider_on_slow_delivery, switch_current_sms_provider_on_slow_delivery,
) )
from app.config import QueueNames, TaskNames, Config from app.config import QueueNames, Config
from app.dao.jobs_dao import dao_get_job_by_id from app.dao.jobs_dao import dao_get_job_by_id
from app.dao.provider_details_dao import get_provider_details_by_identifier from app.dao.provider_details_dao import get_provider_details_by_identifier
from app.models import ( from app.models import (
@@ -29,7 +29,6 @@ from app.models import (
NOTIFICATION_DELIVERED, NOTIFICATION_DELIVERED,
NOTIFICATION_PENDING_VIRUS_CHECK, NOTIFICATION_PENDING_VIRUS_CHECK,
) )
from app.v2.errors import JobIncompleteError
from tests.app import load_example_csv from tests.app import load_example_csv
from tests.app.db import ( from tests.app.db import (
@@ -141,44 +140,40 @@ def test_switch_current_sms_provider_on_slow_delivery_does_nothing_if_no_need(
assert mock_reduce.called is False assert mock_reduce.called is False
def test_check_job_status_task_raises_job_incomplete_error(mocker, sample_template): def test_check_job_status_task_calls_process_incomplete_jobs(mocker, sample_template):
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
job = create_job(template=sample_template, notification_count=3, job = create_job(template=sample_template, notification_count=3,
created_at=datetime.utcnow() - timedelta(minutes=31), created_at=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS) job_status=JOB_STATUS_IN_PROGRESS)
create_notification(template=sample_template, job=job) create_notification(template=sample_template, job=job)
with pytest.raises(expected_exception=JobIncompleteError) as e: check_job_status()
check_job_status()
assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id))
mock_celery.assert_called_once_with( mock_celery.assert_called_once_with(
name=TaskNames.PROCESS_INCOMPLETE_JOBS, [[str(job.id)]],
args=([str(job.id)],),
queue=QueueNames.JOBS queue=QueueNames.JOBS
) )
def test_check_job_status_task_raises_job_incomplete_error_when_scheduled_job_is_not_complete(mocker, sample_template): def test_check_job_status_task_calls_process_incomplete_jobs_when_scheduled_job_is_not_complete(
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') mocker, sample_template
):
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
job = create_job(template=sample_template, notification_count=3, job = create_job(template=sample_template, notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2), created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31), scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS) job_status=JOB_STATUS_IN_PROGRESS)
with pytest.raises(expected_exception=JobIncompleteError) as e: check_job_status()
check_job_status()
assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id))
mock_celery.assert_called_once_with( mock_celery.assert_called_once_with(
name=TaskNames.PROCESS_INCOMPLETE_JOBS, [[str(job.id)]],
args=([str(job.id)],),
queue=QueueNames.JOBS queue=QueueNames.JOBS
) )
def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(mocker, sample_template): def test_check_job_status_task_calls_process_incomplete_jobs_for_multiple_jobs(mocker, sample_template):
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
job = create_job(template=sample_template, notification_count=3, job = create_job(template=sample_template, notification_count=3,
created_at=datetime.utcnow() - timedelta(hours=2), created_at=datetime.utcnow() - timedelta(hours=2),
scheduled_for=datetime.utcnow() - timedelta(minutes=31), scheduled_for=datetime.utcnow() - timedelta(minutes=31),
@@ -189,20 +184,16 @@ def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(moc
scheduled_for=datetime.utcnow() - timedelta(minutes=31), scheduled_for=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS) job_status=JOB_STATUS_IN_PROGRESS)
with pytest.raises(expected_exception=JobIncompleteError) as e: check_job_status()
check_job_status()
assert str(job.id) in e.value.message
assert str(job_2.id) in e.value.message
mock_celery.assert_called_once_with( mock_celery.assert_called_once_with(
name=TaskNames.PROCESS_INCOMPLETE_JOBS, [[str(job.id), str(job_2.id)]],
args=([str(job.id), str(job_2.id)],),
queue=QueueNames.JOBS queue=QueueNames.JOBS
) )
def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template): def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template):
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
job = create_job( job = create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
@@ -211,28 +202,24 @@ def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template):
processing_started=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31),
job_status=JOB_STATUS_IN_PROGRESS job_status=JOB_STATUS_IN_PROGRESS
) )
job_2 = create_job( create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=datetime.utcnow() - timedelta(minutes=31), created_at=datetime.utcnow() - timedelta(minutes=31),
processing_started=datetime.utcnow() - timedelta(minutes=29), processing_started=datetime.utcnow() - timedelta(minutes=29),
job_status=JOB_STATUS_IN_PROGRESS job_status=JOB_STATUS_IN_PROGRESS
) )
with pytest.raises(expected_exception=JobIncompleteError) as e: check_job_status()
check_job_status()
assert str(job.id) in e.value.message
assert str(job_2.id) not in e.value.message
# job 2 not in celery task # job 2 not in celery task
mock_celery.assert_called_once_with( mock_celery.assert_called_once_with(
name=TaskNames.PROCESS_INCOMPLETE_JOBS, [[str(job.id)]],
args=([str(job.id)],),
queue=QueueNames.JOBS queue=QueueNames.JOBS
) )
def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template): def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template):
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
job = create_job( job = create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
@@ -248,15 +235,11 @@ def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template):
processing_started=datetime.utcnow() - timedelta(minutes=29), processing_started=datetime.utcnow() - timedelta(minutes=29),
job_status=JOB_STATUS_IN_PROGRESS job_status=JOB_STATUS_IN_PROGRESS
) )
with pytest.raises(expected_exception=JobIncompleteError) as e: check_job_status()
check_job_status()
assert str(job.id) in e.value.message
assert str(job_2.id) not in e.value.message
# job 2 not in celery task # job 2 not in celery task
mock_celery.assert_called_once_with( mock_celery.assert_called_once_with(
name=TaskNames.PROCESS_INCOMPLETE_JOBS, [[str(job.id)]],
args=([str(job.id)],),
queue=QueueNames.JOBS queue=QueueNames.JOBS
) )
assert job.job_status == JOB_STATUS_ERROR assert job.job_status == JOB_STATUS_ERROR

View File

@@ -8,7 +8,7 @@ def app_for_test():
import flask import flask
from flask import Blueprint from flask import Blueprint
from app.authentication.auth import AuthError from app.authentication.auth import AuthError
from app.v2.errors import BadRequestError, TooManyRequestsError, JobIncompleteError from app.v2.errors import BadRequestError, TooManyRequestsError
from app import init_app from app import init_app
app = flask.Flask(__name__) app = flask.Flask(__name__)
@@ -42,10 +42,6 @@ def app_for_test():
def raising_data_error(): def raising_data_error():
raise DataError("There was a db problem", "params", "orig") raise DataError("There was a db problem", "params", "orig")
@blue.route("raise_job_incomplete_error", methods=["GET"])
def raising_job_incomplete_error():
raise JobIncompleteError("Raising job incomplete error")
@blue.route("raise_exception", methods=["GET"]) @blue.route("raise_exception", methods=["GET"])
def raising_exception(): def raising_exception():
raise AssertionError("Raising any old exception") raise AssertionError("Raising any old exception")
@@ -114,16 +110,6 @@ def test_data_errors(app_for_test):
"errors": [{"error": "DataError", "message": "No result found"}]} "errors": [{"error": "DataError", "message": "No result found"}]}
def test_job_incomplete_errors(app_for_test):
with app_for_test.test_request_context():
with app_for_test.test_client() as client:
response = client.get(url_for('v2_under_test.raising_job_incomplete_error'))
assert response.status_code == 500
error = response.json
assert error == {"status_code": 500,
"errors": [{"error": "JobIncompleteError", "message": "Raising job incomplete error"}]}
def test_internal_server_error_handler(app_for_test): def test_internal_server_error_handler(app_for_test):
with app_for_test.test_request_context(): with app_for_test.test_request_context():
with app_for_test.test_client() as client: with app_for_test.test_client() as client: