from datetime import (
    datetime,
    timedelta
)

from flask import current_app
from sqlalchemy.exc import SQLAlchemyError
from app.aws import s3
from app import notify_celery
from app import performance_platform_client
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, dao_get_jobs_older_than_limited_by
from app.dao.notifications_dao import (
    delete_notifications_created_more_than_a_week_ago,
    dao_timeout_notifications,
    is_delivery_slow_for_provider,
    dao_get_scheduled_notifications,
    set_scheduled_notification_to_processed)
from app.dao.statistics_dao import dao_timeout_job_statistics
from app.dao.provider_details_dao import (
    get_current_provider,
    dao_toggle_sms_provider
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.notifications.process_notifications import send_notification_to_queue
from app.statsd_decorators import statsd
from app.celery.tasks import process_job


@notify_celery.task(name="remove_csv_files")
@statsd(namespace="tasks")
def remove_csv_files():
    """
    Remove the CSV files for jobs within a 2-day eligible window.

    Previously this was unbounded, so it fetched all jobs older than 7 days (in excess of
    75,000), which meant the job took a long time, used a lot of memory, and did the same
    work every day. A 2-day eligible window minimises the number of eligible jobs in a run
    while still retaining some leeway in the event of it failing one night.

    In principle the job runs early in the morning on a given day. The previous 7 days are
    left alone, and then the previous 2 days' worth of files are deleted. So if it runs on
    the 31st: the 30th to the 24th are ignored, jobs from the 23rd and 22nd have their
    files deleted, and the 21st and earlier are ignored.
    """
    jobs = dao_get_jobs_older_than_limited_by()
    for job in jobs:
        s3.remove_job_from_s3(job.service_id, job.id)
        current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))


@notify_celery.task(name="run-scheduled-jobs")
@statsd(namespace="tasks")
def run_scheduled_jobs():
    try:
        for job in dao_set_scheduled_jobs_to_pending():
            process_job.apply_async([str(job.id)], queue="process-job")
            current_app.logger.info("Job ID {} added to process job queue".format(job.id))
    except SQLAlchemyError:
        current_app.logger.exception("Failed to run scheduled jobs")
        raise


@notify_celery.task(name='send-scheduled-notifications')
@statsd(namespace="tasks")
def send_scheduled_notifications():
    try:
        scheduled_notifications = dao_get_scheduled_notifications()
        for notification in scheduled_notifications:
            send_notification_to_queue(notification, notification.service.research_mode)
            set_scheduled_notification_to_processed(notification.id)
        current_app.logger.info(
            "Sent {} scheduled notifications to the provider queue".format(len(scheduled_notifications)))
    except SQLAlchemyError:
        current_app.logger.exception("Failed to send scheduled notifications")
        raise


@notify_celery.task(name="delete-verify-codes")
@statsd(namespace="tasks")
def delete_verify_codes():
    try:
        start = datetime.utcnow()
        deleted = delete_codes_older_created_more_than_a_day_ago()
        current_app.logger.info(
            "Delete job started {} finished {} deleted {} verify codes".format(start, datetime.utcnow(), deleted)
        )
    except SQLAlchemyError:
        current_app.logger.exception("Failed to delete verify codes")
        raise


@notify_celery.task(name="delete-successful-notifications")
@statsd(namespace="tasks")
def delete_successful_notifications():
    try:
        start = datetime.utcnow()
        deleted = delete_notifications_created_more_than_a_week_ago('delivered')
        current_app.logger.info(
            "Delete job started {} finished {} deleted {} successful notifications".format(
                start,
                datetime.utcnow(),
                deleted
            )
        )
    except SQLAlchemyError:
        current_app.logger.exception("Failed to delete successful notifications")
        raise


@notify_celery.task(name="delete-failed-notifications")
@statsd(namespace="tasks")
def delete_failed_notifications():
    try:
        start = datetime.utcnow()
        deleted = delete_notifications_created_more_than_a_week_ago('failed')
        deleted += delete_notifications_created_more_than_a_week_ago('technical-failure')
        deleted += delete_notifications_created_more_than_a_week_ago('temporary-failure')
        deleted += delete_notifications_created_more_than_a_week_ago('permanent-failure')
        current_app.logger.info(
            "Delete job started {} finished {} deleted {} failed notifications".format(
                start,
                datetime.utcnow(),
                deleted
            )
        )
    except SQLAlchemyError:
        current_app.logger.exception("Failed to delete failed notifications")
        raise


@notify_celery.task(name="delete-invitations")
@statsd(namespace="tasks")
def delete_invitations():
    try:
        start = datetime.utcnow()
        deleted = delete_invitations_created_more_than_two_days_ago()
        current_app.logger.info(
            "Delete job started {} finished {} deleted {} invitations".format(start, datetime.utcnow(), deleted)
        )
    except SQLAlchemyError:
        current_app.logger.exception("Failed to delete invitations")
        raise


@notify_celery.task(name='timeout-sending-notifications')
@statsd(namespace="tasks")
def timeout_notifications():
    updated = dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
    if updated:
        current_app.logger.info(
            "Timeout period reached for {} notifications, status has been updated.".format(updated))


@notify_celery.task(name='send-daily-performance-platform-stats')
@statsd(namespace="tasks")
def send_daily_performance_platform_stats():
    if performance_platform_client.active:
        # count_dict is expected to contain a 'start_date' plus per-channel counts,
        # e.g. {'start_date': ..., 'email': {'count': n}, 'sms': {'count': m}}.
        count_dict = performance_platform_client.get_total_sent_notifications_yesterday()
        email_sent_count = count_dict.get('email').get('count')
        sms_sent_count = count_dict.get('sms').get('count')
        start_date = count_dict.get('start_date')

        current_app.logger.info(
            "Attempting to update performance platform for date {} with email count {} and sms count {}"
            .format(start_date, email_sent_count, sms_sent_count)
        )

        performance_platform_client.send_performance_stats(
            start_date,
            'sms',
            sms_sent_count,
            'day'
        )

        performance_platform_client.send_performance_stats(
            start_date,
            'email',
            email_sent_count,
            'day'
        )


@notify_celery.task(name='switch-current-sms-provider-on-slow-delivery')
@statsd(namespace="tasks")
def switch_current_sms_provider_on_slow_delivery():
    """
    Switch providers if there are at least two slow delivery notifications (more than four minutes)
    in the last ten minutes. Search from the time we last switched to the current provider.
    """
    functional_test_provider_service_id = current_app.config.get('FUNCTIONAL_TEST_PROVIDER_SERVICE_ID')
    functional_test_provider_template_id = current_app.config.get('FUNCTIONAL_TEST_PROVIDER_SMS_TEMPLATE_ID')

    if functional_test_provider_service_id and functional_test_provider_template_id:
        current_provider = get_current_provider('sms')
        slow_delivery_notifications = is_delivery_slow_for_provider(
            provider=current_provider.identifier,
            threshold=2,
            sent_at=max(datetime.utcnow() - timedelta(minutes=10), current_provider.updated_at),
            delivery_time=timedelta(minutes=4),
            service_id=functional_test_provider_service_id,
            template_id=functional_test_provider_template_id
        )

        if slow_delivery_notifications:
            current_app.logger.warning(
                'Slow delivery notifications detected for provider {}'.format(
                    current_provider.identifier
                )
            )

            dao_toggle_sms_provider(current_provider.identifier)
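

# A minimal illustrative sketch (not called by the task above) of the search-window rule in
# the docstring: look back at most ten minutes, but never earlier than the moment we last
# switched to the current provider. The helper name and last_switched_at argument are
# hypothetical.
def _example_slow_delivery_search_start(last_switched_at, now=None):
    now = now or datetime.utcnow()
    return max(now - timedelta(minutes=10), last_switched_at)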


@notify_celery.task(name='timeout-job-statistics')
@statsd(namespace="tasks")
def timeout_job_statistics():
    updated = dao_timeout_job_statistics(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
    if updated:
        current_app.logger.info(
            "Timeout period reached for {} job statistics, failure count has been updated.".format(updated))