mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-24 09:21:39 -05:00
The statsd code was added to the utils library a while ago, uses the statsd from the util library and therefore consolidates the code into once place.
74 lines
2.9 KiB
Python
74 lines
2.9 KiB
Python
from celery.signals import worker_process_shutdown
|
|
from notifications_utils.statsd_decorators import statsd
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from app import notify_celery
|
|
from flask import current_app
|
|
|
|
from app.dao.statistics_dao import (
|
|
create_or_update_job_sending_statistics,
|
|
update_job_stats_outcome_count
|
|
)
|
|
from app.dao.notifications_dao import get_notification_by_id
|
|
from app.models import NOTIFICATION_STATUS_TYPES_COMPLETED
|
|
from app.config import QueueNames
|
|
|
|
|
|
def create_initial_notification_statistic_tasks(notification):
|
|
if notification.job_id and notification.status:
|
|
record_initial_job_statistics.apply_async((str(notification.id),), queue=QueueNames.STATISTICS)
|
|
|
|
|
|
def create_outcome_notification_statistic_tasks(notification):
|
|
if notification.job_id and notification.status in NOTIFICATION_STATUS_TYPES_COMPLETED:
|
|
record_outcome_job_statistics.apply_async((str(notification.id),), queue=QueueNames.STATISTICS)
|
|
|
|
|
|
@worker_process_shutdown.connect
|
|
def worker_process_shutdown(sender, signal, pid, exitcode):
|
|
current_app.logger.info('Statistics worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))
|
|
|
|
|
|
@notify_celery.task(bind=True, name='record_initial_job_statistics', max_retries=20, default_retry_delay=10)
|
|
@statsd(namespace="tasks")
|
|
def record_initial_job_statistics(self, notification_id):
|
|
notification = None
|
|
try:
|
|
notification = get_notification_by_id(notification_id)
|
|
if notification:
|
|
create_or_update_job_sending_statistics(notification)
|
|
else:
|
|
raise SQLAlchemyError("Failed to find notification with id {}".format(notification_id))
|
|
except SQLAlchemyError as e:
|
|
current_app.logger.exception(e)
|
|
self.retry(queue=QueueNames.RETRY)
|
|
except self.MaxRetriesExceededError:
|
|
current_app.logger.error(
|
|
"RETRY FAILED: task record_initial_job_statistics failed for notification {}".format(
|
|
notification.id if notification else "missing ID"
|
|
)
|
|
)
|
|
|
|
|
|
@notify_celery.task(bind=True, name='record_outcome_job_statistics', max_retries=20, default_retry_delay=10)
|
|
@statsd(namespace="tasks")
|
|
def record_outcome_job_statistics(self, notification_id):
|
|
notification = None
|
|
try:
|
|
notification = get_notification_by_id(notification_id)
|
|
if notification:
|
|
updated_count = update_job_stats_outcome_count(notification)
|
|
if updated_count == 0:
|
|
self.retry(queue=QueueNames.RETRY)
|
|
else:
|
|
raise SQLAlchemyError("Failed to find notification with id {}".format(notification_id))
|
|
except SQLAlchemyError as e:
|
|
current_app.logger.exception(e)
|
|
self.retry(queue=QueueNames.RETRY)
|
|
except self.MaxRetriesExceededError:
|
|
current_app.logger.error(
|
|
"RETRY FAILED: task update_job_stats_outcome_count failed for notification {}".format(
|
|
notification.id if notification else "missing ID"
|
|
)
|
|
)
|