mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-12 13:41:19 -05:00
Merge pull request #2935 from alphagov/remove-jobs-incomplete-error
Stop raising error in scheduled task
This commit is contained in:
@@ -12,10 +12,10 @@ from app import notify_celery, zendesk_client
|
||||
from app.celery.tasks import (
|
||||
process_job,
|
||||
get_recipient_csv_and_template_and_sender_id,
|
||||
process_row
|
||||
)
|
||||
process_row,
|
||||
process_incomplete_jobs)
|
||||
from app.celery.letters_pdf_tasks import get_pdf_for_templated_letter
|
||||
from app.config import QueueNames, TaskNames
|
||||
from app.config import QueueNames
|
||||
from app.dao.invited_org_user_dao import delete_org_invitations_created_more_than_two_days_ago
|
||||
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
|
||||
from app.dao.jobs_dao import (
|
||||
@@ -45,7 +45,6 @@ from app.models import (
|
||||
EMAIL_TYPE,
|
||||
)
|
||||
from app.notifications.process_notifications import send_notification_to_queue
|
||||
from app.v2.errors import JobIncompleteError
|
||||
|
||||
|
||||
@notify_celery.task(name="run-scheduled-jobs")
|
||||
@@ -149,12 +148,11 @@ def check_job_status():
|
||||
job_ids.append(str(job.id))
|
||||
|
||||
if job_ids:
|
||||
notify_celery.send_task(
|
||||
name=TaskNames.PROCESS_INCOMPLETE_JOBS,
|
||||
args=(job_ids,),
|
||||
current_app.logger.info("Job(s) {} have not completed.".format(job_ids))
|
||||
process_incomplete_jobs.apply_async(
|
||||
[job_ids],
|
||||
queue=QueueNames.JOBS
|
||||
)
|
||||
raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids))
|
||||
|
||||
|
||||
@notify_celery.task(name='replay-created-notifications')
|
||||
|
||||
@@ -10,23 +10,6 @@ from app.authentication.auth import AuthError
|
||||
from app.errors import InvalidRequest
|
||||
|
||||
|
||||
class JobIncompleteError(Exception):
|
||||
def __init__(self, message):
|
||||
self.message = message
|
||||
self.status_code = 500
|
||||
|
||||
def to_dict_v2(self):
|
||||
return {
|
||||
'status_code': self.status_code,
|
||||
"errors": [
|
||||
{
|
||||
"error": 'JobIncompleteError',
|
||||
"message": self.message
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
class TooManyRequestsError(InvalidRequest):
|
||||
status_code = 429
|
||||
message_template = 'Exceeded send limits ({}) for today'
|
||||
@@ -91,10 +74,6 @@ def register_errors(blueprint):
|
||||
current_app.logger.info(error)
|
||||
return jsonify(json.loads(error.message)), 400
|
||||
|
||||
@blueprint.errorhandler(JobIncompleteError)
|
||||
def job_incomplete_error(error):
|
||||
return jsonify(error.to_dict_v2()), 500
|
||||
|
||||
@blueprint.errorhandler(NoResultFound)
|
||||
@blueprint.errorhandler(DataError)
|
||||
def no_result_found(e):
|
||||
|
||||
@@ -19,7 +19,7 @@ from app.celery.scheduled_tasks import (
|
||||
check_for_services_with_high_failure_rates_or_sending_to_tv_numbers,
|
||||
switch_current_sms_provider_on_slow_delivery,
|
||||
)
|
||||
from app.config import QueueNames, TaskNames, Config
|
||||
from app.config import QueueNames, Config
|
||||
from app.dao.jobs_dao import dao_get_job_by_id
|
||||
from app.dao.provider_details_dao import get_provider_details_by_identifier
|
||||
from app.models import (
|
||||
@@ -29,7 +29,6 @@ from app.models import (
|
||||
NOTIFICATION_DELIVERED,
|
||||
NOTIFICATION_PENDING_VIRUS_CHECK,
|
||||
)
|
||||
from app.v2.errors import JobIncompleteError
|
||||
from tests.app import load_example_csv
|
||||
|
||||
from tests.app.db import (
|
||||
@@ -141,44 +140,40 @@ def test_switch_current_sms_provider_on_slow_delivery_does_nothing_if_no_need(
|
||||
assert mock_reduce.called is False
|
||||
|
||||
|
||||
def test_check_job_status_task_raises_job_incomplete_error(mocker, sample_template):
|
||||
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task')
|
||||
def test_check_job_status_task_calls_process_incomplete_jobs(mocker, sample_template):
|
||||
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
|
||||
job = create_job(template=sample_template, notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS)
|
||||
create_notification(template=sample_template, job=job)
|
||||
with pytest.raises(expected_exception=JobIncompleteError) as e:
|
||||
check_job_status()
|
||||
assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id))
|
||||
check_job_status()
|
||||
|
||||
mock_celery.assert_called_once_with(
|
||||
name=TaskNames.PROCESS_INCOMPLETE_JOBS,
|
||||
args=([str(job.id)],),
|
||||
[[str(job.id)]],
|
||||
queue=QueueNames.JOBS
|
||||
)
|
||||
|
||||
|
||||
def test_check_job_status_task_raises_job_incomplete_error_when_scheduled_job_is_not_complete(mocker, sample_template):
|
||||
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task')
|
||||
def test_check_job_status_task_calls_process_incomplete_jobs_when_scheduled_job_is_not_complete(
|
||||
mocker, sample_template
|
||||
):
|
||||
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
|
||||
job = create_job(template=sample_template, notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS)
|
||||
with pytest.raises(expected_exception=JobIncompleteError) as e:
|
||||
check_job_status()
|
||||
assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id))
|
||||
check_job_status()
|
||||
|
||||
mock_celery.assert_called_once_with(
|
||||
name=TaskNames.PROCESS_INCOMPLETE_JOBS,
|
||||
args=([str(job.id)],),
|
||||
[[str(job.id)]],
|
||||
queue=QueueNames.JOBS
|
||||
)
|
||||
|
||||
|
||||
def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(mocker, sample_template):
|
||||
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task')
|
||||
def test_check_job_status_task_calls_process_incomplete_jobs_for_multiple_jobs(mocker, sample_template):
|
||||
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
|
||||
job = create_job(template=sample_template, notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
@@ -189,20 +184,16 @@ def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(moc
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS)
|
||||
with pytest.raises(expected_exception=JobIncompleteError) as e:
|
||||
check_job_status()
|
||||
assert str(job.id) in e.value.message
|
||||
assert str(job_2.id) in e.value.message
|
||||
check_job_status()
|
||||
|
||||
mock_celery.assert_called_once_with(
|
||||
name=TaskNames.PROCESS_INCOMPLETE_JOBS,
|
||||
args=([str(job.id), str(job_2.id)],),
|
||||
[[str(job.id), str(job_2.id)]],
|
||||
queue=QueueNames.JOBS
|
||||
)
|
||||
|
||||
|
||||
def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template):
|
||||
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task')
|
||||
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
|
||||
job = create_job(
|
||||
template=sample_template,
|
||||
notification_count=3,
|
||||
@@ -211,28 +202,24 @@ def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template):
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS
|
||||
)
|
||||
job_2 = create_job(
|
||||
create_job(
|
||||
template=sample_template,
|
||||
notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=29),
|
||||
job_status=JOB_STATUS_IN_PROGRESS
|
||||
)
|
||||
with pytest.raises(expected_exception=JobIncompleteError) as e:
|
||||
check_job_status()
|
||||
assert str(job.id) in e.value.message
|
||||
assert str(job_2.id) not in e.value.message
|
||||
check_job_status()
|
||||
|
||||
# job 2 not in celery task
|
||||
mock_celery.assert_called_once_with(
|
||||
name=TaskNames.PROCESS_INCOMPLETE_JOBS,
|
||||
args=([str(job.id)],),
|
||||
[[str(job.id)]],
|
||||
queue=QueueNames.JOBS
|
||||
)
|
||||
|
||||
|
||||
def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template):
|
||||
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task')
|
||||
mock_celery = mocker.patch('app.celery.tasks.process_incomplete_jobs.apply_async')
|
||||
job = create_job(
|
||||
template=sample_template,
|
||||
notification_count=3,
|
||||
@@ -248,15 +235,11 @@ def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template):
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=29),
|
||||
job_status=JOB_STATUS_IN_PROGRESS
|
||||
)
|
||||
with pytest.raises(expected_exception=JobIncompleteError) as e:
|
||||
check_job_status()
|
||||
assert str(job.id) in e.value.message
|
||||
assert str(job_2.id) not in e.value.message
|
||||
check_job_status()
|
||||
|
||||
# job 2 not in celery task
|
||||
mock_celery.assert_called_once_with(
|
||||
name=TaskNames.PROCESS_INCOMPLETE_JOBS,
|
||||
args=([str(job.id)],),
|
||||
[[str(job.id)]],
|
||||
queue=QueueNames.JOBS
|
||||
)
|
||||
assert job.job_status == JOB_STATUS_ERROR
|
||||
|
||||
@@ -8,7 +8,7 @@ def app_for_test():
|
||||
import flask
|
||||
from flask import Blueprint
|
||||
from app.authentication.auth import AuthError
|
||||
from app.v2.errors import BadRequestError, TooManyRequestsError, JobIncompleteError
|
||||
from app.v2.errors import BadRequestError, TooManyRequestsError
|
||||
from app import init_app
|
||||
|
||||
app = flask.Flask(__name__)
|
||||
@@ -42,10 +42,6 @@ def app_for_test():
|
||||
def raising_data_error():
|
||||
raise DataError("There was a db problem", "params", "orig")
|
||||
|
||||
@blue.route("raise_job_incomplete_error", methods=["GET"])
|
||||
def raising_job_incomplete_error():
|
||||
raise JobIncompleteError("Raising job incomplete error")
|
||||
|
||||
@blue.route("raise_exception", methods=["GET"])
|
||||
def raising_exception():
|
||||
raise AssertionError("Raising any old exception")
|
||||
@@ -114,16 +110,6 @@ def test_data_errors(app_for_test):
|
||||
"errors": [{"error": "DataError", "message": "No result found"}]}
|
||||
|
||||
|
||||
def test_job_incomplete_errors(app_for_test):
|
||||
with app_for_test.test_request_context():
|
||||
with app_for_test.test_client() as client:
|
||||
response = client.get(url_for('v2_under_test.raising_job_incomplete_error'))
|
||||
assert response.status_code == 500
|
||||
error = response.json
|
||||
assert error == {"status_code": 500,
|
||||
"errors": [{"error": "JobIncompleteError", "message": "Raising job incomplete error"}]}
|
||||
|
||||
|
||||
def test_internal_server_error_handler(app_for_test):
|
||||
with app_for_test.test_request_context():
|
||||
with app_for_test.test_client() as client:
|
||||
|
||||
Reference in New Issue
Block a user