mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-21 07:51:13 -05:00
The Notify team needs to investigate when a notification is marked as failed. We will process the whole file and mark the notifications with the appropriate status, if any are failed an exception is raised. The exception will trigger a cloud watch error for the team to investigate.
43 lines
1.2 KiB
Python
43 lines
1.2 KiB
Python
import time
|
|
|
|
from celery import Celery, Task
|
|
|
|
|
|
def make_task(app):
|
|
class NotifyTask(Task):
|
|
abstract = True
|
|
start = None
|
|
|
|
def on_success(self, retval, task_id, args, kwargs):
|
|
elapsed_time = time.time() - self.start
|
|
app.logger.info(
|
|
"{task_name} took {time}".format(
|
|
task_name=self.name, time="{0:.4f}".format(elapsed_time)
|
|
)
|
|
)
|
|
|
|
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
|
# ensure task will log exceptions to correct handlers
|
|
app.logger.exception('Celery task: {} failed'.format(self.name))
|
|
super().on_failure(exc, task_id, args, kwargs, einfo)
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
# ensure task has flask context to access config, logger, etc
|
|
with app.app_context():
|
|
self.start = time.time()
|
|
return super().__call__(*args, **kwargs)
|
|
|
|
return NotifyTask
|
|
|
|
|
|
class NotifyCelery(Celery):
|
|
|
|
def init_app(self, app):
|
|
super().__init__(
|
|
app.import_name,
|
|
broker=app.config['BROKER_URL'],
|
|
task_cls=make_task(app),
|
|
)
|
|
|
|
self.conf.update(app.config)
|