Merge pull request #1651 from alphagov/eventlet-celery-workers-service-callbacks

Switch service callback workers to use eventlet pool implementation
This commit is contained in:
Alexey Bezhan
2018-02-15 11:08:54 +00:00
committed by GitHub
4 changed files with 29 additions and 22 deletions

View File

@@ -1,31 +1,33 @@
import time
from flask import current_app
from celery import Celery, Task
class NotifyTask(Task):
abstract = True
start = None
def make_task(app):
class NotifyTask(Task):
abstract = True
start = None
def on_success(self, retval, task_id, args, kwargs):
elapsed_time = time.time() - self.start
current_app.logger.info(
"{task_name} took {time}".format(
task_name=self.name, time="{0:.4f}".format(elapsed_time)
def on_success(self, retval, task_id, args, kwargs):
elapsed_time = time.time() - self.start
app.logger.info(
"{task_name} took {time}".format(
task_name=self.name, time="{0:.4f}".format(elapsed_time)
)
)
)
def on_failure(self, exc, task_id, args, kwargs, einfo):
# ensure task will log exceptions to correct handlers
current_app.logger.exception('Celery task failed')
super().on_failure(exc, task_id, args, kwargs, einfo)
def on_failure(self, exc, task_id, args, kwargs, einfo):
# ensure task will log exceptions to correct handlers
app.logger.exception('Celery task failed')
super().on_failure(exc, task_id, args, kwargs, einfo)
def __call__(self, *args, **kwargs):
# ensure task has flask context to access config, logger, etc
with current_app.app_context():
self.start = time.time()
return super().__call__(*args, **kwargs)
def __call__(self, *args, **kwargs):
# ensure task has flask context to access config, logger, etc
with app.app_context():
self.start = time.time()
return super().__call__(*args, **kwargs)
return NotifyTask
class NotifyCelery(Celery):
@@ -34,7 +36,7 @@ class NotifyCelery(Celery):
super().__init__(
app.import_name,
broker=app.config['BROKER_URL'],
task_cls=NotifyTask,
task_cls=make_task(app),
)
self.conf.update(app.config)

View File

@@ -3,6 +3,7 @@ import json
from notifications_utils.statsd_decorators import statsd
from app import (
db,
DATETIME_FORMAT,
notify_celery,
)
@@ -30,6 +31,9 @@ def send_delivery_status_to_service(self, notification_id):
# No delivery receipt API info set
return
# Release DB connection before performing an external HTTP request
db.session.close()
data = {
"id": str(notification_id),
"reference": str(notification.client_reference),

View File

@@ -95,6 +95,6 @@ applications:
NOTIFY_APP_NAME: delivery-worker-receipts
- name: notify-delivery-worker-service-callbacks
command: scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 -Q service-callbacks
command: scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO -P eventlet -c 1000 -Q service-callbacks
env:
NOTIFY_APP_NAME: delivery-worker-service-callbacks
NOTIFY_APP_NAME: delivery-worker-service-callbacks

View File

@@ -1,4 +1,5 @@
#!/usr/bin/env python
from flask import Flask
# notify_celery is referenced from manifest_delivery_base.yml, and cannot be removed