mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-22 00:11:16 -05:00
The JobStatistics table is going to be deleted. There are currently 3 tasks which use the JobStatistics model via the Statistics DAO, so we need to make sure that these tasks aren't being used before they are deleted in a separate PR.

This commit deletes:
* The `create_initial_notification_statistic_tasks` function, which gets used to call the `record_initial_job_statistics` task.
* The `create_outcome_notification_statistic_tasks` function, which gets used to call the `record_outcome_job_statistics` task.
* The scheduling of the `timeout-job-statistics` scheduled task.
63 lines
2.4 KiB
Python
63 lines
2.4 KiB
Python
from celery.signals import worker_process_shutdown
|
|
from notifications_utils.statsd_decorators import statsd
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from app import notify_celery
|
|
from flask import current_app
|
|
|
|
from app.dao.statistics_dao import (
|
|
create_or_update_job_sending_statistics,
|
|
update_job_stats_outcome_count
|
|
)
|
|
from app.dao.notifications_dao import get_notification_by_id
|
|
from app.config import QueueNames
|
|
|
|
|
|
@worker_process_shutdown.connect
def worker_process_shutdown(sender, signal, pid, exitcode, **kwargs):
    """Log the PID and exit code of a worker process that is shutting down.

    Registered as a receiver of the ``worker_process_shutdown`` signal
    imported from ``celery.signals``.

    NOTE: this function deliberately keeps the same name as the imported
    signal object. The decorator expression is evaluated before the ``def``
    rebinds the name, so registration works, but after this definition the
    module-level name no longer refers to the imported signal.

    :param sender: signal sender (unused).
    :param signal: the signal object being dispatched (unused).
    :param pid: process id of the worker process being shut down.
    :param exitcode: exit code reported for the worker process.
    :param kwargs: any additional keyword arguments supplied by the signal
        dispatcher; accepted and ignored so that extra arguments do not
        break this receiver.
    """
    current_app.logger.info('Statistics worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))
|
|
|
|
|
|
@notify_celery.task(bind=True, name='record_initial_job_statistics', max_retries=20, default_retry_delay=10)
@statsd(namespace="tasks")
def record_initial_job_statistics(self, notification_id):
    """Record the initial job statistics for a notification.

    Looks the notification up by id and passes it to
    ``create_or_update_job_sending_statistics``. If the notification cannot
    be found, or a ``SQLAlchemyError`` is raised, the error is logged and the
    task is retried on the ``QueueNames.RETRY`` queue. Once the retries are
    exhausted a "RETRY FAILED" error is logged and the task ends.

    :param self: the bound task instance (``bind=True``).
    :param notification_id: id of the notification to record statistics for.
    """
    notification = None
    try:
        try:
            notification = get_notification_by_id(notification_id)
            if notification:
                create_or_update_job_sending_statistics(notification)
            else:
                # A missing notification is reported as a SQLAlchemyError so
                # that it goes through the same log-and-retry path below.
                raise SQLAlchemyError("Failed to find notification with id {}".format(notification_id))
        except SQLAlchemyError as e:
            current_app.logger.exception(e)
            self.retry(queue=QueueNames.RETRY)
    except self.MaxRetriesExceededError:
        # This handler has to wrap the whole inner try statement: an exception
        # raised by self.retry() inside the ``except SQLAlchemyError`` handler
        # is not seen by sibling ``except`` clauses of that same statement, so
        # as a sibling clause this branch could never run.
        current_app.logger.error(
            "RETRY FAILED: task record_initial_job_statistics failed for notification {}".format(
                notification.id if notification else "missing ID"
            )
        )
|
|
|
|
|
|
@notify_celery.task(bind=True, name='record_outcome_job_statistics', max_retries=20, default_retry_delay=10)
@statsd(namespace="tasks")
def record_outcome_job_statistics(self, notification_id):
    """Record the outcome job statistics for a notification.

    Looks the notification up by id and passes it to
    ``update_job_stats_outcome_count``. The task is retried on the
    ``QueueNames.RETRY`` queue when that call returns 0, when the
    notification cannot be found, or when a ``SQLAlchemyError`` is raised.
    Once the retries are exhausted a "RETRY FAILED" error is logged and the
    task ends.

    :param self: the bound task instance (``bind=True``).
    :param notification_id: id of the notification whose outcome is recorded.
    """
    notification = None
    try:
        try:
            notification = get_notification_by_id(notification_id)
            if notification:
                updated_count = update_job_stats_outcome_count(notification)
                if updated_count == 0:
                    # Nothing was updated; presumably the statistics row does
                    # not exist yet, so try again later - TODO confirm.
                    self.retry(queue=QueueNames.RETRY)
            else:
                # A missing notification is reported as a SQLAlchemyError so
                # that it goes through the same log-and-retry path below.
                raise SQLAlchemyError("Failed to find notification with id {}".format(notification_id))
        except SQLAlchemyError as e:
            current_app.logger.exception(e)
            self.retry(queue=QueueNames.RETRY)
    except self.MaxRetriesExceededError:
        # This handler wraps the whole inner try statement so that it catches
        # exhausted retries from both self.retry() calls. As a sibling clause
        # of ``except SQLAlchemyError`` it would miss the one raised inside
        # that handler.
        # NOTE(review): the message names update_job_stats_outcome_count
        # rather than this task's registered name; left unchanged in case log
        # searches or alerts depend on the existing text.
        current_app.logger.error(
            "RETRY FAILED: task update_job_stats_outcome_count failed for notification {}".format(
                notification.id if notification else "missing ID"
            )
        )
|