Files
notifications-api/app/celery/celery.py

50 lines
1.5 KiB
Python
Raw Normal View History

import time
from celery import Celery, Task
from celery.signals import worker_process_shutdown
from flask import current_app
@worker_process_shutdown.connect
def worker_process_shutdown(sender, signal, pid, exitcode, **kwargs):
current_app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))
def make_task(app):
class NotifyTask(Task):
abstract = True
start = None
def on_success(self, retval, task_id, args, kwargs):
elapsed_time = time.time() - self.start
app.logger.info(
"{task_name} took {time}".format(
task_name=self.name, time="{0:.4f}".format(elapsed_time)
)
)
def on_failure(self, exc, task_id, args, kwargs, einfo):
# ensure task will log exceptions to correct handlers
app.logger.exception('Celery task: {} failed'.format(self.name))
super().on_failure(exc, task_id, args, kwargs, einfo)
def __call__(self, *args, **kwargs):
# ensure task has flask context to access config, logger, etc
with app.app_context():
self.start = time.time()
return super().__call__(*args, **kwargs)
return NotifyTask
2016-02-09 13:31:45 +00:00
class NotifyCelery(Celery):
def init_app(self, app):
super().__init__(
app.import_name,
broker=app.config['BROKER_URL'],
task_cls=make_task(app),
)
2016-02-09 13:31:45 +00:00
self.conf.update(app.config)