Add celery task to check job finished

- Celery task is called after a countdown of 60 minutes from the start of job processing.
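
The pattern is Celery's standard delayed-execution call. A minimal sketch, assuming a configured Celery app (the names celery_app and schedule_check below are illustrative; the real code uses the project's notify_celery instance, QueueNames, and dao_get_job_by_id, and the hunk below sets countdown=3600 on the task decorator rather than at call time):

from celery import Celery

# Illustrative only: stands in for the project's notify_celery app.
celery_app = Celery('notifications')


@celery_app.task(bind=True, name='check-job-status')
def check_job_status(self, job_id):
    # The real task loads the job via dao_get_job_by_id and raises
    # JobIncompleteError if its status is not 'finished'.
    pass


def schedule_check(job_id):
    # Hypothetical helper: enqueue the check so it runs roughly 60 minutes
    # (3600 seconds) later, on the dedicated check-job-status queue.
    check_job_status.apply_async(
        [str(job_id)],
        queue='check-job-status-tasks',
        countdown=3600,
    )

Passing countdown to apply_async is the standard way to delay execution in Celery; the diff instead declares it as a task option on the decorator.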
Ken Tsang
2017-10-11 18:14:56 +01:00
parent 92508d3b96
commit c29fc8cfa4
5 changed files with 58 additions and 41 deletions

View File

@@ -1,5 +1,5 @@
import json
from datetime import (datetime)
from datetime import datetime
from collections import namedtuple
from flask import current_app
@@ -54,6 +54,7 @@ from app.models import (
from app.notifications.process_notifications import persist_notification
from app.service.utils import service_allowed_to_send_to
from app.statsd_decorators import statsd
from app.v2.errors import JobIncompleteError
from notifications_utils.s3 import s3upload
@@ -88,6 +89,9 @@ def process_job(job_id):
current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))
if template.template_type != LETTER_TYPE:
check_job_status.apply_async([str(job.id)], queue=QueueNames.JOBS)
for row_number, recipient, personalisation in RecipientCSV(
s3.get_job_from_s3(str(service.id), str(job_id)),
template_type=template.template_type,
@@ -95,21 +99,14 @@ def process_job(job_id):
).enumerated_recipients_and_personalisation:
process_row(row_number, recipient, personalisation, template, job, service)
notification_total_in_db = dao_get_total_notifications_for_job_id(job.id)
if job.notification_count != notification_total_in_db:
current_app.logger.error("Job {} is missing {} notifications".format(
job.id, job.notification_count - notification_total_in_db))
job.job_status = JOB_STATUS_ERROR
else:
if template.template_type == LETTER_TYPE:
if service.research_mode:
update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE)
else:
build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS)
current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS))
if template.template_type == LETTER_TYPE:
if service.research_mode:
update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE)
else:
job.job_status = JOB_STATUS_FINISHED
build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS)
current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS))
else:
job.job_status = JOB_STATUS_FINISHED
finished = datetime.utcnow()
job.processing_started = start
@@ -333,6 +330,14 @@ def update_dvla_job_to_error(self, job_id):
current_app.logger.info("Updated {} job to {}".format(job_id, JOB_STATUS_ERROR))
@notify_celery.task(bind=True, name='check-job-status', countdown=3600)
@statsd(namespace="tasks")
def check_job_status(self, job_id):
job = dao_get_job_by_id(job_id)
if job.job_status != JOB_STATUS_FINISHED:
raise JobIncompleteError("Job {} did not complete".format(job_id))
@notify_celery.task(bind=True, name='update-letter-notifications-to-sent')
@statsd(namespace="tasks")
def update_letter_notifications_to_sent_to_dvla(self, notification_references):

View File

@@ -19,6 +19,7 @@ if os.environ.get('VCAP_SERVICES'):
class QueueNames(object):
CHECK_JOBS = 'check-job-status-tasks'
PERIODIC = 'periodic-tasks'
PRIORITY = 'priority-tasks'
DATABASE = 'database-tasks'
@@ -34,6 +35,7 @@ class QueueNames(object):
@staticmethod
def all_queues():
return [
QueueNames.CHECK_JOBS,
QueueNames.PRIORITY,
QueueNames.PERIODIC,
QueueNames.DATABASE,

View File

@@ -10,10 +10,11 @@ from app.errors import InvalidRequest
class JobIncompleteError(Exception):
def __init__(self, message):
self.message = message
self.status_code = 500
def to_dict_v2(self):
return {
'status_code': 500,
'status_code': self.status_code,
"errors": [
{
"error": 'JobIncompleteError',

View File

@@ -16,6 +16,7 @@ from app.celery import tasks
from app.celery.tasks import (
s3,
build_dvla_file,
check_job_status,
create_dvla_file_contents_for_job,
update_dvla_job_to_error,
process_job,
@@ -30,6 +31,7 @@ from app.dao import jobs_dao, services_dao
from app.models import (
EMAIL_TYPE,
JOB_STATUS_ERROR,
JOB_STATUS_FINISHED,
KEY_TYPE_NORMAL,
KEY_TYPE_TEAM,
KEY_TYPE_TEST,
@@ -39,6 +41,7 @@ from app.models import (
Job,
Notification
)
from app.v2.errors import JobIncompleteError
from tests.app import load_example_csv
from tests.app.conftest import (
@@ -96,6 +99,7 @@ def email_job_with_placeholders(notify_db, notify_db_session, sample_email_templ
@freeze_time("2016-01-01 11:09:00.061258")
def test_should_process_sms_job(sample_job, mocker):
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms'))
mocker.patch('app.celery.tasks.send_sms.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
@@ -132,6 +136,7 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db,
service = create_sample_service(notify_db, notify_db_session, limit=9)
job = create_sample_job(notify_db, notify_db_session, service=service, notification_count=10)
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
mocker.patch('app.celery.tasks.process_row')
mocker.patch('app.celery.tasks.build_dvla_file')
@@ -153,6 +158,7 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify
create_sample_notification(notify_db, notify_db_session, service=service, job=job)
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms'))
mocker.patch('app.celery.tasks.process_row')
mocker.patch('app.celery.tasks.build_dvla_file')
@@ -173,6 +179,7 @@ def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(noti
create_sample_notification(notify_db, notify_db_session, service=service, job=job)
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocker.patch('app.celery.tasks.s3.get_job_from_s3')
mocker.patch('app.celery.tasks.process_row')
mocker.patch('app.celery.tasks.build_dvla_file')
@@ -192,6 +199,7 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not
template = create_sample_email_template(notify_db, notify_db_session, service=service)
job = create_sample_job(notify_db, notify_db_session, service=service, template=template)
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocker.patch('app.celery.tasks.s3.get_job_from_s3')
mocker.patch('app.celery.tasks.process_row')
mocker.patch('app.celery.tasks.build_dvla_file')
@@ -208,6 +216,7 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not
def test_should_not_process_job_if_already_pending(notify_db, notify_db_session, mocker):
job = create_sample_job(notify_db, notify_db_session, job_status='scheduled')
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocker.patch('app.celery.tasks.s3.get_job_from_s3')
mocker.patch('app.celery.tasks.process_row')
mocker.patch('app.celery.tasks.build_dvla_file')
@@ -227,6 +236,7 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db,
template = create_sample_email_template(notify_db, notify_db_session, service=service)
job = create_sample_job(notify_db, notify_db_session, service=service, template=template, notification_count=10)
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email'))
mocker.patch('app.celery.tasks.send_email.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
@@ -254,6 +264,7 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db,
def test_should_not_create_send_task_for_empty_file(sample_job, mocker):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('empty'))
mocker.patch('app.celery.tasks.send_sms.apply_async')
mocker.patch('app.celery.tasks.check_job_status.apply_async')
process_job(sample_job.id)
@@ -271,6 +282,7 @@ def test_should_process_email_job(email_job_with_placeholders, mocker):
email_csv = """email_address,name
test@test.com,foo
"""
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=email_csv)
mocker.patch('app.celery.tasks.send_email.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
@@ -309,6 +321,7 @@ def test_should_process_letter_job(sample_letter_job, mocker):
process_row_mock = mocker.patch('app.celery.tasks.process_row')
mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
mocker.patch('app.celery.tasks.build_dvla_file')
mocker.patch('app.celery.tasks.check_job_status.apply_async')
process_job(sample_letter_job.id)
@@ -339,6 +352,7 @@ def test_should_process_letter_job(sample_letter_job, mocker):
def test_should_process_all_sms_job(sample_job_with_placeholdered_template,
mocker):
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
mocker.patch('app.celery.tasks.send_sms.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
@@ -360,29 +374,6 @@ def test_should_process_all_sms_job(sample_job_with_placeholdered_template,
assert job.job_status == 'finished'
def test_should_error_log_missing_notifications(
sample_job_with_placeholdered_template, mocker):
multiple_sms = load_example_csv('multiple_sms').strip()
num_phone_numbers_after_header = len(multiple_sms.split('\n')[1:])
sample_job_with_placeholdered_template.notification_count = num_phone_numbers_after_header
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=multiple_sms)
mocker.patch('app.celery.tasks.send_sms.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
# deliberately return wrong total notifications to trigger error log
mocker.patch(
'app.celery.tasks.dao_get_total_notifications_for_job_id',
return_value=num_phone_numbers_after_header - 1
)
error_log = mocker.patch('app.celery.tasks.current_app.logger.error')
process_job(sample_job_with_placeholdered_template.id)
job = jobs_dao.dao_get_job_by_id(sample_job_with_placeholdered_template.id)
assert job.job_status == 'error'
assert error_log.called
# -------------- process_row tests -------------- #
@@ -466,6 +457,7 @@ def test_should_put_send_sms_task_in_research_mode_queue_if_research_mode_servic
notification = _notification_json(template, to="+447234123123")
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocked_deliver_sms = mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async')
notification_id = uuid.uuid4()
@@ -648,6 +640,7 @@ def test_should_not_build_dvla_file_in_research_mode_for_letter_job(
csv = """address_line_1,address_line_2,address_line_3,address_line_4,postcode,name
A1,A2,A3,A4,A_POST,Alice
"""
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=csv)
mocker.patch('app.celery.tasks.update_job_to_sent_to_dvla.apply_async')
mocker.patch('app.celery.tasks.persist_letter.apply_async')
@@ -670,6 +663,7 @@ def test_should_update_job_to_sent_to_dvla_in_research_mode_for_letter_job(
csv = """address_line_1,address_line_2,address_line_3,address_line_4,postcode,name
A1,A2,A3,A4,A_POST,Alice
"""
mocker.patch('app.celery.tasks.check_job_status.apply_async')
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=csv)
mock_update_job_task = mocker.patch('app.celery.tasks.update_job_to_sent_to_dvla.apply_async')
mocker.patch('app.celery.tasks.persist_letter.apply_async')
@@ -1229,3 +1223,17 @@ def test_send_inbound_sms_to_service_does_not_retries_if_request_returns_404(not
send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id)
assert mocked.call_count == 0
def test_job_incomplete_raises_job_incomplete_error(sample_job):
assert sample_job.job_status != JOB_STATUS_FINISHED
with pytest.raises(JobIncompleteError) as e:
check_job_status(str(sample_job.id))
assert e.value.status_code == 500
assert e.value.message == 'Job {} did not complete'.format(sample_job.id)
def test_job_finished_does_not_raises_job_incomplete_error(sample_job):
sample_job.job_status = JOB_STATUS_FINISHED
check_job_status(str(sample_job.id))

View File

@@ -63,7 +63,7 @@ def test_cloudfoundry_config_has_different_defaults():
def test_queue_names_all_queues_correct():
# Need to ensure that all_queues() only returns queue names used in API
queues = QueueNames.all_queues()
assert len(queues) == 10
assert len(queues) == 11
assert set([
QueueNames.PRIORITY,
QueueNames.PERIODIC,
@@ -74,5 +74,6 @@ def test_queue_names_all_queues_correct():
QueueNames.STATISTICS,
QueueNames.JOBS,
QueueNames.RETRY,
QueueNames.NOTIFY
QueueNames.NOTIFY,
QueueNames.CHECK_JOBS,
]) == set(queues)