mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-21 07:51:13 -05:00
Celery tasks require an active app context in order to use app logger, database or app configuration values. Since there's no builtin support we create this context by using a custom celery task class NotifyTask. However, NotifyTask was using `current_app`, which itself is only available within an app context so the code was pushing an initial context in run_celery.py. This works with the default prefork pool implementation, but raises a "working outside of application context" error with an eventlet pool since that initial application context is local to a thread and eventlet celery worker pool will create multiple green threads. To avoid this, we bind NotifyTask to the app variable with a closure and use that variable instead of `current_app` to create the context for executing the task. This avoids any issues caused by shared initial app context being lost when spawning additional worker threads. We still need to keep the context push in run_celery.py for prefork workers since it's required for logging events outside of tasks (eg logging during `worker_process_shutdown` signal processing).
43 lines
1.2 KiB
Python
43 lines
1.2 KiB
Python
import time
|
|
|
|
from celery import Celery, Task
|
|
|
|
|
|
def make_task(app):
|
|
class NotifyTask(Task):
|
|
abstract = True
|
|
start = None
|
|
|
|
def on_success(self, retval, task_id, args, kwargs):
|
|
elapsed_time = time.time() - self.start
|
|
app.logger.info(
|
|
"{task_name} took {time}".format(
|
|
task_name=self.name, time="{0:.4f}".format(elapsed_time)
|
|
)
|
|
)
|
|
|
|
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
|
# ensure task will log exceptions to correct handlers
|
|
app.logger.exception('Celery task failed')
|
|
super().on_failure(exc, task_id, args, kwargs, einfo)
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
# ensure task has flask context to access config, logger, etc
|
|
with app.app_context():
|
|
self.start = time.time()
|
|
return super().__call__(*args, **kwargs)
|
|
|
|
return NotifyTask
|
|
|
|
|
|
class NotifyCelery(Celery):
|
|
|
|
def init_app(self, app):
|
|
super().__init__(
|
|
app.import_name,
|
|
broker=app.config['BROKER_URL'],
|
|
task_cls=make_task(app),
|
|
)
|
|
|
|
self.conf.update(app.config)
|