From e0106eb1bef47e1f9c3a05d5362ba0a440b33a7c Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Thu, 1 Jun 2017 14:32:19 +0100 Subject: [PATCH 1/7] hacked celery4.0.2 in. Runs and works - note though this version of master I branched had split head on sqlalchemey. This needs a new master merge to fix --- app/celery/celery.py | 116 ++++++++++++++++++++++++++++++++++++++++++- app/config.py | 112 +---------------------------------------- requirements.txt | 3 +- 3 files changed, 118 insertions(+), 113 deletions(-) diff --git a/app/celery/celery.py b/app/celery/celery.py index 183e50bd6..9d0fe7c88 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -1,11 +1,123 @@ +from datetime import timedelta + from celery import Celery +from celery.schedules import crontab +from kombu import Queue, Exchange + +from app.config import QueueNames + +# BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://' +class CeleryConfig(object): + + broker_url = 'sqs://' + broker_transport_options = { + 'region': 'sqs.eu-west-1', + 'polling_interval': 1, # 1 second + 'visibility_timeout': 310, + 'queue_name_prefix': 'martyn-' + } + enable_utc = True, + timezone = 'Europe/London' + accept_content = ['json'] + task_serializer = 'json' + imports = ('app.celery.tasks', 'app.celery.scheduled_tasks') + beat_schedule = { + 'run-scheduled-jobs': { + 'task': 'run-scheduled-jobs', + 'schedule': crontab(minute=1), + 'options': {'queue': QueueNames.PERIODIC} + }, + # 'send-scheduled-notifications': { + # 'task': 'send-scheduled-notifications', + # 'schedule': crontab(minute='*/15'), + # 'options': {'queue': 'periodic'} + # }, + 'delete-verify-codes': { + 'task': 'delete-verify-codes', + 'schedule': timedelta(minutes=63), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-invitations': { + 'task': 'delete-invitations', + 'schedule': timedelta(minutes=66), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-sms-notifications': { + 'task': 'delete-sms-notifications', + 'schedule': crontab(minute=0, hour=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-email-notifications': { + 'task': 'delete-email-notifications', + 'schedule': crontab(minute=20, hour=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-letter-notifications': { + 'task': 'delete-letter-notifications', + 'schedule': crontab(minute=40, hour=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-inbound-sms': { + 'task': 'delete-inbound-sms', + 'schedule': crontab(minute=0, hour=1), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'send-daily-performance-platform-stats': { + 'task': 'send-daily-performance-platform-stats', + 'schedule': crontab(minute=0, hour=2), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'switch-current-sms-provider-on-slow-delivery': { + 'task': 'switch-current-sms-provider-on-slow-delivery', + 'schedule': crontab(), # Every minute + 'options': {'queue': QueueNames.PERIODIC} + }, + 'timeout-sending-notifications': { + 'task': 'timeout-sending-notifications', + 'schedule': crontab(minute=0, hour=3), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'remove_sms_email_jobs': { + 'task': 'remove_csv_files', + 'schedule': crontab(minute=0, hour=4), + 'options': {'queue': QueueNames.PERIODIC}, + 'kwargs': {'job_types': [EMAIL_TYPE, SMS_TYPE]} + }, + 'remove_letter_jobs': { + 'task': 'remove_csv_files', + 'schedule': crontab(minute=20, hour=4), + 'options': {'queue': QueueNames.PERIODIC}, + 'kwargs': {'job_types': [LETTER_TYPE]} + }, + 'remove_transformed_dvla_files': { + 'task': 'remove_transformed_dvla_files', + 'schedule': crontab(minute=40, hour=4), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete_dvla_response_files': { + 'task': 'delete_dvla_response_files', + 'schedule': crontab(minute=10, hour=5), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'timeout-job-statistics': { + 'task': 'timeout-job-statistics', + 'schedule': crontab(minute=0, hour=5), + 'options': {'queue': QueueNames.PERIODIC} + } + } + task_queues = [] + + for queue in QueueNames.all_queues(): + task_queues.append( + Queue(queue, Exchange('default'), routing_key=queue) + ) class NotifyCelery(Celery): def init_app(self, app): - super().__init__(app.import_name, broker=app.config['BROKER_URL']) - self.conf.update(app.config) + super().__init__(app.import_name, broker=CeleryConfig.broker_url) + self.config_from_object(CeleryConfig()) TaskBase = self.Task class ContextTask(TaskBase): diff --git a/app/config.py b/app/config.py index 1bbbb2057..d4c48fcd1 100644 --- a/app/config.py +++ b/app/config.py @@ -18,6 +18,7 @@ if os.environ.get('VCAP_SERVICES'): extract_cloudfoundry_config() + class QueueNames(object): PERIODIC = 'periodic-tasks' PRIORITY = 'priority-tasks' @@ -46,6 +47,7 @@ class QueueNames(object): ] + class Config(object): # URL of admin app ADMIN_BASE_URL = os.environ['ADMIN_BASE_URL'] @@ -126,104 +128,6 @@ class Config(object): CHANGE_EMAIL_CONFIRMATION_TEMPLATE_ID = 'eb4d9930-87ab-4aef-9bce-786762687884' SERVICE_NOW_LIVE_TEMPLATE_ID = '618185c6-3636-49cd-b7d2-6f6f5eb3bdde' - BROKER_URL = 'sqs://' - BROKER_TRANSPORT_OPTIONS = { - 'region': AWS_REGION, - 'polling_interval': 1, # 1 second - 'visibility_timeout': 310, - 'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX - } - CELERY_ENABLE_UTC = True, - CELERY_TIMEZONE = 'Europe/London' - CELERY_ACCEPT_CONTENT = ['json'] - CELERY_TASK_SERIALIZER = 'json' - CELERY_IMPORTS = ('app.celery.tasks', 'app.celery.scheduled_tasks') - CELERYBEAT_SCHEDULE = { - 'run-scheduled-jobs': { - 'task': 'run-scheduled-jobs', - 'schedule': crontab(minute=1), - 'options': {'queue': QueueNames.PERIODIC} - }, - # 'send-scheduled-notifications': { - # 'task': 'send-scheduled-notifications', - # 'schedule': crontab(minute='*/15'), - # 'options': {'queue': 'periodic'} - # }, - 'delete-verify-codes': { - 'task': 'delete-verify-codes', - 'schedule': timedelta(minutes=63), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-invitations': { - 'task': 'delete-invitations', - 'schedule': timedelta(minutes=66), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-sms-notifications': { - 'task': 'delete-sms-notifications', - 'schedule': crontab(minute=0, hour=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-email-notifications': { - 'task': 'delete-email-notifications', - 'schedule': crontab(minute=20, hour=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-letter-notifications': { - 'task': 'delete-letter-notifications', - 'schedule': crontab(minute=40, hour=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-inbound-sms': { - 'task': 'delete-inbound-sms', - 'schedule': crontab(minute=0, hour=1), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'send-daily-performance-platform-stats': { - 'task': 'send-daily-performance-platform-stats', - 'schedule': crontab(minute=0, hour=2), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'switch-current-sms-provider-on-slow-delivery': { - 'task': 'switch-current-sms-provider-on-slow-delivery', - 'schedule': crontab(), # Every minute - 'options': {'queue': QueueNames.PERIODIC} - }, - 'timeout-sending-notifications': { - 'task': 'timeout-sending-notifications', - 'schedule': crontab(minute=0, hour=3), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'remove_sms_email_jobs': { - 'task': 'remove_csv_files', - 'schedule': crontab(minute=0, hour=4), - 'options': {'queue': QueueNames.PERIODIC}, - 'kwargs': {'job_types': [EMAIL_TYPE, SMS_TYPE]} - }, - 'remove_letter_jobs': { - 'task': 'remove_csv_files', - 'schedule': crontab(minute=20, hour=4), - 'options': {'queue': QueueNames.PERIODIC}, - 'kwargs': {'job_types': [LETTER_TYPE]} - }, - 'remove_transformed_dvla_files': { - 'task': 'remove_transformed_dvla_files', - 'schedule': crontab(minute=40, hour=4), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete_dvla_response_files': { - 'task': 'delete_dvla_response_files', - 'schedule': crontab(minute=10, hour=5), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'timeout-job-statistics': { - 'task': 'timeout-job-statistics', - 'schedule': crontab(minute=0, hour=5), - 'options': {'queue': QueueNames.PERIODIC} - } - } - CELERY_QUEUES = [] - NOTIFICATIONS_ALERT = 5 # five mins FROM_NUMBER = 'development' @@ -279,11 +183,6 @@ class Development(Config): NOTIFICATION_QUEUE_PREFIX = 'development' DEBUG = True - for queue in QueueNames.all_queues(): - Config.CELERY_QUEUES.append( - Queue(queue, Exchange('default'), routing_key=queue) - ) - API_HOST_NAME = "http://localhost:6011" API_RATE_LIMIT_ENABLED = True @@ -300,13 +199,6 @@ class Test(Config): STATSD_HOST = "localhost" STATSD_PORT = 1000 - BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://' - - for queue in QueueNames.all_queues(): - Config.CELERY_QUEUES.append( - Queue(queue, Exchange('default'), routing_key=queue) - ) - API_RATE_LIMIT_ENABLED = True API_HOST_NAME = "http://localhost:6011" diff --git a/requirements.txt b/requirements.txt index b1d1cd307..1e1f7014c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,7 +13,7 @@ flask-marshmallow==0.6.2 Flask-Bcrypt==0.6.2 credstash==1.8.0 boto3==1.4.4 -celery==3.1.25 +celery==4.0.2 monotonic==1.2 statsd==3.2.1 jsonschema==2.5.1 @@ -21,6 +21,7 @@ gunicorn==19.6.0 docopt==0.6.2 six==1.10.0 iso8601==0.1.11 +pycurl==7.43.0 # pin to minor version 3.1.x notifications-python-client>=3.1,<3.2 From 786adb5d7186ecb785ab735d89f7da09d62d6c68 Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Fri, 9 Jun 2017 12:24:54 +0100 Subject: [PATCH 2/7] Move Queuenames in with the celery code, revamp config to allow move to celery 4.x --- app/celery/__init__.py | 27 +++++++++++++++ app/celery/celery.py | 17 ++++++---- app/celery/provider_tasks.py | 2 +- app/celery/research_mode_tasks.py | 1 - app/celery/scheduled_tasks.py | 2 +- app/celery/statistics_tasks.py | 2 +- app/celery/tasks.py | 2 +- app/config.py | 34 ------------------- app/delivery/rest.py | 2 +- app/invite/rest.py | 2 +- app/job/rest.py | 2 +- app/letters/send_letter_jobs.py | 2 +- .../notifications_letter_callback.py | 2 +- app/notifications/process_notifications.py | 2 +- app/notifications/rest.py | 2 +- app/service/sender.py | 2 +- app/user/rest.py | 2 +- app/v2/notifications/post_notifications.py | 7 ++-- 18 files changed, 52 insertions(+), 60 deletions(-) diff --git a/app/celery/__init__.py b/app/celery/__init__.py index e69de29bb..270bb54dd 100644 --- a/app/celery/__init__.py +++ b/app/celery/__init__.py @@ -0,0 +1,27 @@ + +class QueueNames(object): + PERIODIC = 'periodic-tasks' + PRIORITY = 'priority-tasks' + DATABASE = 'database-tasks' + SEND = 'send-tasks' + RESEARCH_MODE = 'research-mode-tasks' + STATISTICS = 'statistics-tasks' + JOBS = 'job-tasks' + RETRY = 'retry-tasks' + NOTIFY = 'notify-internal-tasks' + PROCESS_FTP = 'process-ftp-tasks' + + @staticmethod + def all_queues(): + return [ + QueueNames.PRIORITY, + QueueNames.PERIODIC, + QueueNames.DATABASE, + QueueNames.SEND, + QueueNames.RESEARCH_MODE, + QueueNames.STATISTICS, + QueueNames.JOBS, + QueueNames.RETRY, + QueueNames.NOTIFY, + QueueNames.PROCESS_FTP + ] diff --git a/app/celery/celery.py b/app/celery/celery.py index 9d0fe7c88..f3d40a2c8 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -1,5 +1,6 @@ from datetime import timedelta +from app.celery import QueueNames from celery import Celery from celery.schedules import crontab from kombu import Queue, Exchange @@ -8,7 +9,6 @@ from app.config import QueueNames # BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://' class CeleryConfig(object): - broker_url = 'sqs://' broker_transport_options = { 'region': 'sqs.eu-west-1', @@ -107,16 +107,11 @@ class CeleryConfig(object): } task_queues = [] - for queue in QueueNames.all_queues(): - task_queues.append( - Queue(queue, Exchange('default'), routing_key=queue) - ) - class NotifyCelery(Celery): - def init_app(self, app): super().__init__(app.import_name, broker=CeleryConfig.broker_url) + self.init_queues_if_needed(app.config['NOTIFY_ENVIRONMENT']) self.config_from_object(CeleryConfig()) TaskBase = self.Task @@ -126,4 +121,12 @@ class NotifyCelery(Celery): def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) + self.Task = ContextTask + + def init_queues_if_needed(self, environment): + if environment in ['development', 'test']: + for queue in QueueNames.all_queues(): + CeleryConfig.task_queues.append( + Queue(queue, Exchange('default'), routing_key=queue) + ) diff --git a/app/celery/provider_tasks.py b/app/celery/provider_tasks.py index 50d5a31b7..e748c3bd2 100644 --- a/app/celery/provider_tasks.py +++ b/app/celery/provider_tasks.py @@ -3,7 +3,7 @@ from notifications_utils.recipients import InvalidEmailError from sqlalchemy.orm.exc import NoResultFound from app import notify_celery -from app.config import QueueNames +from app.celery import QueueNames from app.dao import notifications_dao from app.dao.notifications_dao import update_notification_status_by_id from app.statsd_decorators import statsd diff --git a/app/celery/research_mode_tasks.py b/app/celery/research_mode_tasks.py index ced35b1d4..356bc3488 100644 --- a/app/celery/research_mode_tasks.py +++ b/app/celery/research_mode_tasks.py @@ -1,7 +1,6 @@ import json from flask import current_app -from app import notify_celery from requests import request, RequestException, HTTPError from app.models import SMS_TYPE diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index e86a7c113..b14f7a5be 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -28,7 +28,7 @@ from app.models import LETTER_TYPE from app.notifications.process_notifications import send_notification_to_queue from app.statsd_decorators import statsd from app.celery.tasks import process_job -from app.config import QueueNames +from app.celery import QueueNames @notify_celery.task(name="remove_csv_files") diff --git a/app/celery/statistics_tasks.py b/app/celery/statistics_tasks.py index 150fa6aac..b00f88bdd 100644 --- a/app/celery/statistics_tasks.py +++ b/app/celery/statistics_tasks.py @@ -10,7 +10,7 @@ from app.dao.statistics_dao import ( ) from app.dao.notifications_dao import get_notification_by_id from app.models import NOTIFICATION_STATUS_TYPES_COMPLETED -from app.config import QueueNames +from app.celery import QueueNames def create_initial_notification_statistic_tasks(notification): diff --git a/app/celery/tasks.py b/app/celery/tasks.py index da9d1f9ee..bbd1fa207 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -19,8 +19,8 @@ from app import ( ) from app.aws import s3 from app.celery import provider_tasks -from app.config import QueueNames from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id +from app.celery import QueueNames from app.dao.jobs_dao import ( dao_update_job, dao_get_job_by_id, diff --git a/app/config.py b/app/config.py index d4c48fcd1..bde445381 100644 --- a/app/config.py +++ b/app/config.py @@ -1,10 +1,6 @@ -from datetime import timedelta import os import json -from celery.schedules import crontab -from kombu import Exchange, Queue - from app.models import ( EMAIL_TYPE, SMS_TYPE, LETTER_TYPE, KEY_TYPE_NORMAL, KEY_TYPE_TEAM, KEY_TYPE_TEST @@ -18,36 +14,6 @@ if os.environ.get('VCAP_SERVICES'): extract_cloudfoundry_config() - -class QueueNames(object): - PERIODIC = 'periodic-tasks' - PRIORITY = 'priority-tasks' - DATABASE = 'database-tasks' - SEND = 'send-tasks' - RESEARCH_MODE = 'research-mode-tasks' - STATISTICS = 'statistics-tasks' - JOBS = 'job-tasks' - RETRY = 'retry-tasks' - NOTIFY = 'notify-internal-tasks' - PROCESS_FTP = 'process-ftp-tasks' - - @staticmethod - def all_queues(): - return [ - QueueNames.PRIORITY, - QueueNames.PERIODIC, - QueueNames.DATABASE, - QueueNames.SEND, - QueueNames.RESEARCH_MODE, - QueueNames.STATISTICS, - QueueNames.JOBS, - QueueNames.RETRY, - QueueNames.NOTIFY, - QueueNames.PROCESS_FTP - ] - - - class Config(object): # URL of admin app ADMIN_BASE_URL = os.environ['ADMIN_BASE_URL'] diff --git a/app/delivery/rest.py b/app/delivery/rest.py index 489a5fcda..5f4abb70b 100644 --- a/app/delivery/rest.py +++ b/app/delivery/rest.py @@ -1,6 +1,6 @@ from flask import Blueprint, jsonify -from app.config import QueueNames +from app.celery import QueueNames from app.delivery import send_to_providers from app.models import EMAIL_TYPE from app.celery import provider_tasks diff --git a/app/invite/rest.py b/app/invite/rest.py index 8105b171f..e1e401190 100644 --- a/app/invite/rest.py +++ b/app/invite/rest.py @@ -4,7 +4,7 @@ from flask import ( jsonify, current_app) -from app.config import QueueNames +from app.celery import QueueNames from app.dao.invited_user_dao import ( save_invited_user, get_invited_user, diff --git a/app/job/rest.py b/app/job/rest.py index 6a8a86ee4..e25aa5fcb 100644 --- a/app/job/rest.py +++ b/app/job/rest.py @@ -36,7 +36,7 @@ from app.models import JOB_STATUS_SCHEDULED, JOB_STATUS_PENDING, JOB_STATUS_CANC from app.utils import pagination_links -from app.config import QueueNames +from app.celery import QueueNames job_blueprint = Blueprint('job', __name__, url_prefix='/service//job') diff --git a/app/letters/send_letter_jobs.py b/app/letters/send_letter_jobs.py index 91c39615a..f90d7f280 100644 --- a/app/letters/send_letter_jobs.py +++ b/app/letters/send_letter_jobs.py @@ -2,7 +2,7 @@ from flask import Blueprint, jsonify from flask import request from app import notify_celery -from app.config import QueueNames +from app.celery import QueueNames from app.dao.jobs_dao import dao_get_all_letter_jobs from app.schemas import job_schema from app.v2.errors import register_errors diff --git a/app/notifications/notifications_letter_callback.py b/app/notifications/notifications_letter_callback.py index ac2de9e5c..b8b587cf0 100644 --- a/app/notifications/notifications_letter_callback.py +++ b/app/notifications/notifications_letter_callback.py @@ -13,7 +13,7 @@ from app.celery.tasks import update_letter_notifications_statuses from app.v2.errors import register_errors from app.notifications.utils import autoconfirm_subscription from app.schema_validation import validate -from app.config import QueueNames +from app.celery import QueueNames letter_callback_blueprint = Blueprint('notifications_letter_callback', __name__) register_errors(letter_callback_blueprint) diff --git a/app/notifications/process_notifications.py b/app/notifications/process_notifications.py index ca3ae0b53..da757db4b 100644 --- a/app/notifications/process_notifications.py +++ b/app/notifications/process_notifications.py @@ -12,7 +12,7 @@ from app import redis_store from app.celery import provider_tasks from notifications_utils.clients import redis -from app.config import QueueNames +from app.celery import QueueNames from app.models import SMS_TYPE, Notification, KEY_TYPE_TEST, EMAIL_TYPE, ScheduledNotification from app.dao.notifications_dao import (dao_create_notification, dao_delete_notifications_and_history_by_id, diff --git a/app/notifications/rest.py b/app/notifications/rest.py index 5c7f8fdea..16ba13fee 100644 --- a/app/notifications/rest.py +++ b/app/notifications/rest.py @@ -6,7 +6,7 @@ from flask import ( ) from app import api_user, authenticated_service -from app.config import QueueNames +from app.celery import QueueNames from app.dao import ( templates_dao, notifications_dao diff --git a/app/service/sender.py b/app/service/sender.py index 4919a93bf..3c4ef17d9 100644 --- a/app/service/sender.py +++ b/app/service/sender.py @@ -1,6 +1,6 @@ from flask import current_app -from app.config import QueueNames +from app.celery import QueueNames from app.dao.services_dao import dao_fetch_service_by_id, dao_fetch_active_users_for_service from app.dao.templates_dao import dao_get_template_by_id from app.models import EMAIL_TYPE, KEY_TYPE_NORMAL diff --git a/app/user/rest.py b/app/user/rest.py index 9a9cf3832..45c649940 100644 --- a/app/user/rest.py +++ b/app/user/rest.py @@ -4,7 +4,7 @@ from datetime import datetime from flask import (jsonify, request, Blueprint, current_app) -from app.config import QueueNames +from app.celery import QueueNames from app.dao.users_dao import ( get_user_by_id, save_model_user, diff --git a/app/v2/notifications/post_notifications.py b/app/v2/notifications/post_notifications.py index 274005386..f9da36f88 100644 --- a/app/v2/notifications/post_notifications.py +++ b/app/v2/notifications/post_notifications.py @@ -1,8 +1,8 @@ from flask import request, jsonify, current_app from app import api_user, authenticated_service -from app.config import QueueNames -from app.models import SMS_TYPE, EMAIL_TYPE, PRIORITY, SCHEDULE_NOTIFICATIONS +from app.models import SMS_TYPE, EMAIL_TYPE, PRIORITY +from app.celery import QueueNames from app.notifications.process_notifications import ( persist_notification, send_notification_to_queue, @@ -11,20 +11,17 @@ from app.notifications.process_notifications import ( from app.notifications.validators import ( validate_and_format_recipient, check_rate_limiting, - service_has_permission, check_service_can_schedule_notification, check_service_has_permission, validate_template ) from app.schema_validation import validate -from app.utils import get_public_notify_type_text from app.v2.notifications import v2_notification_blueprint from app.v2.notifications.notification_schemas import ( post_sms_request, create_post_sms_response_from_notification, post_email_request, create_post_email_response_from_notification) -from app.v2.errors import BadRequestError @v2_notification_blueprint.route('/', methods=['POST']) From 0e9e8955f7ffa9aed2e34b931ef5d2edce8f24fb Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Fri, 9 Jun 2017 16:20:02 +0100 Subject: [PATCH 3/7] Finished celery refactor - set up config for queue prefix LEO notes: Also made sure the Test BROKER_URL is preserved so that tests warn you when celery isn't mocked out --- app/__init__.py | 1 - app/celery/celery.py | 31 +++++++++++++---------- app/config.py | 6 ++++- tests/app/celery/test_statistics_tasks.py | 15 ++++++++--- 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index e0ec02723..72b8f2885 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -49,7 +49,6 @@ def create_app(app_name=None): from app.config import configs notify_environment = os.environ['NOTIFY_ENVIRONMENT'] - application.config.from_object(configs[notify_environment]) if app_name: diff --git a/app/celery/celery.py b/app/celery/celery.py index f3d40a2c8..e85f929aa 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -7,14 +7,16 @@ from kombu import Queue, Exchange from app.config import QueueNames -# BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://' -class CeleryConfig(object): - broker_url = 'sqs://' +class CeleryConfig: + def __init__(self, config): + self.broker_transport_options['queue_name_prefix'] = config['NOTIFICATION_QUEUE_PREFIX'] + self.broker_url = config.get('BROKER_URL', 'sqs://') + broker_transport_options = { 'region': 'sqs.eu-west-1', 'polling_interval': 1, # 1 second 'visibility_timeout': 310, - 'queue_name_prefix': 'martyn-' + 'queue_name_prefix': None } enable_utc = True, timezone = 'Europe/London' @@ -110,9 +112,17 @@ class CeleryConfig(object): class NotifyCelery(Celery): def init_app(self, app): - super().__init__(app.import_name, broker=CeleryConfig.broker_url) - self.init_queues_if_needed(app.config['NOTIFY_ENVIRONMENT']) - self.config_from_object(CeleryConfig()) + celery_config = CeleryConfig(app.config) + + super().__init__(app.import_name, broker=celery_config.broker_url) + + if app.config['INITIALISE_QUEUES']: + for queue in QueueNames.all_queues(): + CeleryConfig.task_queues.append( + Queue(queue, Exchange('default'), routing_key=queue) + ) + + self.config_from_object() TaskBase = self.Task class ContextTask(TaskBase): @@ -123,10 +133,3 @@ class NotifyCelery(Celery): return TaskBase.__call__(self, *args, **kwargs) self.Task = ContextTask - - def init_queues_if_needed(self, environment): - if environment in ['development', 'test']: - for queue in QueueNames.all_queues(): - CeleryConfig.task_queues.append( - Queue(queue, Exchange('default'), routing_key=queue) - ) diff --git a/app/config.py b/app/config.py index bde445381..290cc3211 100644 --- a/app/config.py +++ b/app/config.py @@ -132,6 +132,7 @@ class Config(object): } FREE_SMS_TIER_FRAGMENT_COUNT = 250000 + INITIALISE_QUEUES = False SMS_INBOUND_WHITELIST = json.loads(os.environ.get('SMS_INBOUND_WHITELIST', '[]')) @@ -141,12 +142,12 @@ class Config(object): ###################### class Development(Config): + INITIALISE_QUEUES = True SQLALCHEMY_ECHO = False NOTIFY_EMAIL_DOMAIN = 'notify.tools' CSV_UPLOAD_BUCKET_NAME = 'development-notifications-csv-upload' DVLA_RESPONSE_BUCKET_NAME = 'notify.tools-ftp' NOTIFY_ENVIRONMENT = 'development' - NOTIFICATION_QUEUE_PREFIX = 'development' DEBUG = True API_HOST_NAME = "http://localhost:6011" @@ -154,6 +155,7 @@ class Development(Config): class Test(Config): + INITIALISE_QUEUES = True NOTIFY_EMAIL_DOMAIN = 'test.notify.com' FROM_NUMBER = 'testing' NOTIFY_ENVIRONMENT = 'test' @@ -165,6 +167,8 @@ class Test(Config): STATSD_HOST = "localhost" STATSD_PORT = 1000 + BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://' + API_RATE_LIMIT_ENABLED = True API_HOST_NAME = "http://localhost:6011" diff --git a/tests/app/celery/test_statistics_tasks.py b/tests/app/celery/test_statistics_tasks.py index 40d20117d..c97007f55 100644 --- a/tests/app/celery/test_statistics_tasks.py +++ b/tests/app/celery/test_statistics_tasks.py @@ -1,14 +1,21 @@ import pytest +from sqlalchemy.exc import SQLAlchemyError + +from app import create_uuid from app.celery.statistics_tasks import ( record_initial_job_statistics, record_outcome_job_statistics, create_initial_notification_statistic_tasks, create_outcome_notification_statistic_tasks) -from sqlalchemy.exc import SQLAlchemyError -from app import create_uuid +from app.models import ( + NOTIFICATION_STATUS_TYPES_COMPLETED, + NOTIFICATION_SENDING, + NOTIFICATION_PENDING, + NOTIFICATION_CREATED, + NOTIFICATION_DELIVERED +) + from tests.app.conftest import sample_notification -from app.models import NOTIFICATION_STATUS_TYPES_COMPLETED, NOTIFICATION_SENT, NOTIFICATION_SENDING, \ - NOTIFICATION_PENDING, NOTIFICATION_CREATED, NOTIFICATION_DELIVERED def test_should_create_initial_job_task_if_notification_is_related_to_a_job( From 1a03248317a7605586c06180bde23faa38f015c3 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Wed, 12 Jul 2017 13:02:19 +0100 Subject: [PATCH 4/7] temp fix to sort out circular imports --- app/__init__.py | 6 +++++- app/celery/celery.py | 12 +++++++----- app/notifications/receive_notifications.py | 2 +- app/service/send_notification.py | 2 +- tests/app/service/test_send_one_off_notification.py | 2 +- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 72b8f2885..2e1445c43 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -13,7 +13,6 @@ from notifications_utils.clients.redis.redis_client import RedisClient from notifications_utils import logging, request_id from werkzeug.local import LocalProxy -from app.celery.celery import NotifyCelery from app.clients import Clients from app.clients.email.aws_ses import AwsSesClient from app.clients.sms.firetext import FiretextClient @@ -26,6 +25,11 @@ DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" DATE_FORMAT = "%Y-%m-%d" db = SQLAlchemy() + +# avoid circular import by importing after db +from app.celery.celery import NotifyCelery + + ma = Marshmallow() notify_celery = NotifyCelery() firetext_client = FiretextClient() diff --git a/app/celery/celery.py b/app/celery/celery.py index e85f929aa..69fad159d 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -1,11 +1,11 @@ from datetime import timedelta -from app.celery import QueueNames from celery import Celery from celery.schedules import crontab from kombu import Queue, Exchange -from app.config import QueueNames +from app.celery import QueueNames + class CeleryConfig: def __init__(self, config): @@ -83,13 +83,15 @@ class CeleryConfig: 'task': 'remove_csv_files', 'schedule': crontab(minute=0, hour=4), 'options': {'queue': QueueNames.PERIODIC}, - 'kwargs': {'job_types': [EMAIL_TYPE, SMS_TYPE]} + # TODO: Avoid duplication of keywords - ideally by moving definitions out of models.py + 'kwargs': {'job_types': ['email', 'sms']} }, 'remove_letter_jobs': { 'task': 'remove_csv_files', 'schedule': crontab(minute=20, hour=4), 'options': {'queue': QueueNames.PERIODIC}, - 'kwargs': {'job_types': [LETTER_TYPE]} + # TODO: Avoid duplication of keywords - ideally by moving definitions out of models.py + 'kwargs': {'job_types': ['letter']} }, 'remove_transformed_dvla_files': { 'task': 'remove_transformed_dvla_files', @@ -122,7 +124,7 @@ class NotifyCelery(Celery): Queue(queue, Exchange('default'), routing_key=queue) ) - self.config_from_object() + self.config_from_object(celery_config) TaskBase = self.Task class ContextTask(TaskBase): diff --git a/app/notifications/receive_notifications.py b/app/notifications/receive_notifications.py index 34e726cf1..07847a45c 100644 --- a/app/notifications/receive_notifications.py +++ b/app/notifications/receive_notifications.py @@ -6,7 +6,7 @@ from notifications_utils.recipients import validate_and_format_phone_number from app import statsd_client, firetext_client, mmg_client from app.celery import tasks -from app.config import QueueNames +from app.celery import QueueNames from app.dao.services_dao import dao_fetch_services_by_sms_sender from app.dao.inbound_sms_dao import dao_create_inbound_sms from app.models import InboundSms, INBOUND_SMS_TYPE, SMS_TYPE diff --git a/app/service/send_notification.py b/app/service/send_notification.py index 1ca19661c..6119ac714 100644 --- a/app/service/send_notification.py +++ b/app/service/send_notification.py @@ -1,4 +1,4 @@ -from app.config import QueueNames +from app.celery import QueueNames from app.notifications.validators import ( check_service_over_daily_message_limit, validate_and_format_recipient, diff --git a/tests/app/service/test_send_one_off_notification.py b/tests/app/service/test_send_one_off_notification.py index 89ddcd91a..3baf886e7 100644 --- a/tests/app/service/test_send_one_off_notification.py +++ b/tests/app/service/test_send_one_off_notification.py @@ -5,7 +5,7 @@ import pytest from notifications_utils.recipients import InvalidPhoneError from app.v2.errors import BadRequestError, TooManyRequestsError -from app.config import QueueNames +from app.celery import QueueNames from app.service.send_notification import send_one_off_notification from app.models import KEY_TYPE_NORMAL, PRIORITY, SMS_TYPE From 4ced59f7d39a2dcf9606349f4617129d83e78bd2 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Wed, 12 Jul 2017 16:01:22 +0100 Subject: [PATCH 5/7] remove pycurl from reqs --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 1e1f7014c..7b558ecef 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,7 +21,6 @@ gunicorn==19.6.0 docopt==0.6.2 six==1.10.0 iso8601==0.1.11 -pycurl==7.43.0 # pin to minor version 3.1.x notifications-python-client>=3.1,<3.2 From 3331491ef1c5b10fbac1a6209a6f287cd64f6948 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Wed, 12 Jul 2017 17:17:02 +0100 Subject: [PATCH 6/7] add pycurl to requirements it's needed by kombu 4.0.2. Also moved import about --- app/__init__.py | 6 +----- docker/Dockerfile | 1 + requirements.txt | 1 + 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 2e1445c43..72b8f2885 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -13,6 +13,7 @@ from notifications_utils.clients.redis.redis_client import RedisClient from notifications_utils import logging, request_id from werkzeug.local import LocalProxy +from app.celery.celery import NotifyCelery from app.clients import Clients from app.clients.email.aws_ses import AwsSesClient from app.clients.sms.firetext import FiretextClient @@ -25,11 +26,6 @@ DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" DATE_FORMAT = "%Y-%m-%d" db = SQLAlchemy() - -# avoid circular import by importing after db -from app.celery.celery import NotifyCelery - - ma = Marshmallow() notify_celery = NotifyCelery() firetext_client = FiretextClient() diff --git a/docker/Dockerfile b/docker/Dockerfile index 2ccfe0e1e..0a96a16f6 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -20,6 +20,7 @@ RUN \ zip \ libpq-dev \ jq \ + libcurl4-openssl-dev \ && echo "Clean up" \ && rm -rf /var/lib/apt/lists/* /tmp/* diff --git a/requirements.txt b/requirements.txt index 7b558ecef..1e1f7014c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,6 +21,7 @@ gunicorn==19.6.0 docopt==0.6.2 six==1.10.0 iso8601==0.1.11 +pycurl==7.43.0 # pin to minor version 3.1.x notifications-python-client>=3.1,<3.2 From 8679fc35d115c2e6389178abf125e4bf25dc7184 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Thu, 13 Jul 2017 11:31:45 +0100 Subject: [PATCH 7/7] pin kombu to fix SQS issues Kombu is a library that celery uses under the hood. Kombu is pinned to a specific commit that brings in fixes for SQS - see https://github.com/celery/kombu/pull/693. Kombu v4.0.2 (which ships with celery v4.0.2) doesn't work with SQS due to problems with their use of boto2, so Kombu migrated to boto3 - We're waiting for that to get a release version, and then to get a new version of celery that pulls that in. Until that point, we should override the kombu version to get these SQS fixes. Additionally, we don't need boto2 any more :) --- requirements.txt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 1e1f7014c..452bfb96a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,4 +32,9 @@ awscli-cwlogs>=1.4,<1.5 git+https://github.com/alphagov/notifications-utils.git@17.5.3#egg=notifications-utils==17.5.3 -git+https://github.com/alphagov/boto.git@2.43.0-patch3#egg=boto==2.43.0-patch3 +# Kombu is a library that celery uses under the hood. +# Kombu is pinned to a specific commit that brings in fixes for SQS - see https://github.com/celery/kombu/pull/693 +# Kombu v4.0.2 (which ships with celery v4.0.2) doesn't work with SQS due to problems with their use of boto2, so +# Kombu migrated to boto3 - We're waiting for that to get a release version, and then to get a new version of celery +# that pulls that in. Until that point, we should override the kombu version to get these SQS fixes. +git+https://github.com/celery/kombu@09bd23bbd83344b09cbf38b7257107e560db9f25