restart reporting worker after each task

The reporting worker tasks fetch large amounts of data from the db, do
some processing, then store the results back in the database. As the
reporting worker only processes the create-nightly-billing/stats-table
tasks, which are neither performance-critical nor high volume, we're
fine with the cost of restarting the worker between every task (which,
based on limited local testing, takes about a second) in exchange for
not letting leaked memory accumulate across tasks.
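
For reference, the knob behind this is celery's max-tasks-per-child
setting. A minimal sketch (new-style setting name; hard-coded to 1 here
for illustration - the real config reads it from an environment
variable, per the diff below):

    from celery import Celery

    celery_app = Celery('reporting')
    # recycle each pool process after it has run exactly one task
    celery_app.conf.worker_max_tasks_per_child = 1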

This causes some real funky shit with the app context (used for
accessing current_app.logger). To access flask's global state we import
it the standard way: `from flask import current_app`. However, worker
processes after the first one no longer have an app context at shutdown
(they're fine while the task itself is running), and can't call
`with current_app.app_context()` to create one, because the
`current_app` proxy is unbound outside a context. They _are_ able to
call `with app.app_context()` to create it, where `app` is the initial
app that we pass in to `NotifyCelery.init_app`.
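
The distinction in miniature (a standalone sketch of flask's context
machinery, not code from this repo):

    from flask import Flask, current_app

    app = Flask(__name__)

    # outside any app context, current_app is an unbound proxy
    try:
        current_app.logger.info('hello')
    except RuntimeError:
        pass  # "Working outside of application context"

    # pushing a context from the real app object always works
    with app.app_context():
        current_app.logger.info('hello')  # the proxy now resolves to app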

NotifyCelery.init_app is only called once, in the master process - I
think the application state is then stored and inherited by the forked
celery worker processes. It then looks like teardown clears that state,
and it never gets set up again for the replacement workers? Unsure.

To fix this, store a reference to the initial flask app on the
NotifyCelery object (`self._app`) and then use that from within the
shutdown-signal logging function.
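
The diff below is the real change; boiled down, the pattern is (names
as in this repo, trimmed to the relevant bits):

    from celery import Celery
    from celery.signals import worker_process_shutdown


    class NotifyCelery(Celery):
        def init_app(self, app):
            self.conf.update(app.config)
            self._app = app  # keep a handle on the original flask app


    @worker_process_shutdown.connect
    def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
        from app import notify_celery  # imported here to avoid circular imports

        # build a context from the stored app instead of relying on current_app
        with notify_celery._app.app_context():
            notify_celery._app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))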

Nothing's ever easy ¯\_(ツ)_/¯
Author: Leo Hemsted
Date:   2020-04-24 11:21:41 +01:00
Commit: ad419f7592 (parent: d88b20beec)

3 changed files with 18 additions and 4 deletions

@@ -2,13 +2,20 @@ import time
 
 from celery import Celery, Task
 from celery.signals import worker_process_shutdown
-from flask import current_app, g, request
+from flask import g, request
 from flask.ctx import has_request_context
 
 
 @worker_process_shutdown.connect
-def worker_process_shutdown(sender, signal, pid, exitcode, **kwargs):
-    current_app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))
+def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
+    # imported here to avoid circular imports
+    from app import notify_celery
+
+    # if the worker has already restarted at least once, then we no longer have app context and current_app won't work
+    # to create a new one. Instead we have to create a new app context from the original flask app and use that instead.
+    with notify_celery._app.app_context():
+        notify_celery._app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))
 
 
 def make_task(app):
@@ -60,3 +67,4 @@ class NotifyCelery(Celery):
         )
 
         self.conf.update(app.config)
+        self._app = app

@@ -174,6 +174,8 @@ class Config(object):
     CELERY_TIMEZONE = 'Europe/London'
     CELERY_ACCEPT_CONTENT = ['json']
     CELERY_TASK_SERIALIZER = 'json'
+    # on reporting worker, restart workers after each task is executed to help prevent memory leaks
+    CELERYD_MAX_TASKS_PER_CHILD = os.getenv('CELERYD_MAX_TASKS_PER_CHILD')
     CELERY_IMPORTS = (
         'app.celery.tasks',
         'app.celery.scheduled_tasks',
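
A quick way to watch the recycling happen locally (a hypothetical
snippet, not part of this commit - the broker URL and task are made
up): log the worker pid from a task; with max tasks per child set to 1,
consecutive calls should report different pids as the pool replaces
each process.

    import os

    from celery import Celery

    celery_app = Celery('reporting', broker='redis://localhost:6379/0')  # hypothetical broker
    celery_app.conf.worker_max_tasks_per_child = 1


    @celery_app.task
    def which_pid():
        # each call should come back from a different pid, because the
        # pool replaces the worker process after every task
        return os.getpid()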