Merge pull request #1502 from GSA/API-1466_Fix_database_inserts

API-1466 - Fixing database IntegrityError failures
This commit is contained in:
Kenneth Kehl
2024-12-26 08:08:12 -08:00
committed by GitHub
4 changed files with 42 additions and 34 deletions

View File

@@ -174,9 +174,7 @@ def check_for_missing_rows_in_completed_jobs():
for row_to_process in missing_rows: for row_to_process in missing_rows:
row = recipient_csv[row_to_process.missing_row] row = recipient_csv[row_to_process.missing_row]
current_app.logger.info( current_app.logger.info(
"Processing missing row: {} for job: {}".format( f"Processing missing row: {row_to_process.missing_row} for job: {job.id}"
row_to_process.missing_row, job.id
)
) )
process_row(row, template, job, job.service, sender_id=sender_id) process_row(row, template, job, job.service, sender_id=sender_id)

View File

@@ -24,6 +24,7 @@ from app.enums import JobStatus, KeyType, NotificationType
from app.errors import TotalRequestsError from app.errors import TotalRequestsError
from app.notifications.process_notifications import ( from app.notifications.process_notifications import (
get_notification, get_notification,
notification_exists,
persist_notification, persist_notification,
) )
from app.notifications.validators import check_service_over_total_message_limit from app.notifications.validators import check_service_over_total_message_limit
@@ -39,9 +40,7 @@ def process_job(job_id, sender_id=None):
start = utc_now() start = utc_now()
job = dao_get_job_by_id(job_id) job = dao_get_job_by_id(job_id)
current_app.logger.info( current_app.logger.info(
"Starting process-job task for job id {} with status: {}".format( f"Starting process-job task for job id {job_id} with status: {job.job_status}"
job_id, job.job_status
)
) )
if job.job_status != JobStatus.PENDING: if job.job_status != JobStatus.PENDING:
@@ -57,7 +56,7 @@ def process_job(job_id, sender_id=None):
job.job_status = JobStatus.CANCELLED job.job_status = JobStatus.CANCELLED
dao_update_job(job) dao_update_job(job)
current_app.logger.warning( current_app.logger.warning(
"Job {} has been cancelled, service {} is inactive".format( f"Job {job_id} has been cancelled, service {service.id} is inactive".format(
job_id, service.id job_id, service.id
) )
) )
@@ -71,9 +70,7 @@ def process_job(job_id, sender_id=None):
) )
current_app.logger.info( current_app.logger.info(
"Starting job {} processing {} notifications".format( f"Starting job {job_id} processing {job.notification_count} notifications"
job_id, job.notification_count
)
) )
# notify-api-1495 we are going to sleep periodically to give other # notify-api-1495 we are going to sleep periodically to give other
@@ -229,22 +226,29 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
job = dao_get_job_by_id(job_id) job = dao_get_job_by_id(job_id)
created_by_id = job.created_by_id created_by_id = job.created_by_id
saved_notification = persist_notification( try:
template_id=notification["template"], saved_notification = persist_notification(
template_version=notification["template_version"], template_id=notification["template"],
recipient=notification["to"], template_version=notification["template_version"],
service=service, recipient=notification["to"],
personalisation=notification.get("personalisation"), service=service,
notification_type=NotificationType.SMS, personalisation=notification.get("personalisation"),
api_key_id=None, notification_type=NotificationType.SMS,
key_type=KeyType.NORMAL, api_key_id=None,
created_at=utc_now(), key_type=KeyType.NORMAL,
created_by_id=created_by_id, created_at=utc_now(),
job_id=notification.get("job", None), created_by_id=created_by_id,
job_row_number=notification.get("row_number", None), job_id=notification.get("job", None),
notification_id=notification_id, job_row_number=notification.get("row_number", None),
reply_to_text=reply_to_text, notification_id=notification_id,
) reply_to_text=reply_to_text,
)
except IntegrityError:
if notification_exists(notification_id):
saved_notification = get_notification(notification_id)
else:
raise
# Kick off sns process in provider_tasks.py # Kick off sns process in provider_tasks.py
sn = saved_notification sn = saved_notification
@@ -258,11 +262,8 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
) )
current_app.logger.debug( current_app.logger.debug(
"SMS {} created at {} for job {}".format( f"SMS {saved_notification.id} created at {saved_notification.created_at} "
saved_notification.id, f"for job {notification.get('job', None)}"
saved_notification.created_at,
notification.get("job", None),
)
) )
except SQLAlchemyError as e: except SQLAlchemyError as e:

View File

@@ -65,6 +65,12 @@ def dao_get_last_date_template_was_used(template_id, service_id):
return last_date return last_date
def dao_notification_exists(notification_id) -> bool:
stmt = select(Notification).where(Notification.id == notification_id)
result = db.session.execute(stmt).scalar()
return result is not None
@autocommit @autocommit
def dao_create_notification(notification): def dao_create_notification(notification):
if not notification.id: if not notification.id:
@@ -86,9 +92,7 @@ def dao_create_notification(notification):
notification.normalised_to = "1" notification.normalised_to = "1"
# notify-api-1454 insert only if it doesn't exist # notify-api-1454 insert only if it doesn't exist
stmt = select(Notification).where(Notification.id == notification.id) if not dao_notification_exists(notification.id):
result = db.session.execute(stmt).scalar()
if result is None:
db.session.add(notification) db.session.add(notification)

View File

@@ -8,6 +8,7 @@ from app.config import QueueNames
from app.dao.notifications_dao import ( from app.dao.notifications_dao import (
dao_create_notification, dao_create_notification,
dao_delete_notifications_by_id, dao_delete_notifications_by_id,
dao_notification_exists,
get_notification_by_id, get_notification_by_id,
) )
from app.enums import KeyType, NotificationStatus, NotificationType from app.enums import KeyType, NotificationStatus, NotificationType
@@ -153,6 +154,10 @@ def persist_notification(
return notification return notification
def notification_exists(notification_id):
return dao_notification_exists(notification_id)
def send_notification_to_queue_detached( def send_notification_to_queue_detached(
key_type, notification_type, notification_id, queue=None key_type, notification_type, notification_id, queue=None
): ):