Move Queuenames in with the celery code, revamp config to allow move to celery 4.x

This commit is contained in:
Martyn Inglis
2017-06-09 12:24:54 +01:00
committed by Leo Hemsted
parent e0106eb1be
commit 786adb5d71
18 changed files with 52 additions and 60 deletions

View File

@@ -1,5 +1,6 @@
from datetime import timedelta
from app.celery import QueueNames
from celery import Celery
from celery.schedules import crontab
from kombu import Queue, Exchange
@@ -8,7 +9,6 @@ from app.config import QueueNames
# BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://'
class CeleryConfig(object):
broker_url = 'sqs://'
broker_transport_options = {
'region': 'sqs.eu-west-1',
@@ -107,16 +107,11 @@ class CeleryConfig(object):
}
task_queues = []
for queue in QueueNames.all_queues():
task_queues.append(
Queue(queue, Exchange('default'), routing_key=queue)
)
class NotifyCelery(Celery):
def init_app(self, app):
super().__init__(app.import_name, broker=CeleryConfig.broker_url)
self.init_queues_if_needed(app.config['NOTIFY_ENVIRONMENT'])
self.config_from_object(CeleryConfig())
TaskBase = self.Task
@@ -126,4 +121,12 @@ class NotifyCelery(Celery):
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_queues_if_needed(self, environment):
if environment in ['development', 'test']:
for queue in QueueNames.all_queues():
CeleryConfig.task_queues.append(
Queue(queue, Exchange('default'), routing_key=queue)
)