Merge branch 'master' into month-billing-table

This commit is contained in:
Rebecca Law
2017-07-24 15:17:36 +01:00
33 changed files with 286 additions and 269 deletions

View File

@@ -49,6 +49,7 @@ def create_app(app_name=None):
from app.config import configs
notify_environment = os.environ['NOTIFY_ENVIRONMENT']
application.config.from_object(configs[notify_environment])
if app_name:

View File

@@ -1,27 +0,0 @@
class QueueNames(object):
PERIODIC = 'periodic-tasks'
PRIORITY = 'priority-tasks'
DATABASE = 'database-tasks'
SEND = 'send-tasks'
RESEARCH_MODE = 'research-mode-tasks'
STATISTICS = 'statistics-tasks'
JOBS = 'job-tasks'
RETRY = 'retry-tasks'
NOTIFY = 'notify-internal-tasks'
PROCESS_FTP = 'process-ftp-tasks'
@staticmethod
def all_queues():
return [
QueueNames.PRIORITY,
QueueNames.PERIODIC,
QueueNames.DATABASE,
QueueNames.SEND,
QueueNames.RESEARCH_MODE,
QueueNames.STATISTICS,
QueueNames.JOBS,
QueueNames.RETRY,
QueueNames.NOTIFY,
QueueNames.PROCESS_FTP
]

View File

@@ -1,130 +1,11 @@
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
from kombu import Queue, Exchange
from app.celery import QueueNames
class CeleryConfig:
def __init__(self, config):
self.broker_transport_options['queue_name_prefix'] = config['NOTIFICATION_QUEUE_PREFIX']
self.broker_url = config.get('BROKER_URL', 'sqs://')
broker_transport_options = {
'region': 'eu-west-1',
'polling_interval': 1, # 1 second
'visibility_timeout': 310,
'queue_name_prefix': None
}
enable_utc = True,
timezone = 'Europe/London'
accept_content = ['json']
task_serializer = 'json'
imports = ('app.celery.tasks', 'app.celery.scheduled_tasks')
beat_schedule = {
'run-scheduled-jobs': {
'task': 'run-scheduled-jobs',
'schedule': crontab(minute=1),
'options': {'queue': QueueNames.PERIODIC}
},
# 'send-scheduled-notifications': {
# 'task': 'send-scheduled-notifications',
# 'schedule': crontab(minute='*/15'),
# 'options': {'queue': 'periodic'}
# },
'delete-verify-codes': {
'task': 'delete-verify-codes',
'schedule': timedelta(minutes=63),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-invitations': {
'task': 'delete-invitations',
'schedule': timedelta(minutes=66),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-sms-notifications': {
'task': 'delete-sms-notifications',
'schedule': crontab(minute=0, hour=0),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-email-notifications': {
'task': 'delete-email-notifications',
'schedule': crontab(minute=20, hour=0),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-letter-notifications': {
'task': 'delete-letter-notifications',
'schedule': crontab(minute=40, hour=0),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-inbound-sms': {
'task': 'delete-inbound-sms',
'schedule': crontab(minute=0, hour=1),
'options': {'queue': QueueNames.PERIODIC}
},
'send-daily-performance-platform-stats': {
'task': 'send-daily-performance-platform-stats',
'schedule': crontab(minute=0, hour=2),
'options': {'queue': QueueNames.PERIODIC}
},
'switch-current-sms-provider-on-slow-delivery': {
'task': 'switch-current-sms-provider-on-slow-delivery',
'schedule': crontab(), # Every minute
'options': {'queue': QueueNames.PERIODIC}
},
'timeout-sending-notifications': {
'task': 'timeout-sending-notifications',
'schedule': crontab(minute=0, hour=3),
'options': {'queue': QueueNames.PERIODIC}
},
'remove_sms_email_jobs': {
'task': 'remove_csv_files',
'schedule': crontab(minute=0, hour=4),
'options': {'queue': QueueNames.PERIODIC},
# TODO: Avoid duplication of keywords - ideally by moving definitions out of models.py
'kwargs': {'job_types': ['email', 'sms']}
},
'remove_letter_jobs': {
'task': 'remove_csv_files',
'schedule': crontab(minute=20, hour=4),
'options': {'queue': QueueNames.PERIODIC},
# TODO: Avoid duplication of keywords - ideally by moving definitions out of models.py
'kwargs': {'job_types': ['letter']}
},
'remove_transformed_dvla_files': {
'task': 'remove_transformed_dvla_files',
'schedule': crontab(minute=40, hour=4),
'options': {'queue': QueueNames.PERIODIC}
},
'delete_dvla_response_files': {
'task': 'delete_dvla_response_files',
'schedule': crontab(minute=10, hour=5),
'options': {'queue': QueueNames.PERIODIC}
},
'timeout-job-statistics': {
'task': 'timeout-job-statistics',
'schedule': crontab(minute=0, hour=5),
'options': {'queue': QueueNames.PERIODIC}
}
}
task_queues = []
class NotifyCelery(Celery):
def init_app(self, app):
celery_config = CeleryConfig(app.config)
super().__init__(app.import_name, broker=celery_config.broker_url)
if app.config['INITIALISE_QUEUES']:
for queue in QueueNames.all_queues():
CeleryConfig.task_queues.append(
Queue(queue, Exchange('default'), routing_key=queue)
)
self.config_from_object(celery_config)
super().__init__(app.import_name, broker=app.config['BROKER_URL'])
self.conf.update(app.config)
TaskBase = self.Task
class ContextTask(TaskBase):
@@ -133,5 +14,4 @@ class NotifyCelery(Celery):
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask

View File

@@ -3,7 +3,7 @@ from notifications_utils.recipients import InvalidEmailError
from sqlalchemy.orm.exc import NoResultFound
from app import notify_celery
from app.celery import QueueNames
from app.config import QueueNames
from app.dao import notifications_dao
from app.dao.notifications_dao import update_notification_status_by_id
from app.statsd_decorators import statsd

View File

@@ -1,6 +1,7 @@
import json
from flask import current_app
from app import notify_celery
from requests import request, RequestException, HTTPError
from app.models import SMS_TYPE

View File

@@ -36,7 +36,7 @@ from app.models import LETTER_TYPE
from app.notifications.process_notifications import send_notification_to_queue
from app.statsd_decorators import statsd
from app.celery.tasks import process_job
from app.celery import QueueNames
from app.config import QueueNames
@notify_celery.task(name="remove_csv_files")

View File

@@ -10,7 +10,7 @@ from app.dao.statistics_dao import (
)
from app.dao.notifications_dao import get_notification_by_id
from app.models import NOTIFICATION_STATUS_TYPES_COMPLETED
from app.celery import QueueNames
from app.config import QueueNames
def create_initial_notification_statistic_tasks(notification):

View File

@@ -19,8 +19,8 @@ from app import (
)
from app.aws import s3
from app.celery import provider_tasks
from app.config import QueueNames
from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id
from app.celery import QueueNames
from app.dao.jobs_dao import (
dao_update_job,
dao_get_job_by_id,
@@ -182,7 +182,7 @@ def send_sms(self,
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND if not service.research_mode else QueueNames.RESEARCH_MODE
queue=QueueNames.SEND_SMS if not service.research_mode else QueueNames.RESEARCH_MODE
)
current_app.logger.info(
@@ -227,7 +227,7 @@ def send_email(self,
provider_tasks.deliver_email.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND if not service.research_mode else QueueNames.RESEARCH_MODE
queue=QueueNames.SEND_EMAIL if not service.research_mode else QueueNames.RESEARCH_MODE
)
current_app.logger.info("Email {} created at {}".format(saved_notification.id, created_at))

View File

@@ -1,6 +1,10 @@
from datetime import timedelta
import os
import json
from celery.schedules import crontab
from kombu import Exchange, Queue
from app.models import (
EMAIL_TYPE, SMS_TYPE, LETTER_TYPE,
KEY_TYPE_NORMAL, KEY_TYPE_TEAM, KEY_TYPE_TEST
@@ -14,6 +18,38 @@ if os.environ.get('VCAP_SERVICES'):
extract_cloudfoundry_config()
class QueueNames(object):
PERIODIC = 'periodic-tasks'
PRIORITY = 'priority-tasks'
DATABASE = 'database-tasks'
SEND_COMBINED = 'send-tasks'
SEND_SMS = 'send-sms-tasks'
SEND_EMAIL = 'send-email-tasks'
RESEARCH_MODE = 'research-mode-tasks'
STATISTICS = 'statistics-tasks'
JOBS = 'job-tasks'
RETRY = 'retry-tasks'
NOTIFY = 'notify-internal-tasks'
PROCESS_FTP = 'process-ftp-tasks'
@staticmethod
def all_queues():
return [
QueueNames.PRIORITY,
QueueNames.PERIODIC,
QueueNames.DATABASE,
QueueNames.SEND_COMBINED,
QueueNames.SEND_SMS,
QueueNames.SEND_EMAIL,
QueueNames.RESEARCH_MODE,
QueueNames.STATISTICS,
QueueNames.JOBS,
QueueNames.RETRY,
QueueNames.NOTIFY,
QueueNames.PROCESS_FTP
]
class Config(object):
# URL of admin app
ADMIN_BASE_URL = os.environ['ADMIN_BASE_URL']
@@ -94,6 +130,104 @@ class Config(object):
CHANGE_EMAIL_CONFIRMATION_TEMPLATE_ID = 'eb4d9930-87ab-4aef-9bce-786762687884'
SERVICE_NOW_LIVE_TEMPLATE_ID = '618185c6-3636-49cd-b7d2-6f6f5eb3bdde'
BROKER_URL = 'sqs://'
BROKER_TRANSPORT_OPTIONS = {
'region': AWS_REGION,
'polling_interval': 1, # 1 second
'visibility_timeout': 310,
'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX
}
CELERY_ENABLE_UTC = True,
CELERY_TIMEZONE = 'Europe/London'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_IMPORTS = ('app.celery.tasks', 'app.celery.scheduled_tasks')
CELERYBEAT_SCHEDULE = {
'run-scheduled-jobs': {
'task': 'run-scheduled-jobs',
'schedule': crontab(minute=1),
'options': {'queue': QueueNames.PERIODIC}
},
# 'send-scheduled-notifications': {
# 'task': 'send-scheduled-notifications',
# 'schedule': crontab(minute='*/15'),
# 'options': {'queue': 'periodic'}
# },
'delete-verify-codes': {
'task': 'delete-verify-codes',
'schedule': timedelta(minutes=63),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-invitations': {
'task': 'delete-invitations',
'schedule': timedelta(minutes=66),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-sms-notifications': {
'task': 'delete-sms-notifications',
'schedule': crontab(minute=0, hour=0),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-email-notifications': {
'task': 'delete-email-notifications',
'schedule': crontab(minute=20, hour=0),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-letter-notifications': {
'task': 'delete-letter-notifications',
'schedule': crontab(minute=40, hour=0),
'options': {'queue': QueueNames.PERIODIC}
},
'delete-inbound-sms': {
'task': 'delete-inbound-sms',
'schedule': crontab(minute=0, hour=1),
'options': {'queue': QueueNames.PERIODIC}
},
'send-daily-performance-platform-stats': {
'task': 'send-daily-performance-platform-stats',
'schedule': crontab(minute=0, hour=2),
'options': {'queue': QueueNames.PERIODIC}
},
'switch-current-sms-provider-on-slow-delivery': {
'task': 'switch-current-sms-provider-on-slow-delivery',
'schedule': crontab(), # Every minute
'options': {'queue': QueueNames.PERIODIC}
},
'timeout-sending-notifications': {
'task': 'timeout-sending-notifications',
'schedule': crontab(minute=0, hour=3),
'options': {'queue': QueueNames.PERIODIC}
},
'remove_sms_email_jobs': {
'task': 'remove_csv_files',
'schedule': crontab(minute=0, hour=4),
'options': {'queue': QueueNames.PERIODIC},
'kwargs': {'job_types': [EMAIL_TYPE, SMS_TYPE]}
},
'remove_letter_jobs': {
'task': 'remove_csv_files',
'schedule': crontab(minute=20, hour=4),
'options': {'queue': QueueNames.PERIODIC},
'kwargs': {'job_types': [LETTER_TYPE]}
},
'remove_transformed_dvla_files': {
'task': 'remove_transformed_dvla_files',
'schedule': crontab(minute=40, hour=4),
'options': {'queue': QueueNames.PERIODIC}
},
'delete_dvla_response_files': {
'task': 'delete_dvla_response_files',
'schedule': crontab(minute=10, hour=5),
'options': {'queue': QueueNames.PERIODIC}
},
'timeout-job-statistics': {
'task': 'timeout-job-statistics',
'schedule': crontab(minute=0, hour=5),
'options': {'queue': QueueNames.PERIODIC}
}
}
CELERY_QUEUES = []
NOTIFICATIONS_ALERT = 5 # five mins
FROM_NUMBER = 'development'
@@ -132,7 +266,6 @@ class Config(object):
}
FREE_SMS_TIER_FRAGMENT_COUNT = 250000
INITIALISE_QUEUES = False
SMS_INBOUND_WHITELIST = json.loads(os.environ.get('SMS_INBOUND_WHITELIST', '[]'))
@@ -142,20 +275,24 @@ class Config(object):
######################
class Development(Config):
INITIALISE_QUEUES = True
SQLALCHEMY_ECHO = False
NOTIFY_EMAIL_DOMAIN = 'notify.tools'
CSV_UPLOAD_BUCKET_NAME = 'development-notifications-csv-upload'
DVLA_RESPONSE_BUCKET_NAME = 'notify.tools-ftp'
NOTIFY_ENVIRONMENT = 'development'
NOTIFICATION_QUEUE_PREFIX = 'development'
DEBUG = True
for queue in QueueNames.all_queues():
Config.CELERY_QUEUES.append(
Queue(queue, Exchange('default'), routing_key=queue)
)
API_HOST_NAME = "http://localhost:6011"
API_RATE_LIMIT_ENABLED = True
class Test(Config):
INITIALISE_QUEUES = True
NOTIFY_EMAIL_DOMAIN = 'test.notify.com'
FROM_NUMBER = 'testing'
NOTIFY_ENVIRONMENT = 'test'
@@ -169,6 +306,11 @@ class Test(Config):
BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://'
for queue in QueueNames.all_queues():
Config.CELERY_QUEUES.append(
Queue(queue, Exchange('default'), routing_key=queue)
)
API_RATE_LIMIT_ENABLED = True
API_HOST_NAME = "http://localhost:6011"

View File

@@ -1,6 +1,6 @@
from flask import Blueprint, jsonify
from app.celery import QueueNames
from app.config import QueueNames
from app.delivery import send_to_providers
from app.models import EMAIL_TYPE
from app.celery import provider_tasks
@@ -24,16 +24,20 @@ def send_notification_to_provider(notification_id):
send_response(
send_to_providers.send_email_to_provider,
provider_tasks.deliver_email,
notification)
notification,
QueueNames.SEND_EMAIL
)
else:
send_response(
send_to_providers.send_sms_to_provider,
provider_tasks.deliver_sms,
notification)
notification,
QueueNames.SEND_SMS
)
return jsonify({}), 204
def send_response(send_call, task_call, notification):
def send_response(send_call, task_call, notification, queue):
try:
send_call(notification)
except Exception as e:
@@ -42,4 +46,4 @@ def send_response(send_call, task_call, notification):
notification.id,
notification.notification_type),
e)
task_call.apply_async((str(notification.id)), queue=QueueNames.SEND)
task_call.apply_async((str(notification.id)), queue=queue)

View File

@@ -4,7 +4,7 @@ from flask import (
jsonify,
current_app)
from app.celery import QueueNames
from app.config import QueueNames
from app.dao.invited_user_dao import (
save_invited_user,
get_invited_user,

View File

@@ -36,7 +36,7 @@ from app.models import JOB_STATUS_SCHEDULED, JOB_STATUS_PENDING, JOB_STATUS_CANC
from app.utils import pagination_links
from app.celery import QueueNames
from app.config import QueueNames
job_blueprint = Blueprint('job', __name__, url_prefix='/service/<uuid:service_id>/job')

View File

@@ -2,7 +2,7 @@ from flask import Blueprint, jsonify
from flask import request
from app import notify_celery
from app.celery import QueueNames
from app.config import QueueNames
from app.dao.jobs_dao import dao_get_all_letter_jobs
from app.schemas import job_schema
from app.v2.errors import register_errors

View File

@@ -13,7 +13,7 @@ from app.celery.tasks import update_letter_notifications_statuses
from app.v2.errors import register_errors
from app.notifications.utils import autoconfirm_subscription
from app.schema_validation import validate
from app.celery import QueueNames
from app.config import QueueNames
letter_callback_blueprint = Blueprint('notifications_letter_callback', __name__)
register_errors(letter_callback_blueprint)

View File

@@ -1,4 +1,5 @@
from flask import Blueprint
from flask import current_app
from flask import json
from flask import request, jsonify
@@ -22,6 +23,12 @@ def process_mmg_response():
success, errors = process_sms_client_response(status=str(data.get('status')),
reference=data.get('CID'),
client_name=client_name)
safe_to_log = data.copy()
safe_to_log.pop("MSISDN")
current_app.logger.info(
"Full delivery response from {} for notification: {}\n{}".format(client_name, request.form.get('CID'),
safe_to_log))
if errors:
raise InvalidRequest(errors, status_code=400)
else:
@@ -36,9 +43,12 @@ def process_firetext_response():
client_name=client_name)
if errors:
raise InvalidRequest(errors, status_code=400)
status = request.form.get('status')
success, errors = process_sms_client_response(status=status,
safe_to_log = dict(request.form).copy()
safe_to_log.pop('mobile')
current_app.logger.info(
"Full delivery response from {} for notification: {}\n{}".format(client_name, request.form.get('reference'),
safe_to_log))
success, errors = process_sms_client_response(status=request.form.get('status'),
reference=request.form.get('reference'),
client_name=client_name)
if errors:

View File

@@ -1,3 +1,4 @@
import uuid
from datetime import datetime
from flask import current_app
@@ -12,7 +13,7 @@ from app import redis_store
from app.celery import provider_tasks
from notifications_utils.clients import redis
from app.celery import QueueNames
from app.config import QueueNames
from app.models import SMS_TYPE, Notification, KEY_TYPE_TEST, EMAIL_TYPE, ScheduledNotification
from app.dao.notifications_dao import (dao_create_notification,
dao_delete_notifications_and_history_by_id,
@@ -53,7 +54,8 @@ def persist_notification(
created_by_id=None
):
notification_created_at = created_at or datetime.utcnow()
if not notification_id and simulated:
notification_id = uuid.uuid4()
notification = Notification(
id=notification_id,
template_id=template_id,
@@ -100,12 +102,14 @@ def persist_notification(
def send_notification_to_queue(notification, research_mode, queue=None):
if research_mode or notification.key_type == KEY_TYPE_TEST:
queue = QueueNames.RESEARCH_MODE
elif not queue:
queue = QueueNames.SEND
if notification.notification_type == SMS_TYPE:
if not queue:
queue = QueueNames.SEND_SMS
deliver_task = provider_tasks.deliver_sms
if notification.notification_type == EMAIL_TYPE:
if not queue:
queue = QueueNames.SEND_EMAIL
deliver_task = provider_tasks.deliver_email
try:

View File

@@ -6,7 +6,7 @@ from notifications_utils.recipients import validate_and_format_phone_number
from app import statsd_client, firetext_client, mmg_client
from app.celery import tasks
from app.celery import QueueNames
from app.config import QueueNames
from app.dao.services_dao import dao_fetch_services_by_sms_sender
from app.dao.inbound_sms_dao import dao_create_inbound_sms
from app.models import InboundSms, INBOUND_SMS_TYPE, SMS_TYPE

View File

@@ -6,7 +6,7 @@ from flask import (
)
from app import api_user, authenticated_service
from app.celery import QueueNames
from app.config import QueueNames
from app.dao import (
templates_dao,
notifications_dao

View File

@@ -1,4 +1,4 @@
from app.celery import QueueNames
from app.config import QueueNames
from app.notifications.validators import (
check_service_over_daily_message_limit,
validate_and_format_recipient,

View File

@@ -1,6 +1,6 @@
from flask import current_app
from app.celery import QueueNames
from app.config import QueueNames
from app.dao.services_dao import dao_fetch_service_by_id, dao_fetch_active_users_for_service
from app.dao.templates_dao import dao_get_template_by_id
from app.models import EMAIL_TYPE, KEY_TYPE_NORMAL

View File

@@ -4,7 +4,7 @@ from datetime import datetime
from flask import (jsonify, request, Blueprint, current_app)
from app.celery import QueueNames
from app.config import QueueNames
from app.dao.users_dao import (
get_user_by_id,
save_model_user,

View File

@@ -1,8 +1,8 @@
from flask import request, jsonify, current_app
from app import api_user, authenticated_service
from app.models import SMS_TYPE, EMAIL_TYPE, PRIORITY
from app.celery import QueueNames
from app.config import QueueNames
from app.models import SMS_TYPE, EMAIL_TYPE, PRIORITY, SCHEDULE_NOTIFICATIONS
from app.notifications.process_notifications import (
persist_notification,
send_notification_to_queue,
@@ -11,17 +11,20 @@ from app.notifications.process_notifications import (
from app.notifications.validators import (
validate_and_format_recipient,
check_rate_limiting,
service_has_permission,
check_service_can_schedule_notification,
check_service_has_permission,
validate_template
)
from app.schema_validation import validate
from app.utils import get_public_notify_type_text
from app.v2.notifications import v2_notification_blueprint
from app.v2.notifications.notification_schemas import (
post_sms_request,
create_post_sms_response_from_notification,
post_email_request,
create_post_email_response_from_notification)
from app.v2.errors import BadRequestError
@v2_notification_blueprint.route('/<notification_type>', methods=['POST'])