Merge branch 'master' into schedule-api-notification

Conflicts:
	app/celery/scheduled_tasks.py
	app/v2/notifications/post_notifications.py
	tests/app/celery/test_scheduled_tasks.py
This commit is contained in:
Rebecca Law
2017-05-22 14:05:57 +01:00
48 changed files with 1917 additions and 291 deletions

View File

@@ -15,7 +15,9 @@ from app.dao.notifications_dao import (
delete_notifications_created_more_than_a_week_ago,
dao_timeout_notifications,
is_delivery_slow_for_provider,
dao_get_scheduled_notifications)
dao_get_scheduled_notifications
)
from app.dao.statistics_dao import dao_timeout_job_statistics
from app.dao.provider_details_dao import (
get_current_provider,
dao_toggle_sms_provider
@@ -195,3 +197,12 @@ def switch_current_sms_provider_on_slow_delivery():
)
dao_toggle_sms_provider(current_provider.identifier)
@notify_celery.task(name='timeout-job-statistics')
@statsd(namespace="tasks")
def timeout_job_statistics():
updated = dao_timeout_job_statistics(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
if updated:
current_app.logger.info(
"Timeout period reached for {} job statistics, failure count has been updated.".format(updated))

View File

@@ -0,0 +1,67 @@
from sqlalchemy.exc import SQLAlchemyError
from app import notify_celery
from flask import current_app
from app.models import JobStatistics
from app.statsd_decorators import statsd
from app.dao.statistics_dao import (
create_or_update_job_sending_statistics,
update_job_stats_outcome_count
)
from app.dao.notifications_dao import get_notification_by_id
from app.models import NOTIFICATION_STATUS_TYPES_COMPLETED
def create_initial_notification_statistic_tasks(notification):
if notification.job_id and notification.status:
record_initial_job_statistics.apply_async((str(notification.id),), queue="statistics")
def create_outcome_notification_statistic_tasks(notification):
if notification.job_id and notification.status in NOTIFICATION_STATUS_TYPES_COMPLETED:
record_outcome_job_statistics.apply_async((str(notification.id),), queue="statistics")
@notify_celery.task(bind=True, name='record_initial_job_statistics', max_retries=20, default_retry_delay=10)
@statsd(namespace="tasks")
def record_initial_job_statistics(self, notification_id):
notification = None
try:
notification = get_notification_by_id(notification_id)
if notification:
create_or_update_job_sending_statistics(notification)
else:
raise SQLAlchemyError("Failed to find notification with id {}".format(notification_id))
except SQLAlchemyError as e:
current_app.logger.exception(e)
self.retry(queue="retry")
except self.MaxRetriesExceededError:
current_app.logger.error(
"RETRY FAILED: task record_initial_job_statistics failed for notification {}".format(
notification.id if notification else "missing ID"
)
)
@notify_celery.task(bind=True, name='record_outcome_job_statistics', max_retries=20, default_retry_delay=10)
@statsd(namespace="tasks")
def record_outcome_job_statistics(self, notification_id):
notification = None
try:
notification = get_notification_by_id(notification_id)
if notification:
updated_count = update_job_stats_outcome_count(notification)
if updated_count == 0:
self.retry(queue="retry")
else:
raise SQLAlchemyError("Failed to find notification with id {}".format(notification_id))
except SQLAlchemyError as e:
current_app.logger.exception(e)
self.retry(queue="retry")
except self.MaxRetriesExceededError:
current_app.logger.error(
"RETRY FAILED: task update_job_stats_outcome_count failed for notification {}".format(
notification.id if notification else "missing ID"
)
)

View File

@@ -1,5 +1,3 @@
import random
from datetime import (datetime)
from collections import namedtuple
@@ -361,21 +359,20 @@ def get_template_class(template_type):
@statsd(namespace="tasks")
def update_letter_notifications_statuses(self, filename):
bucket_location = '{}-ftp'.format(current_app.config['NOTIFY_EMAIL_DOMAIN'])
response_file = s3.get_s3_file(bucket_location, filename)
response_file_content = s3.get_s3_file(bucket_location, filename)
try:
NotificationUpdate = namedtuple('NotificationUpdate', ['reference', 'status', 'page_count', 'cost_threshold'])
notification_updates = [NotificationUpdate(*line.split('|')) for line in response_file.splitlines()]
notification_updates = process_updates_from_file(response_file_content)
except TypeError:
current_app.logger.exception('DVLA response file: {} has an invalid format'.format(filename))
raise
else:
if notification_updates:
for update in notification_updates:
current_app.logger.info('DVLA update: {}'.format(str(update)))
# TODO: Update notifications with desired status
return notification_updates
else:
current_app.logger.exception('DVLA response file contained no updates')
for update in notification_updates:
current_app.logger.info('DVLA update: {}'.format(str(update)))
# TODO: Update notifications with desired status
def process_updates_from_file(response_file):
NotificationUpdate = namedtuple('NotificationUpdate', ['reference', 'status', 'page_count', 'cost_threshold'])
notification_updates = [NotificationUpdate(*line.split('|')) for line in response_file.splitlines()]
return notification_updates