mirror of
https://github.com/GSA/notifications-api.git
synced 2026-05-08 01:58:01 -04:00
@@ -45,13 +45,15 @@ from app.dao.provider_details_dao import (
|
||||
dao_toggle_sms_provider
|
||||
)
|
||||
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
||||
from app.dao.jobs_dao import dao_update_job
|
||||
from app.models import (
|
||||
Job,
|
||||
Notification,
|
||||
NOTIFICATION_SENDING,
|
||||
LETTER_TYPE,
|
||||
JOB_STATUS_IN_PROGRESS,
|
||||
JOB_STATUS_READY_TO_SEND
|
||||
JOB_STATUS_READY_TO_SEND,
|
||||
JOB_STATUS_ERROR
|
||||
)
|
||||
from app.notifications.process_notifications import send_notification_to_queue
|
||||
from app.celery.tasks import (
|
||||
@@ -442,7 +444,14 @@ def check_job_status():
|
||||
and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago)
|
||||
).order_by(Job.processing_started).all()
|
||||
|
||||
job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes]
|
||||
# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
|
||||
# if they haven't been re-processed in time.
|
||||
job_ids = []
|
||||
for job in jobs_not_complete_after_30_minutes:
|
||||
job.job_status = JOB_STATUS_ERROR
|
||||
dao_update_job(job)
|
||||
job_ids.append(str(job.id))
|
||||
|
||||
if job_ids:
|
||||
notify_celery.send_task(
|
||||
name=TaskNames.PROCESS_INCOMPLETE_JOBS,
|
||||
|
||||
@@ -550,6 +550,14 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
|
||||
@notify_celery.task(name='process-incomplete-jobs')
|
||||
@statsd(namespace="tasks")
|
||||
def process_incomplete_jobs(job_ids):
|
||||
jobs = [dao_get_job_by_id(job_id) for job_id in job_ids]
|
||||
|
||||
# reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again
|
||||
for job in jobs:
|
||||
job.job_status = JOB_STATUS_IN_PROGRESS
|
||||
job.processing_started = datetime.utcnow()
|
||||
dao_update_job(job)
|
||||
|
||||
current_app.logger.info("Resuming Job(s) {}".format(job_ids))
|
||||
for job_id in job_ids:
|
||||
process_incomplete_job(job_id)
|
||||
|
||||
@@ -53,6 +53,7 @@ from app.models import (
|
||||
JOB_STATUS_READY_TO_SEND,
|
||||
JOB_STATUS_IN_PROGRESS,
|
||||
JOB_STATUS_SENT_TO_DVLA,
|
||||
JOB_STATUS_ERROR,
|
||||
LETTER_TYPE,
|
||||
SMS_TYPE
|
||||
)
|
||||
@@ -896,6 +897,68 @@ def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(moc
|
||||
)
|
||||
|
||||
|
||||
def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template):
|
||||
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task')
|
||||
job = create_job(
|
||||
template=sample_template,
|
||||
notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS
|
||||
)
|
||||
job_2 = create_job(
|
||||
template=sample_template,
|
||||
notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=29),
|
||||
job_status=JOB_STATUS_IN_PROGRESS
|
||||
)
|
||||
with pytest.raises(expected_exception=JobIncompleteError) as e:
|
||||
check_job_status()
|
||||
assert str(job.id) in e.value.message
|
||||
assert str(job_2.id) not in e.value.message
|
||||
|
||||
# job 2 not in celery task
|
||||
mock_celery.assert_called_once_with(
|
||||
name=TaskNames.PROCESS_INCOMPLETE_JOBS,
|
||||
args=([str(job.id)],),
|
||||
queue=QueueNames.JOBS
|
||||
)
|
||||
|
||||
|
||||
def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template):
|
||||
mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task')
|
||||
job = create_job(
|
||||
template=sample_template,
|
||||
notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS
|
||||
)
|
||||
job_2 = create_job(
|
||||
template=sample_template,
|
||||
notification_count=3,
|
||||
created_at=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=29),
|
||||
job_status=JOB_STATUS_IN_PROGRESS
|
||||
)
|
||||
with pytest.raises(expected_exception=JobIncompleteError) as e:
|
||||
check_job_status()
|
||||
assert str(job.id) in e.value.message
|
||||
assert str(job_2.id) not in e.value.message
|
||||
|
||||
# job 2 not in celery task
|
||||
mock_celery.assert_called_once_with(
|
||||
name=TaskNames.PROCESS_INCOMPLETE_JOBS,
|
||||
args=([str(job.id)],),
|
||||
queue=QueueNames.JOBS
|
||||
)
|
||||
assert job.job_status == JOB_STATUS_ERROR
|
||||
assert job_2.job_status == JOB_STATUS_IN_PROGRESS
|
||||
|
||||
|
||||
def test_daily_stats_template_usage_by_month(notify_db, notify_db_session):
|
||||
notification_history = functools.partial(
|
||||
create_notification_history,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import Mock
|
||||
from unittest.mock import Mock, call
|
||||
|
||||
import pytest
|
||||
import requests_mock
|
||||
@@ -38,6 +38,7 @@ from app.models import (
|
||||
KEY_TYPE_TEAM,
|
||||
KEY_TYPE_TEST,
|
||||
JOB_STATUS_FINISHED,
|
||||
JOB_STATUS_ERROR,
|
||||
JOB_STATUS_IN_PROGRESS,
|
||||
LETTER_TYPE,
|
||||
SMS_TYPE
|
||||
@@ -1346,7 +1347,7 @@ def test_process_incomplete_job_sms(mocker, sample_template):
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS)
|
||||
job_status=JOB_STATUS_ERROR)
|
||||
|
||||
create_notification(sample_template, job, 0)
|
||||
create_notification(sample_template, job, 1)
|
||||
@@ -1371,7 +1372,7 @@ def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_templ
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS)
|
||||
job_status=JOB_STATUS_ERROR)
|
||||
|
||||
create_notification(sample_template, job, 0)
|
||||
create_notification(sample_template, job, 1)
|
||||
@@ -1404,7 +1405,7 @@ def test_process_incomplete_jobs_sms(mocker, sample_template):
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS)
|
||||
job_status=JOB_STATUS_ERROR)
|
||||
create_notification(sample_template, job, 0)
|
||||
create_notification(sample_template, job, 1)
|
||||
create_notification(sample_template, job, 2)
|
||||
@@ -1415,7 +1416,7 @@ def test_process_incomplete_jobs_sms(mocker, sample_template):
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS)
|
||||
job_status=JOB_STATUS_ERROR)
|
||||
|
||||
create_notification(sample_template, job2, 0)
|
||||
create_notification(sample_template, job2, 1)
|
||||
@@ -1446,7 +1447,7 @@ def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template)
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS)
|
||||
job_status=JOB_STATUS_ERROR)
|
||||
|
||||
assert Notification.query.filter(Notification.job_id == job.id).count() == 0
|
||||
|
||||
@@ -1490,7 +1491,7 @@ def test_process_incomplete_job_email(mocker, sample_email_template):
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS)
|
||||
job_status=JOB_STATUS_ERROR)
|
||||
|
||||
create_notification(sample_email_template, job, 0)
|
||||
create_notification(sample_email_template, job, 1)
|
||||
@@ -1514,7 +1515,7 @@ def test_process_incomplete_job_letter(mocker, sample_letter_template):
|
||||
created_at=datetime.utcnow() - timedelta(hours=2),
|
||||
scheduled_for=datetime.utcnow() - timedelta(minutes=31),
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_IN_PROGRESS)
|
||||
job_status=JOB_STATUS_ERROR)
|
||||
|
||||
create_notification(sample_letter_template, job, 0)
|
||||
create_notification(sample_letter_template, job, 1)
|
||||
@@ -1524,3 +1525,29 @@ def test_process_incomplete_job_letter(mocker, sample_letter_template):
|
||||
process_incomplete_job(str(job.id))
|
||||
|
||||
assert mock_letter_saver.call_count == 8
|
||||
|
||||
|
||||
@freeze_time('2017-01-01')
|
||||
def test_process_incomplete_jobs_sets_status_to_in_progress_and_resets_processing_started_time(mocker, sample_template):
|
||||
mock_process_incomplete_job = mocker.patch('app.celery.tasks.process_incomplete_job')
|
||||
|
||||
job1 = create_job(
|
||||
sample_template,
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=30),
|
||||
job_status=JOB_STATUS_ERROR
|
||||
)
|
||||
job2 = create_job(
|
||||
sample_template,
|
||||
processing_started=datetime.utcnow() - timedelta(minutes=31),
|
||||
job_status=JOB_STATUS_ERROR
|
||||
)
|
||||
|
||||
process_incomplete_jobs([str(job1.id), str(job2.id)])
|
||||
|
||||
assert job1.job_status == JOB_STATUS_IN_PROGRESS
|
||||
assert job1.processing_started == datetime.utcnow()
|
||||
|
||||
assert job2.job_status == JOB_STATUS_IN_PROGRESS
|
||||
assert job2.processing_started == datetime.utcnow()
|
||||
|
||||
assert mock_process_incomplete_job.mock_calls == [call(str(job1.id)), call(str(job2.id))]
|
||||
|
||||
Reference in New Issue
Block a user