Files
notifications-api/app/celery/celery.py
Richard Chapman 2d670e8cf0 Updated utils to the latest version. This version of utils has less
logging at info level and as such no longer prints out the celery task
timing which are found to be use to find out if a tasks has been called
but also the timing for the task. Added an extra timing message for
celery tasks so that it can be determined if the these are less frequent
than the API calls and provide more useful information
2018-02-05 14:58:02 +00:00

41 lines
1.1 KiB
Python

import time
from flask import current_app
from celery import Celery, Task
class NotifyTask(Task):
abstract = True
start = None
def on_success(self, retval, task_id, args, kwargs):
elapsed_time = time.time() - self.start
current_app.logger.info(
"{task_name} took {time}".format(
task_name=Task.name, time="{0:.4f}".format(elapsed_time)
)
)
def on_failure(self, exc, task_id, args, kwargs, einfo):
# ensure task will log exceptions to correct handlers
current_app.logger.exception('Celery task failed')
super().on_failure(exc, task_id, args, kwargs, einfo)
def __call__(self, *args, **kwargs):
# ensure task has flask context to access config, logger, etc
with current_app.app_context():
self.start = time.time()
return super().__call__(*args, **kwargs)
class NotifyCelery(Celery):
def init_app(self, app):
super().__init__(
app.import_name,
broker=app.config['BROKER_URL'],
task_cls=NotifyTask,
)
self.conf.update(app.config)