From e0106eb1bef47e1f9c3a05d5362ba0a440b33a7c Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Thu, 1 Jun 2017 14:32:19 +0100 Subject: [PATCH] hacked celery4.0.2 in. Runs and works - note though this version of master I branched had split head on sqlalchemey. This needs a new master merge to fix --- app/celery/celery.py | 116 ++++++++++++++++++++++++++++++++++++++++++- app/config.py | 112 +---------------------------------------- requirements.txt | 3 +- 3 files changed, 118 insertions(+), 113 deletions(-) diff --git a/app/celery/celery.py b/app/celery/celery.py index 183e50bd6..9d0fe7c88 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -1,11 +1,123 @@ +from datetime import timedelta + from celery import Celery +from celery.schedules import crontab +from kombu import Queue, Exchange + +from app.config import QueueNames + +# BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://' +class CeleryConfig(object): + + broker_url = 'sqs://' + broker_transport_options = { + 'region': 'sqs.eu-west-1', + 'polling_interval': 1, # 1 second + 'visibility_timeout': 310, + 'queue_name_prefix': 'martyn-' + } + enable_utc = True, + timezone = 'Europe/London' + accept_content = ['json'] + task_serializer = 'json' + imports = ('app.celery.tasks', 'app.celery.scheduled_tasks') + beat_schedule = { + 'run-scheduled-jobs': { + 'task': 'run-scheduled-jobs', + 'schedule': crontab(minute=1), + 'options': {'queue': QueueNames.PERIODIC} + }, + # 'send-scheduled-notifications': { + # 'task': 'send-scheduled-notifications', + # 'schedule': crontab(minute='*/15'), + # 'options': {'queue': 'periodic'} + # }, + 'delete-verify-codes': { + 'task': 'delete-verify-codes', + 'schedule': timedelta(minutes=63), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-invitations': { + 'task': 'delete-invitations', + 'schedule': timedelta(minutes=66), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-sms-notifications': { + 'task': 'delete-sms-notifications', + 'schedule': crontab(minute=0, hour=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-email-notifications': { + 'task': 'delete-email-notifications', + 'schedule': crontab(minute=20, hour=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-letter-notifications': { + 'task': 'delete-letter-notifications', + 'schedule': crontab(minute=40, hour=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-inbound-sms': { + 'task': 'delete-inbound-sms', + 'schedule': crontab(minute=0, hour=1), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'send-daily-performance-platform-stats': { + 'task': 'send-daily-performance-platform-stats', + 'schedule': crontab(minute=0, hour=2), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'switch-current-sms-provider-on-slow-delivery': { + 'task': 'switch-current-sms-provider-on-slow-delivery', + 'schedule': crontab(), # Every minute + 'options': {'queue': QueueNames.PERIODIC} + }, + 'timeout-sending-notifications': { + 'task': 'timeout-sending-notifications', + 'schedule': crontab(minute=0, hour=3), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'remove_sms_email_jobs': { + 'task': 'remove_csv_files', + 'schedule': crontab(minute=0, hour=4), + 'options': {'queue': QueueNames.PERIODIC}, + 'kwargs': {'job_types': [EMAIL_TYPE, SMS_TYPE]} + }, + 'remove_letter_jobs': { + 'task': 'remove_csv_files', + 'schedule': crontab(minute=20, hour=4), + 'options': {'queue': QueueNames.PERIODIC}, + 'kwargs': {'job_types': [LETTER_TYPE]} + }, + 'remove_transformed_dvla_files': { + 'task': 'remove_transformed_dvla_files', + 'schedule': crontab(minute=40, hour=4), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete_dvla_response_files': { + 'task': 'delete_dvla_response_files', + 'schedule': crontab(minute=10, hour=5), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'timeout-job-statistics': { + 'task': 'timeout-job-statistics', + 'schedule': crontab(minute=0, hour=5), + 'options': {'queue': QueueNames.PERIODIC} + } + } + task_queues = [] + + for queue in QueueNames.all_queues(): + task_queues.append( + Queue(queue, Exchange('default'), routing_key=queue) + ) class NotifyCelery(Celery): def init_app(self, app): - super().__init__(app.import_name, broker=app.config['BROKER_URL']) - self.conf.update(app.config) + super().__init__(app.import_name, broker=CeleryConfig.broker_url) + self.config_from_object(CeleryConfig()) TaskBase = self.Task class ContextTask(TaskBase): diff --git a/app/config.py b/app/config.py index 1bbbb2057..d4c48fcd1 100644 --- a/app/config.py +++ b/app/config.py @@ -18,6 +18,7 @@ if os.environ.get('VCAP_SERVICES'): extract_cloudfoundry_config() + class QueueNames(object): PERIODIC = 'periodic-tasks' PRIORITY = 'priority-tasks' @@ -46,6 +47,7 @@ class QueueNames(object): ] + class Config(object): # URL of admin app ADMIN_BASE_URL = os.environ['ADMIN_BASE_URL'] @@ -126,104 +128,6 @@ class Config(object): CHANGE_EMAIL_CONFIRMATION_TEMPLATE_ID = 'eb4d9930-87ab-4aef-9bce-786762687884' SERVICE_NOW_LIVE_TEMPLATE_ID = '618185c6-3636-49cd-b7d2-6f6f5eb3bdde' - BROKER_URL = 'sqs://' - BROKER_TRANSPORT_OPTIONS = { - 'region': AWS_REGION, - 'polling_interval': 1, # 1 second - 'visibility_timeout': 310, - 'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX - } - CELERY_ENABLE_UTC = True, - CELERY_TIMEZONE = 'Europe/London' - CELERY_ACCEPT_CONTENT = ['json'] - CELERY_TASK_SERIALIZER = 'json' - CELERY_IMPORTS = ('app.celery.tasks', 'app.celery.scheduled_tasks') - CELERYBEAT_SCHEDULE = { - 'run-scheduled-jobs': { - 'task': 'run-scheduled-jobs', - 'schedule': crontab(minute=1), - 'options': {'queue': QueueNames.PERIODIC} - }, - # 'send-scheduled-notifications': { - # 'task': 'send-scheduled-notifications', - # 'schedule': crontab(minute='*/15'), - # 'options': {'queue': 'periodic'} - # }, - 'delete-verify-codes': { - 'task': 'delete-verify-codes', - 'schedule': timedelta(minutes=63), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-invitations': { - 'task': 'delete-invitations', - 'schedule': timedelta(minutes=66), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-sms-notifications': { - 'task': 'delete-sms-notifications', - 'schedule': crontab(minute=0, hour=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-email-notifications': { - 'task': 'delete-email-notifications', - 'schedule': crontab(minute=20, hour=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-letter-notifications': { - 'task': 'delete-letter-notifications', - 'schedule': crontab(minute=40, hour=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-inbound-sms': { - 'task': 'delete-inbound-sms', - 'schedule': crontab(minute=0, hour=1), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'send-daily-performance-platform-stats': { - 'task': 'send-daily-performance-platform-stats', - 'schedule': crontab(minute=0, hour=2), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'switch-current-sms-provider-on-slow-delivery': { - 'task': 'switch-current-sms-provider-on-slow-delivery', - 'schedule': crontab(), # Every minute - 'options': {'queue': QueueNames.PERIODIC} - }, - 'timeout-sending-notifications': { - 'task': 'timeout-sending-notifications', - 'schedule': crontab(minute=0, hour=3), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'remove_sms_email_jobs': { - 'task': 'remove_csv_files', - 'schedule': crontab(minute=0, hour=4), - 'options': {'queue': QueueNames.PERIODIC}, - 'kwargs': {'job_types': [EMAIL_TYPE, SMS_TYPE]} - }, - 'remove_letter_jobs': { - 'task': 'remove_csv_files', - 'schedule': crontab(minute=20, hour=4), - 'options': {'queue': QueueNames.PERIODIC}, - 'kwargs': {'job_types': [LETTER_TYPE]} - }, - 'remove_transformed_dvla_files': { - 'task': 'remove_transformed_dvla_files', - 'schedule': crontab(minute=40, hour=4), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete_dvla_response_files': { - 'task': 'delete_dvla_response_files', - 'schedule': crontab(minute=10, hour=5), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'timeout-job-statistics': { - 'task': 'timeout-job-statistics', - 'schedule': crontab(minute=0, hour=5), - 'options': {'queue': QueueNames.PERIODIC} - } - } - CELERY_QUEUES = [] - NOTIFICATIONS_ALERT = 5 # five mins FROM_NUMBER = 'development' @@ -279,11 +183,6 @@ class Development(Config): NOTIFICATION_QUEUE_PREFIX = 'development' DEBUG = True - for queue in QueueNames.all_queues(): - Config.CELERY_QUEUES.append( - Queue(queue, Exchange('default'), routing_key=queue) - ) - API_HOST_NAME = "http://localhost:6011" API_RATE_LIMIT_ENABLED = True @@ -300,13 +199,6 @@ class Test(Config): STATSD_HOST = "localhost" STATSD_PORT = 1000 - BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://' - - for queue in QueueNames.all_queues(): - Config.CELERY_QUEUES.append( - Queue(queue, Exchange('default'), routing_key=queue) - ) - API_RATE_LIMIT_ENABLED = True API_HOST_NAME = "http://localhost:6011" diff --git a/requirements.txt b/requirements.txt index b1d1cd307..1e1f7014c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,7 +13,7 @@ flask-marshmallow==0.6.2 Flask-Bcrypt==0.6.2 credstash==1.8.0 boto3==1.4.4 -celery==3.1.25 +celery==4.0.2 monotonic==1.2 statsd==3.2.1 jsonschema==2.5.1 @@ -21,6 +21,7 @@ gunicorn==19.6.0 docopt==0.6.2 six==1.10.0 iso8601==0.1.11 +pycurl==7.43.0 # pin to minor version 3.1.x notifications-python-client>=3.1,<3.2