From 599e611278dfacdfcbfb076cf8ff6a3aab54e225 Mon Sep 17 00:00:00 2001
From: Alexey Bezhan
Date: Mon, 12 Feb 2018 15:29:03 +0000
Subject: [PATCH 1/3] Create an app context for each celery task run

Celery tasks require an active app context in order to use the app
logger, the database or app configuration values. Since there's no
built-in support, we create this context using a custom Celery task
class, NotifyTask.

However, NotifyTask was using `current_app`, which is itself only
available within an app context, so the code was pushing an initial
context in run_celery.py. This works with the default prefork pool
implementation, but raises a "working outside of application context"
error with an eventlet pool, since that initial application context is
local to a single thread and the eventlet worker pool spawns multiple
green threads.

To avoid this, we bind NotifyTask to the app variable with a closure
and use that variable instead of `current_app` to create the context
for executing the task. This avoids any issues caused by the shared
initial app context being lost when additional worker threads are
spawned.

We still need to keep the context push in run_celery.py for prefork
workers, since it's required for logging events outside of tasks
(e.g. logging during `worker_process_shutdown` signal processing).
---
 app/celery/celery.py | 42 ++++++++++++++++++++++--------------------
 run_celery.py        |  1 +
 2 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/app/celery/celery.py b/app/celery/celery.py
index 23b318692..5fc30a51c 100644
--- a/app/celery/celery.py
+++ b/app/celery/celery.py
@@ -1,31 +1,33 @@
 import time
 
-from flask import current_app
 from celery import Celery, Task
 
 
-class NotifyTask(Task):
-    abstract = True
-    start = None
+def make_task(app):
+    class NotifyTask(Task):
+        abstract = True
+        start = None
 
-    def on_success(self, retval, task_id, args, kwargs):
-        elapsed_time = time.time() - self.start
-        current_app.logger.info(
-            "{task_name} took {time}".format(
-                task_name=self.name, time="{0:.4f}".format(elapsed_time)
+        def on_success(self, retval, task_id, args, kwargs):
+            elapsed_time = time.time() - self.start
+            app.logger.info(
+                "{task_name} took {time}".format(
+                    task_name=self.name, time="{0:.4f}".format(elapsed_time)
+                )
             )
-        )
 
-    def on_failure(self, exc, task_id, args, kwargs, einfo):
-        # ensure task will log exceptions to correct handlers
-        current_app.logger.exception('Celery task failed')
-        super().on_failure(exc, task_id, args, kwargs, einfo)
+        def on_failure(self, exc, task_id, args, kwargs, einfo):
+            # ensure task will log exceptions to correct handlers
+            app.logger.exception('Celery task failed')
+            super().on_failure(exc, task_id, args, kwargs, einfo)
 
-    def __call__(self, *args, **kwargs):
-        # ensure task has flask context to access config, logger, etc
-        with current_app.app_context():
-            self.start = time.time()
-            return super().__call__(*args, **kwargs)
+        def __call__(self, *args, **kwargs):
+            # ensure task has flask context to access config, logger, etc
+            with app.app_context():
+                self.start = time.time()
+                return super().__call__(*args, **kwargs)
+
+    return NotifyTask
 
 
 class NotifyCelery(Celery):
@@ -34,7 +36,7 @@ class NotifyCelery(Celery):
         super().__init__(
             app.import_name,
             broker=app.config['BROKER_URL'],
-            task_cls=NotifyTask,
+            task_cls=make_task(app),
         )
 
         self.conf.update(app.config)
diff --git a/run_celery.py b/run_celery.py
index 0b847892e..e21533c86 100644
--- a/run_celery.py
+++ b/run_celery.py
@@ -1,4 +1,5 @@
 #!/usr/bin/env python
+
 from flask import Flask
 
 # notify_celery is referenced from manifest_delivery_base.yml, and cannot be removed
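The pattern this patch introduces boils down to the following minimal
sketch. The Flask app, broker URL and example task below are
illustrative placeholders, not the real Notify code:

    from flask import Flask, current_app
    from celery import Celery, Task

    app = Flask(__name__)
    app.config['BROKER_URL'] = 'redis://localhost:6379/0'  # placeholder broker


    def make_task(app):
        class NotifyTask(Task):
            abstract = True

            def __call__(self, *args, **kwargs):
                # every task execution pushes its own app context, so no
                # green thread depends on a context pushed by another thread
                with app.app_context():
                    return super().__call__(*args, **kwargs)

        return NotifyTask


    celery = Celery(app.import_name,
                    broker=app.config['BROKER_URL'],
                    task_cls=make_task(app))


    @celery.task
    def example_task():
        # current_app resolves here because __call__ pushed a context
        current_app.logger.info('running with an app context')

Because each `__call__` pushes and pops its own context, it makes no
difference whether the worker pool runs tasks in forked processes or in
green threads.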
From 9eada233929a7d0d7eb470cda94cb355013900be Mon Sep 17 00:00:00 2001
From: Alexey Bezhan
Date: Mon, 12 Feb 2018 15:44:05 +0000
Subject: [PATCH 2/3] Release DB connection before executing service API callback

Flask-SQLAlchemy sets up a connection pool with 5 connections and will
create up to 10 additional connections if all the pooled ones are in
use. If all connections in the pool and all overflow connections are
in use, SQLAlchemy will block new DB sessions until a connection
becomes available. If a session can't acquire a connection within a
configured time (we set it to 30s), a TimeoutError is raised.

By default db.session is removed along with the related context object
(i.e. when the request is finished or the app context is discarded).
This effectively limits the number of concurrent requests/tasks with
multithreaded gunicorn/celery workers to the maximum DB connection
pool size.

Most of the time these limits are fine, since the API requests are
relatively quick and are mainly interacting with the database anyway.
Service callbacks, however, have to make an HTTP request to a third
party. If these requests start taking a long time and the number of
threads is larger than the number of DB connections, the remaining
threads will block, and will fail if it takes more than 30s to acquire
a connection. For example, if 100 threads start running tasks that
take 20s each, with a maximum of 10 DB connections, the first 10
threads will acquire a connection right away, the next 10 tasks will
block for 20 seconds until the initial connections are released, and
all other tasks will raise a TimeoutError after 30 seconds.

To avoid this, we perform all database operations at the beginning of
the task and then explicitly close the DB session before sending the
HTTP request to the service callback URL. Closing the session ends the
transaction and frees up the connection, making it available for other
tasks. Calls to the DB made after `close` will acquire a new
connection. This means that tasks are still limited to running at most
15 queries at the same time (5 pooled plus 10 overflow connections),
but can have many more concurrent HTTP requests in progress.
---
 app/celery/service_callback_tasks.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/app/celery/service_callback_tasks.py b/app/celery/service_callback_tasks.py
index 52d86f631..4dfe0c953 100644
--- a/app/celery/service_callback_tasks.py
+++ b/app/celery/service_callback_tasks.py
@@ -3,6 +3,7 @@ import json
 from notifications_utils.statsd_decorators import statsd
 
 from app import (
+    db,
     DATETIME_FORMAT,
     notify_celery,
 )
@@ -30,6 +31,9 @@ def send_delivery_status_to_service(self, notification_id):
         # No delivery receipt API info set
         return
 
+    # Release DB connection before performing an external HTTP request
+    db.session.close()
+
     data = {
         "id": str(notification_id),
         "reference": str(notification.client_reference),
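Stripped of application detail, the task now follows a
read-close-request shape. A minimal sketch of that pattern, with a
hypothetical Notification model and illustrative callback parameters
standing in for the real application objects:

    import requests
    from flask_sqlalchemy import SQLAlchemy

    db = SQLAlchemy()  # stand-in for the app's shared db object


    class Notification(db.Model):  # minimal stand-in model
        id = db.Column(db.String, primary_key=True)
        status = db.Column(db.String)


    def send_delivery_status(notification_id, callback_url, bearer_token):
        # 1. Do all database work up front, while holding a pooled connection
        notification = Notification.query.get(notification_id)
        payload = {'id': str(notification_id), 'status': notification.status}

        # 2. Return the connection to the pool; the transaction ends here and
        #    any later query would transparently check out a new connection
        db.session.close()

        # 3. The slow external request no longer ties up a DB connection
        requests.post(
            callback_url,
            json=payload,
            headers={'Authorization': 'Bearer {}'.format(bearer_token)},
            timeout=30,
        )

The ordering is the whole point: every query sits above the `close`
call, and only connectionless work sits below it.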
From 2dfbd93c7ed65028706e04762ea5102c60e5acbf Mon Sep 17 00:00:00 2001
From: Alexey Bezhan
Date: Mon, 12 Feb 2018 16:19:23 +0000
Subject: [PATCH 3/3] Switch service callback workers to use eventlet pool implementation

Service callbacks are I/O bound and can take a long time if the
callback URL is slow to respond. This is a perfect use case for an
eventlet worker pool, since it allows spawning many green threads
(1000 in the proposed configuration) to execute HTTP requests
concurrently, without the significant increase in CPU load and memory
usage that adding more worker processes would require.
---
 manifest-delivery-base.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/manifest-delivery-base.yml b/manifest-delivery-base.yml
index 82bad9b2d..e141c6482 100644
--- a/manifest-delivery-base.yml
+++ b/manifest-delivery-base.yml
@@ -95,6 +95,6 @@ applications:
     NOTIFY_APP_NAME: delivery-worker-receipts
 
 - name: notify-delivery-worker-service-callbacks
-  command: scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 -Q service-callbacks
+  command: scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO -P eventlet -c 1000 -Q service-callbacks
   env:
-    NOTIFY_APP_NAME: delivery-worker-service-callbacks
\ No newline at end of file
+    NOTIFY_APP_NAME: delivery-worker-service-callbacks
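The effect of `-P eventlet -c 1000` can be seen in isolation with a
short demonstration, separate from the patch itself (celery applies
the equivalent monkey patching when started with -P eventlet). The
URLs and request body here are placeholders:

    import eventlet
    eventlet.monkey_patch()  # patch early so socket I/O becomes cooperative

    import requests


    def post_callback(url):
        # each call blocks only its own green thread while waiting on the socket
        return requests.post(url, json={'ping': True}, timeout=30).status_code


    pool = eventlet.GreenPool(1000)  # mirrors -c 1000 from the manifest change
    urls = ['https://example.com/callback'] * 100  # placeholder callback URLs
    for status_code in pool.imap(post_callback, urls):
        print(status_code)

A single process keeps up to 1000 HTTP requests in flight, which is
what makes green threads a better fit here than the 11 prefork
processes the old `--concurrency=11` setting provided.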