diff --git a/app/celery/celery.py b/app/celery/celery.py index 506de84d8..28ff6fb5f 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -2,13 +2,20 @@ import time from celery import Celery, Task from celery.signals import worker_process_shutdown -from flask import current_app, g, request +from flask import g, request from flask.ctx import has_request_context @worker_process_shutdown.connect -def worker_process_shutdown(sender, signal, pid, exitcode, **kwargs): - current_app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)) +def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs): + # imported here to avoid circular imports + from app import notify_celery + + # if the worker has already restarted at least once, then we no longer have app context and current_app won't work + # to create a new one. Instead we have to create a new app context from the original flask app and use that instead. + with notify_celery._app.app_context(): + # if the worker has restarted + notify_celery._app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)) def make_task(app): @@ -60,3 +67,4 @@ class NotifyCelery(Celery): ) self.conf.update(app.config) + self._app = app diff --git a/app/config.py b/app/config.py index fa0b6cd6e..3e38d8521 100644 --- a/app/config.py +++ b/app/config.py @@ -174,6 +174,8 @@ class Config(object): CELERY_TIMEZONE = 'Europe/London' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' + # on reporting worker, restart workers after each task is executed to help prevent memory leaks + CELERYD_MAX_TASKS_PER_CHILD = os.getenv('CELERYD_MAX_TASKS_PER_CHILD') CELERY_IMPORTS = ( 'app.celery.tasks', 'app.celery.scheduled_tasks', diff --git a/manifest.yml.j2 b/manifest.yml.j2 index da9f67be7..918b4b50e 100644 --- a/manifest.yml.j2 +++ b/manifest.yml.j2 @@ -30,7 +30,7 @@ 'notify-delivery-worker-research': {}, 'notify-delivery-worker-sender': {'disk_quota': '2G', 'memory': '3G'}, 'notify-delivery-worker-periodic': {}, - 'notify-delivery-worker-reporting': {}, + 'notify-delivery-worker-reporting': {'additional_env_vars': {'CELERYD_MAX_TASKS_PER_CHILD': 1}}, 'notify-delivery-worker-priority': {}, 'notify-delivery-worker-letters': {}, 'notify-delivery-worker-retry-tasks': {}, @@ -112,3 +112,7 @@ applications: TEMPLATE_PREVIEW_API_HOST: '{{ TEMPLATE_PREVIEW_API_HOST }}' TEMPLATE_PREVIEW_API_KEY: '{{ TEMPLATE_PREVIEW_API_KEY }}' + + {% for key, value in app.get('additional_env_vars', {}).items() %} + {{key}}: '{{value}}' + {% endfor %}