Created an object to hold queue names.

- This is to be used throughout the app.
- Allows us to manage queue names centrally
- All queues renamed to allow us to change the retry processing/visibility timeout
This commit is contained in:
Martyn Inglis
2017-05-25 10:50:55 +01:00
parent 07b527bb1b
commit 21586c917c

View File

@@ -12,6 +12,34 @@ if os.environ.get('VCAP_SERVICES'):
extract_cloudfoundry_config() extract_cloudfoundry_config()
class QueueNames(object):
PERIODIC = 'periodic-tasks'
PRIORITY = 'priority-tasks'
DATABASE = 'database-tasks'
SEND = 'send-tasks'
RESEARCH_MODE = 'research-mode-tasks'
STATISTICS = 'statistics-tasks'
JOBS = 'job-tasks'
RETRY = 'retry-tasks'
NOTIFY = 'notify-internal-tasks'
PROCESS_FTP = 'process-ftp-tasks'
@staticmethod
def all_queues():
return [
QueueNames.PRIORITY,
QueueNames.PERIODIC,
QueueNames.DATABASE,
QueueNames.SEND,
QueueNames.RESEARCH_MODE,
QueueNames.STATISTICS,
QueueNames.JOBS,
QueueNames.RETRY,
QueueNames.NOTIFY,
QueueNames.PROCESS_FTP
]
class Config(object): class Config(object):
# URL of admin app # URL of admin app
ADMIN_BASE_URL = os.environ['ADMIN_BASE_URL'] ADMIN_BASE_URL = os.environ['ADMIN_BASE_URL']
@@ -95,7 +123,7 @@ class Config(object):
BROKER_TRANSPORT_OPTIONS = { BROKER_TRANSPORT_OPTIONS = {
'region': AWS_REGION, 'region': AWS_REGION,
'polling_interval': 1, # 1 second 'polling_interval': 1, # 1 second
'visibility_timeout': 14410, # 4 hours 10 seconds. 10 seconds longer than max retry 'visibility_timeout': 300,
'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX 'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX
} }
CELERY_ENABLE_UTC = True, CELERY_ENABLE_UTC = True,
@@ -107,57 +135,57 @@ class Config(object):
'run-scheduled-jobs': { 'run-scheduled-jobs': {
'task': 'run-scheduled-jobs', 'task': 'run-scheduled-jobs',
'schedule': crontab(minute=1), 'schedule': crontab(minute=1),
'options': {'queue': 'periodic'} 'options': {'queue': QueueNames.PERIODIC}
}, },
'delete-verify-codes': { 'delete-verify-codes': {
'task': 'delete-verify-codes', 'task': 'delete-verify-codes',
'schedule': timedelta(minutes=63), 'schedule': timedelta(minutes=63),
'options': {'queue': 'periodic'} 'options': {'queue': QueueNames.PERIODIC}
}, },
'delete-invitations': { 'delete-invitations': {
'task': 'delete-invitations', 'task': 'delete-invitations',
'schedule': timedelta(minutes=66), 'schedule': timedelta(minutes=66),
'options': {'queue': 'periodic'} 'options': {'queue': QueueNames.PERIODIC}
}, },
'delete-sms-notifications': { 'delete-sms-notifications': {
'task': 'delete-sms-notifications', 'task': 'delete-sms-notifications',
'schedule': crontab(minute=0, hour=0), 'schedule': crontab(minute=0, hour=0),
'options': {'queue': 'periodic'} 'options': {'queue': QueueNames.PERIODIC}
}, },
'delete-email-notifications': { 'delete-email-notifications': {
'task': 'delete-email-notifications', 'task': 'delete-email-notifications',
'schedule': crontab(minute=20, hour=0), 'schedule': crontab(minute=20, hour=0),
'options': {'queue': 'periodic'} 'options': {'queue': QueueNames.PERIODIC}
}, },
'delete-letter-notifications': { 'delete-letter-notifications': {
'task': 'delete-letter-notifications', 'task': 'delete-letter-notifications',
'schedule': crontab(minute=40, hour=0), 'schedule': crontab(minute=40, hour=0),
'options': {'queue': 'periodic'} 'options': {'queue': QueueNames.PERIODIC}
}, },
'send-daily-performance-platform-stats': { 'send-daily-performance-platform-stats': {
'task': 'send-daily-performance-platform-stats', 'task': 'send-daily-performance-platform-stats',
'schedule': crontab(minute=0, hour=2), 'schedule': crontab(minute=0, hour=2),
'options': {'queue': 'periodic'} 'options': {'queue': QueueNames.PERIODIC}
}, },
'switch-current-sms-provider-on-slow-delivery': { 'switch-current-sms-provider-on-slow-delivery': {
'task': 'switch-current-sms-provider-on-slow-delivery', 'task': 'switch-current-sms-provider-on-slow-delivery',
'schedule': crontab(), # Every minute 'schedule': crontab(), # Every minute
'options': {'queue': 'periodic'} 'options': {'queue': QueueNames.PERIODIC}
}, },
'timeout-sending-notifications': { 'timeout-sending-notifications': {
'task': 'timeout-sending-notifications', 'task': 'timeout-sending-notifications',
'schedule': crontab(minute=0, hour=3), 'schedule': crontab(minute=0, hour=3),
'options': {'queue': 'periodic'} 'options': {'queue': QueueNames.PERIODIC}
}, },
'remove_csv_files': { 'remove_csv_files': {
'task': 'remove_csv_files', 'task': 'remove_csv_files',
'schedule': crontab(minute=0, hour=4), 'schedule': crontab(minute=0, hour=4),
'options': {'queue': 'periodic'} 'options': {'queue': QueueNames.PERIODIC}
}, },
'timeout-job-statistics': { 'timeout-job-statistics': {
'task': 'timeout-job-statistics', 'task': 'timeout-job-statistics',
'schedule': crontab(minute=0, hour=5), 'schedule': crontab(minute=0, hour=5),
'options': {'queue': 'periodic'} 'options': {'queue': QueueNames.PERIODIC}
} }
} }
CELERY_QUEUES = [] CELERY_QUEUES = []
@@ -211,20 +239,12 @@ class Development(Config):
NOTIFY_ENVIRONMENT = 'development' NOTIFY_ENVIRONMENT = 'development'
NOTIFICATION_QUEUE_PREFIX = 'development' NOTIFICATION_QUEUE_PREFIX = 'development'
DEBUG = True DEBUG = True
CELERY_QUEUES = Config.CELERY_QUEUES + [
Queue('db-sms', Exchange('default'), routing_key='db-sms'), for queue in QueueNames.all_queues():
Queue('priority', Exchange('default'), routing_key='priority'), Config.CELERY_QUEUES.append(
Queue('periodic', Exchange('default'), routing_key='periodic'), Queue(queue, Exchange('default'), routing_key=queue)
Queue('db-email', Exchange('default'), routing_key='db-email'), )
Queue('db-letter', Exchange('default'), routing_key='db-letter'),
Queue('send-sms', Exchange('default'), routing_key='send-sms'),
Queue('send-email', Exchange('default'), routing_key='send-email'),
Queue('research-mode', Exchange('default'), routing_key='research-mode'),
Queue('statistics', Exchange('default'), routing_key='statistics'),
Queue('process-job', Exchange('default'), routing_key='process-job'),
Queue('retry', Exchange('default'), routing_key='retry'),
Queue('notify', Exchange('default'), routing_key='notify')
]
API_HOST_NAME = "http://localhost:6011" API_HOST_NAME = "http://localhost:6011"
API_RATE_LIMIT_ENABLED = True API_RATE_LIMIT_ENABLED = True
@@ -238,20 +258,11 @@ class Test(Config):
STATSD_ENABLED = True STATSD_ENABLED = True
STATSD_HOST = "localhost" STATSD_HOST = "localhost"
STATSD_PORT = 1000 STATSD_PORT = 1000
CELERY_QUEUES = Config.CELERY_QUEUES + [
Queue('periodic', Exchange('default'), routing_key='periodic'), for queue in QueueNames.all_queues():
Queue('priority', Exchange('default'), routing_key='priority'), Config.CELERY_QUEUES.append(
Queue('db-sms', Exchange('default'), routing_key='db-sms'), Queue(queue, Exchange('default'), routing_key=queue)
Queue('db-email', Exchange('default'), routing_key='db-email'), )
Queue('db-letter', Exchange('default'), routing_key='db-letter'),
Queue('send-sms', Exchange('default'), routing_key='send-sms'),
Queue('send-email', Exchange('default'), routing_key='send-email'),
Queue('research-mode', Exchange('default'), routing_key='research-mode'),
Queue('statistics', Exchange('default'), routing_key='statistics'),
Queue('process-job', Exchange('default'), routing_key='process-job'),
Queue('retry', Exchange('default'), routing_key='retry'),
Queue('notify', Exchange('default'), routing_key='notify')
]
API_RATE_LIMIT_ENABLED = True API_RATE_LIMIT_ENABLED = True
API_HOST_NAME = "http://localhost:6011" API_HOST_NAME = "http://localhost:6011"