cleanup redis commands and flow

This commit is contained in:
Kenneth Kehl
2025-01-10 11:21:39 -08:00
parent bbf5bace20
commit 64a61f5d36
5 changed files with 77 additions and 18 deletions

View File

@@ -36,7 +36,7 @@ from app.dao.services_dao import (
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.enums import JobStatus, NotificationType
from app.models import Job
from app.models import Job, Notification
from app.notifications.process_notifications import send_notification_to_queue
from app.utils import utc_now
from notifications_utils import aware_utcnow
@@ -292,13 +292,39 @@ def cleanup_delivery_receipts(self):
@notify_celery.task(bind=True, name="batch-insert-notifications")
def batch_insert_notifications(self):
current_app.logger.info("ENTER SCHEDULED TASK")
batch = []
with redis_store.pipeline:
notification = redis_store.lpop("notification_queue")
batch.append(json.loads(notification))
# with redis_store.pipeline():
# while redis_store.llen("message_queue") > 0:
# redis_store.lpop("message_queue")
# current_app.logger.info("EMPTY!")
# return
with redis_store.pipeline():
current_app.logger.info("PIPELINE")
# since this list is always growing, just grab what is available when
# this call is made and process that.
current_len = redis_store.llen("message_queue")
count = 0
while count < current_len:
count = count + 1
notification_bytes = redis_store.lpop("message_queue")
notification_dict = json.loads(notification_bytes.decode("utf-8"))
notification_dict["status"] = notification_dict.pop("notification_status")
notification_dict["created_at"] = utc_now()
notification = Notification(**notification_dict)
current_app.logger.info(
f"WHAT IS THIS NOTIFICATION {type(notification)} {notification}"
)
if notification is not None:
current_app.logger.info(
f"SCHEDULED adding notification {notification.id} to batch"
)
batch.append(notification)
try:
current_app.logger.info("GOING TO DO BATCH INSERT")
dao_batch_insert_notifications(batch)
except Exception as e:
current_app.logger.exception(f"Notification batch insert failed {e}")
for msg in batch:
redis_store.rpush("notification_queue", json.dumps(msg))
current_app.logger.exception(f"Notification batch insert failed {e}")