merge from main

This commit is contained in:
Kenneth Kehl
2025-01-02 12:36:59 -08:00
4 changed files with 58 additions and 48 deletions

View File

@@ -100,27 +100,29 @@ def check_job_status():
select select
from jobs from jobs
where job_status == 'in progress' where job_status == 'in progress'
and processing started between 30 and 35 minutes ago and processing started some time ago
OR where the job_status == 'pending' OR where the job_status == 'pending'
and the job scheduled_for timestamp is between 30 and 35 minutes ago. and the job scheduled_for timestamp is some time ago.
if any results then if any results then
update the job_status to 'error' update the job_status to 'error'
process the rows in the csv that are missing (in another task) just do the check here. process the rows in the csv that are missing (in another task) just do the check here.
""" """
thirty_minutes_ago = utc_now() - timedelta(minutes=30) START_MINUTES = 245
thirty_five_minutes_ago = utc_now() - timedelta(minutes=35) END_MINUTES = 240
end_minutes_ago = utc_now() - timedelta(minutes=END_MINUTES)
start_minutes_ago = utc_now() - timedelta(minutes=START_MINUTES)
incomplete_in_progress_jobs = Job.query.filter( incomplete_in_progress_jobs = Job.query.filter(
Job.job_status == JobStatus.IN_PROGRESS, Job.job_status == JobStatus.IN_PROGRESS,
between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago), between(Job.processing_started, start_minutes_ago, end_minutes_ago),
) )
incomplete_pending_jobs = Job.query.filter( incomplete_pending_jobs = Job.query.filter(
Job.job_status == JobStatus.PENDING, Job.job_status == JobStatus.PENDING,
Job.scheduled_for.isnot(None), Job.scheduled_for.isnot(None),
between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago), between(Job.scheduled_for, start_minutes_ago, end_minutes_ago),
) )
jobs_not_complete_after_30_minutes = ( jobs_not_complete_after_allotted_time = (
incomplete_in_progress_jobs.union(incomplete_pending_jobs) incomplete_in_progress_jobs.union(incomplete_pending_jobs)
.order_by(Job.processing_started, Job.scheduled_for) .order_by(Job.processing_started, Job.scheduled_for)
.all() .all()
@@ -129,7 +131,7 @@ def check_job_status():
# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
# if they haven't been re-processed in time. # if they haven't been re-processed in time.
job_ids = [] job_ids = []
for job in jobs_not_complete_after_30_minutes: for job in jobs_not_complete_after_allotted_time:
job.job_status = JobStatus.ERROR job.job_status = JobStatus.ERROR
dao_update_job(job) dao_update_job(job)
job_ids.append(str(job.id)) job_ids.append(str(job.id))

View File

@@ -24,7 +24,6 @@ from app.enums import JobStatus, KeyType, NotificationType
from app.errors import TotalRequestsError from app.errors import TotalRequestsError
from app.notifications.process_notifications import ( from app.notifications.process_notifications import (
get_notification, get_notification,
notification_exists,
persist_notification, persist_notification,
) )
from app.notifications.validators import check_service_over_total_message_limit from app.notifications.validators import check_service_over_total_message_limit
@@ -214,9 +213,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
f"service not allowed to send for job_id {notification.get('job', None)}, aborting" f"service not allowed to send for job_id {notification.get('job', None)}, aborting"
) )
) )
current_app.logger.debug( current_app.logger.debug(f"SMS {notification_id} failed as restricted service")
"SMS {} failed as restricted service".format(notification_id)
)
return return
try: try:
@@ -244,11 +241,12 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
reply_to_text=reply_to_text, reply_to_text=reply_to_text,
) )
except IntegrityError: except IntegrityError:
if notification_exists(notification_id): current_app.logger.warning(
saved_notification = get_notification(notification_id) f"{NotificationType.SMS}: {notification_id} already exists."
)
else: # If we don't have the return statement here, we will fall through and end
raise # up retrying because IntegrityError is a subclass of SQLAlchemyError
return
# Kick off sns process in provider_tasks.py # Kick off sns process in provider_tasks.py
sn = saved_notification sn = saved_notification

View File

@@ -54,7 +54,7 @@ def _create_service_invite(invited_user, nonce, state):
data["invited_user_email"] = invited_user.email_address data["invited_user_email"] = invited_user.email_address
invite_redis_key = f"invite-data-{unquote(state)}" invite_redis_key = f"invite-data-{unquote(state)}"
redis_store.set(invite_redis_key, get_user_data_url_safe(data)) redis_store.set(invite_redis_key, get_user_data_url_safe(data), ex=2 * 24 * 60 * 60)
url = os.environ["LOGIN_DOT_GOV_REGISTRATION_URL"] url = os.environ["LOGIN_DOT_GOV_REGISTRATION_URL"]

View File

@@ -23,6 +23,8 @@ from notifications_utils.clients.zendesk.zendesk_client import NotifySupportTick
from tests.app import load_example_csv from tests.app import load_example_csv
from tests.app.db import create_job, create_notification, create_template from tests.app.db import create_job, create_notification, create_template
CHECK_JOB_STATUS_TOO_OLD_MINUTES = 241
def test_should_call_delete_codes_on_delete_verify_codes_task( def test_should_call_delete_codes_on_delete_verify_codes_task(
notify_db_session, mocker notify_db_session, mocker
@@ -108,8 +110,9 @@ def test_check_job_status_task_calls_process_incomplete_jobs(mocker, sample_temp
job = create_job( job = create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(minutes=31), created_at=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
processing_started=utc_now() - timedelta(minutes=31), processing_started=utc_now()
- timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
job_status=JobStatus.IN_PROGRESS, job_status=JobStatus.IN_PROGRESS,
) )
create_notification(template=sample_template, job=job) create_notification(template=sample_template, job=job)
@@ -125,9 +128,10 @@ def test_check_job_status_task_calls_process_incomplete_jobs_when_scheduled_job_
job = create_job( job = create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(hours=2), created_at=utc_now() - timedelta(hours=5),
scheduled_for=utc_now() - timedelta(minutes=31), scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
processing_started=utc_now() - timedelta(minutes=31), processing_started=utc_now()
- timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
job_status=JobStatus.IN_PROGRESS, job_status=JobStatus.IN_PROGRESS,
) )
check_job_status() check_job_status()
@@ -142,8 +146,8 @@ def test_check_job_status_task_calls_process_incomplete_jobs_for_pending_schedul
job = create_job( job = create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(hours=2), created_at=utc_now() - timedelta(hours=5),
scheduled_for=utc_now() - timedelta(minutes=31), scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
job_status=JobStatus.PENDING, job_status=JobStatus.PENDING,
) )
@@ -175,17 +179,19 @@ def test_check_job_status_task_calls_process_incomplete_jobs_for_multiple_jobs(
job = create_job( job = create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(hours=2), created_at=utc_now() - timedelta(hours=5),
scheduled_for=utc_now() - timedelta(minutes=31), scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
processing_started=utc_now() - timedelta(minutes=31), processing_started=utc_now()
- timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
job_status=JobStatus.IN_PROGRESS, job_status=JobStatus.IN_PROGRESS,
) )
job_2 = create_job( job_2 = create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(hours=2), created_at=utc_now() - timedelta(hours=5),
scheduled_for=utc_now() - timedelta(minutes=31), scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
processing_started=utc_now() - timedelta(minutes=31), processing_started=utc_now()
- timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
job_status=JobStatus.IN_PROGRESS, job_status=JobStatus.IN_PROGRESS,
) )
check_job_status() check_job_status()
@@ -200,23 +206,24 @@ def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template):
job = create_job( job = create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(hours=2), created_at=utc_now() - timedelta(hours=5),
scheduled_for=utc_now() - timedelta(minutes=31), scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
processing_started=utc_now() - timedelta(minutes=31), processing_started=utc_now()
- timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
job_status=JobStatus.IN_PROGRESS, job_status=JobStatus.IN_PROGRESS,
) )
create_job( create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(minutes=31), created_at=utc_now() - timedelta(minutes=300),
processing_started=utc_now() - timedelta(minutes=29), processing_started=utc_now() - timedelta(minutes=239),
job_status=JobStatus.IN_PROGRESS, job_status=JobStatus.IN_PROGRESS,
) )
create_job( create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(minutes=50), created_at=utc_now() - timedelta(minutes=300),
scheduled_for=utc_now() - timedelta(minutes=29), scheduled_for=utc_now() - timedelta(minutes=239),
job_status=JobStatus.PENDING, job_status=JobStatus.PENDING,
) )
check_job_status() check_job_status()
@@ -230,16 +237,17 @@ def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template):
job = create_job( job = create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(hours=2), created_at=utc_now() - timedelta(hours=5),
scheduled_for=utc_now() - timedelta(minutes=31), scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
processing_started=utc_now() - timedelta(minutes=31), processing_started=utc_now()
- timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
job_status=JobStatus.IN_PROGRESS, job_status=JobStatus.IN_PROGRESS,
) )
job_2 = create_job( job_2 = create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(minutes=31), created_at=utc_now() - timedelta(minutes=300),
processing_started=utc_now() - timedelta(minutes=29), processing_started=utc_now() - timedelta(minutes=239),
job_status=JobStatus.IN_PROGRESS, job_status=JobStatus.IN_PROGRESS,
) )
check_job_status() check_job_status()
@@ -311,16 +319,18 @@ def test_check_job_status_task_does_not_raise_error(sample_template):
create_job( create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(hours=2), created_at=utc_now() - timedelta(hours=5),
scheduled_for=utc_now() - timedelta(minutes=31), scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
processing_started=utc_now() - timedelta(minutes=31), processing_started=utc_now()
- timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
job_status=JobStatus.FINISHED, job_status=JobStatus.FINISHED,
) )
create_job( create_job(
template=sample_template, template=sample_template,
notification_count=3, notification_count=3,
created_at=utc_now() - timedelta(minutes=31), created_at=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
processing_started=utc_now() - timedelta(minutes=31), processing_started=utc_now()
- timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES),
job_status=JobStatus.FINISHED, job_status=JobStatus.FINISHED,
) )