Finished celery refactor - set up config for queue prefix

LEO notes: Also made sure the Test BROKER_URL is preserved so that
tests warn you when celery isn't mocked out
This commit is contained in:
Martyn Inglis
2017-06-09 16:20:02 +01:00
committed by Leo Hemsted
parent 786adb5d71
commit 0e9e8955f7
4 changed files with 33 additions and 20 deletions

View File

@@ -7,14 +7,16 @@ from kombu import Queue, Exchange
from app.config import QueueNames
# BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://'
class CeleryConfig(object):
broker_url = 'sqs://'
class CeleryConfig:
def __init__(self, config):
self.broker_transport_options['queue_name_prefix'] = config['NOTIFICATION_QUEUE_PREFIX']
self.broker_url = config.get('BROKER_URL', 'sqs://')
broker_transport_options = {
'region': 'sqs.eu-west-1',
'polling_interval': 1, # 1 second
'visibility_timeout': 310,
'queue_name_prefix': 'martyn-'
'queue_name_prefix': None
}
enable_utc = True,
timezone = 'Europe/London'
@@ -110,9 +112,17 @@ class CeleryConfig(object):
class NotifyCelery(Celery):
def init_app(self, app):
super().__init__(app.import_name, broker=CeleryConfig.broker_url)
self.init_queues_if_needed(app.config['NOTIFY_ENVIRONMENT'])
self.config_from_object(CeleryConfig())
celery_config = CeleryConfig(app.config)
super().__init__(app.import_name, broker=celery_config.broker_url)
if app.config['INITIALISE_QUEUES']:
for queue in QueueNames.all_queues():
CeleryConfig.task_queues.append(
Queue(queue, Exchange('default'), routing_key=queue)
)
self.config_from_object()
TaskBase = self.Task
class ContextTask(TaskBase):
@@ -123,10 +133,3 @@ class NotifyCelery(Celery):
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_queues_if_needed(self, environment):
if environment in ['development', 'test']:
for queue in QueueNames.all_queues():
CeleryConfig.task_queues.append(
Queue(queue, Exchange('default'), routing_key=queue)
)