Files
notifications-api/app/celery/celery.py
Leo Hemsted ad419f7592 restart reporting worker after each task
The reporting worker tasks fetch large amounts of data from the db, do
some processing, then store the results back in the database. As the
reporting worker only processes the create-nightly-billing/stats-table
tasks, which aren't high-performance or high-volume, we're fine with the
performance hit of restarting the worker between every task (which,
based on limited local testing, takes about a second or so).
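The restart itself isn't configured in this file; in Celery it's
typically done with the worker_max_tasks_per_child setting (or the
--max-tasks-per-child CLI flag). A minimal sketch, assuming a
config-based setup - the app name here is illustrative, not from this
repo:

    from celery import Celery

    celery_app = Celery('reporting')  # hypothetical app name
    # recycle each worker process after it has executed a single task
    celery_app.conf.worker_max_tasks_per_child = 1

    # equivalent when starting the worker from the command line:
    #   celery -A reporting worker --max-tasks-per-child=1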

This causes some real funky behaviour with the app_context (used for
accessing current_app.logger). To access Flask's global state we use the
standard import `from flask import current_app`. However, worker
processes after the first one don't have current_app available on
shutdown (they're fine while the task is actually running), and are
unable to call `with current_app.app_context()` to create a new context.
They _are_ able to call `with app.app_context()` to create one, where
`app` is the initial app that we pass in to `NotifyCelery.init_app`.
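A minimal standalone sketch of the distinction (not this repo's code):

    from flask import Flask, current_app

    app = Flask(__name__)  # stands in for the app given to NotifyCelery.init_app

    # With no app context active, the current_app proxy is unbound: both
    # current_app.logger and current_app.app_context() raise
    # "RuntimeError: Working outside of application context". The proxy
    # can't bootstrap the very context it needs in order to resolve.

    # A concrete Flask instance can always push a fresh context:
    with app.app_context():
        current_app.logger.info('the proxy now resolves to `app`')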

NotifyCelery.init_app is only called once, in the master process - I
think the application state is then stored and passed to the celery
workers. But then it looks like the teardown might clear it, and it
never gets set up again for the new workers? Unsure.

To fix this, store a copy of the initial Flask app on the NotifyCelery
object and then use that from within the shutdown signal logging
function (log_on_worker_shutdown below).

Nothing's ever easy ¯\_(ツ)_/¯
2020-04-24 12:28:25 +01:00

71 lines
2.6 KiB
Python

import time

from celery import Celery, Task
from celery.signals import worker_process_shutdown
from flask import g, request
from flask.ctx import has_request_context


@worker_process_shutdown.connect
def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
    # imported here to avoid circular imports
    from app import notify_celery

    # if the worker has already restarted at least once, then we no longer have app context and current_app won't work
    # to create a new one. Instead we have to create a new app context from the original flask app and use that instead.
    with notify_celery._app.app_context():
        notify_celery._app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))
def make_task(app):
    class NotifyTask(Task):
        abstract = True
        start = None

        def on_success(self, retval, task_id, args, kwargs):
            elapsed_time = time.time() - self.start

            app.logger.info(
                "{task_name} took {time}".format(
                    task_name=self.name, time="{0:.4f}".format(elapsed_time)
                )
            )

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            # ensure task will log exceptions to correct handlers
            app.logger.exception('Celery task: {} failed'.format(self.name))
            super().on_failure(exc, task_id, args, kwargs, einfo)

        def __call__(self, *args, **kwargs):
            # ensure task has flask context to access config, logger, etc
            with app.app_context():
                self.start = time.time()
                # Remove 'request_id' from the kwargs (so the task doesn't get an unexpected kwarg), then add it to g
                # so that it gets logged
                g.request_id = kwargs.pop('request_id', None)
                return super().__call__(*args, **kwargs)

        def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                        link=None, link_error=None, **options):
            kwargs = kwargs or {}
            if has_request_context() and hasattr(request, 'request_id'):
                kwargs['request_id'] = request.request_id
            return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)

    return NotifyTask


class NotifyCelery(Celery):

    def init_app(self, app):
        super().__init__(
            app.import_name,
            broker=app.config['BROKER_URL'],
            task_cls=make_task(app),
        )

        self.conf.update(app.config)
        self._app = app
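
For context, the wiring implied by the `from app import notify_celery`
import above presumably looks something like this - a sketch only, since
the real app factory isn't part of this file and the broker URL is a
placeholder:

from flask import Flask

notify_celery = NotifyCelery()

def create_app():
    app = Flask(__name__)
    app.config['BROKER_URL'] = 'redis://localhost:6379/0'  # placeholder broker
    # init_app both configures celery and stashes `app` on notify_celery._app,
    # so the shutdown handler can build an app context after worker restarts
    notify_celery.init_app(app)
    return app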