From ad419f7592524292f8da8447814cc0bbc37f240e Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 24 Apr 2020 11:21:41 +0100 Subject: [PATCH] restart reporting worker after each task MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The reporting worker tasks fetch large amounts of data from the db, do some processing then store back in the database. As the reporting worker only processes the create nightly billing/stats table tasks, which aren't high performance or high volume, we're fine with the performance hit from restarting the worker between every task (which based on limited local testing takes about a second or so). This causes some real funky shit with the app_context (used for accessing current_app.logger). To access flask's global state we use the standard way of importing `from flask import current_app`. However, processes after the first one don't have the current_app available on shut down (they're fine during the actual task running), and are unable to call `with current_app.app_context()` to create it. They _are_ able to call `with app.app_context()` to create it, where `app` is the initial app that we pass in to `NotifyCelery.init_app`. NotifyCelery.init_app is only called once, in the master process - I think the application state is then stored and passed to the celery workers. But then it looks like the teardown might clear it, but it never gets set up again for the new workers? Unsure. To fix this, store a copy of the initial flask app on the NotifyCelery object and then use that from within the shutdown signal logging function. 
Nothing's ever easy ¯\_(ツ)_/¯ --- app/celery/celery.py | 14 +++++++++++--- app/config.py | 2 ++ manifest.yml.j2 | 6 +++++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/app/celery/celery.py b/app/celery/celery.py index 506de84d8..28ff6fb5f 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -2,13 +2,20 @@ import time from celery import Celery, Task from celery.signals import worker_process_shutdown -from flask import current_app, g, request +from flask import g, request from flask.ctx import has_request_context @worker_process_shutdown.connect -def worker_process_shutdown(sender, signal, pid, exitcode, **kwargs): - current_app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)) +def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs): + # imported here to avoid circular imports + from app import notify_celery + + # if the worker has already restarted at least once, then we no longer have app context and current_app won't work + # to create a new one. Instead we have to create a new app context from the original flask app and use that instead. 
+    with notify_celery._app.app_context(): +        # log via the stored app rather than current_app, which is unavailable here after a restart +        notify_celery._app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)) def make_task(app): @@ -60,3 +67,4 @@ class NotifyCelery(Celery): ) self.conf.update(app.config) + self._app = app diff --git a/app/config.py b/app/config.py index fa0b6cd6e..3e38d8521 100644 --- a/app/config.py +++ b/app/config.py @@ -174,6 +174,8 @@ class Config(object): CELERY_TIMEZONE = 'Europe/London' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' + # on reporting worker, restart workers after each task is executed to help prevent memory leaks + CELERYD_MAX_TASKS_PER_CHILD = os.getenv('CELERYD_MAX_TASKS_PER_CHILD') CELERY_IMPORTS = ( 'app.celery.tasks', 'app.celery.scheduled_tasks', diff --git a/manifest.yml.j2 b/manifest.yml.j2 index da9f67be7..918b4b50e 100644 --- a/manifest.yml.j2 +++ b/manifest.yml.j2 @@ -30,7 +30,7 @@ 'notify-delivery-worker-research': {}, 'notify-delivery-worker-sender': {'disk_quota': '2G', 'memory': '3G'}, 'notify-delivery-worker-periodic': {}, - 'notify-delivery-worker-reporting': {}, + 'notify-delivery-worker-reporting': {'additional_env_vars': {'CELERYD_MAX_TASKS_PER_CHILD': 1}}, 'notify-delivery-worker-priority': {}, 'notify-delivery-worker-letters': {}, 'notify-delivery-worker-retry-tasks': {}, @@ -112,3 +112,7 @@ applications: TEMPLATE_PREVIEW_API_HOST: '{{ TEMPLATE_PREVIEW_API_HOST }}' TEMPLATE_PREVIEW_API_KEY: '{{ TEMPLATE_PREVIEW_API_KEY }}' + + {% for key, value in app.get('additional_env_vars', {}).items() %} + {{key}}: '{{value}}' + {% endfor %}