2017-02-13 15:05:39 +00:00
|
|
|
from datetime import (
|
|
|
|
|
datetime,
|
|
|
|
|
timedelta
|
|
|
|
|
)
|
2016-06-20 13:33:53 +01:00
|
|
|
|
|
|
|
|
from flask import current_app
|
2018-02-06 09:35:33 +00:00
|
|
|
from notifications_utils.statsd_decorators import statsd
|
2019-01-14 17:22:41 +00:00
|
|
|
from sqlalchemy import and_
|
2016-06-20 13:33:53 +01:00
|
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
|
|
2019-06-17 13:52:36 +01:00
|
|
|
from app import notify_celery, zendesk_client
|
2019-11-05 16:47:00 +00:00
|
|
|
from app.celery.tasks import (
|
|
|
|
|
process_job,
|
2019-11-08 10:30:26 +00:00
|
|
|
get_recipient_csv_and_template_and_sender_id,
|
2019-11-05 16:47:00 +00:00
|
|
|
process_row
|
|
|
|
|
)
|
2019-11-19 16:04:21 +00:00
|
|
|
from app.celery.letters_pdf_tasks import create_letters_pdf
|
2018-04-25 10:10:25 +01:00
|
|
|
from app.config import QueueNames, TaskNames
|
2018-02-16 10:56:12 +00:00
|
|
|
from app.dao.invited_org_user_dao import delete_org_invitations_created_more_than_two_days_ago
|
2018-04-25 10:10:25 +01:00
|
|
|
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
|
2019-11-05 16:47:00 +00:00
|
|
|
from app.dao.jobs_dao import (
|
|
|
|
|
dao_set_scheduled_jobs_to_pending,
|
|
|
|
|
find_jobs_with_missing_rows,
|
|
|
|
|
find_missing_row_for_job
|
|
|
|
|
)
|
2018-04-25 10:10:25 +01:00
|
|
|
from app.dao.jobs_dao import dao_update_job
|
2017-01-27 12:30:56 +00:00
|
|
|
from app.dao.notifications_dao import (
|
2017-05-22 15:07:16 +01:00
|
|
|
dao_get_scheduled_notifications,
|
2017-09-15 17:46:08 +01:00
|
|
|
set_scheduled_notification_to_processed,
|
2019-06-11 13:16:34 +01:00
|
|
|
notifications_not_yet_sent,
|
|
|
|
|
dao_precompiled_letters_still_pending_virus_check,
|
2019-06-11 15:13:06 +01:00
|
|
|
dao_old_letters_with_created_status,
|
2019-11-13 17:01:03 +00:00
|
|
|
letters_missing_from_sending_bucket,
|
|
|
|
|
is_delivery_slow_for_providers,
|
2017-01-27 12:30:56 +00:00
|
|
|
)
|
2019-11-13 17:01:03 +00:00
|
|
|
from app.dao.provider_details_dao import dao_reduce_sms_provider_priority
|
2016-06-20 13:33:53 +01:00
|
|
|
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
2017-10-13 16:46:17 +01:00
|
|
|
from app.models import (
|
|
|
|
|
Job,
|
2017-10-17 11:07:36 +01:00
|
|
|
JOB_STATUS_IN_PROGRESS,
|
2018-03-23 15:38:35 +00:00
|
|
|
JOB_STATUS_ERROR,
|
|
|
|
|
SMS_TYPE,
|
2018-04-25 10:10:25 +01:00
|
|
|
EMAIL_TYPE,
|
2017-10-13 16:46:17 +01:00
|
|
|
)
|
2017-05-16 13:47:22 +01:00
|
|
|
from app.notifications.process_notifications import send_notification_to_queue
|
2018-01-23 09:51:43 +00:00
|
|
|
from app.v2.errors import JobIncompleteError
|
2016-08-24 17:03:56 +01:00
|
|
|
|
2019-11-29 21:24:17 +00:00
|
|
|
from app.service.utils import get_services_with_high_failure_rates
|
|
|
|
|
|
2016-08-24 17:03:56 +01:00
|
|
|
|
|
|
|
|
@notify_celery.task(name="run-scheduled-jobs")
@statsd(namespace="tasks")
def run_scheduled_jobs():
    """Queue a ``process_job`` task for each job that has just been set to pending.

    Every job returned by ``dao_set_scheduled_jobs_to_pending`` has its id
    (as a string) sent to ``QueueNames.JOBS``, and an info line is logged
    per job.

    Raises:
        SQLAlchemyError: logged with its traceback and then re-raised.
    """
    try:
        pending_jobs = dao_set_scheduled_jobs_to_pending()
        for pending_job in pending_jobs:
            job_id = pending_job.id
            process_job.apply_async([str(job_id)], queue=QueueNames.JOBS)
            current_app.logger.info("Job ID {} added to process job queue".format(job_id))
    except SQLAlchemyError:
        current_app.logger.exception("Failed to run scheduled jobs")
        raise
|
2016-06-20 13:33:53 +01:00
|
|
|
|
|
|
|
|
|
2017-05-16 13:47:22 +01:00
|
|
|
@notify_celery.task(name='send-scheduled-notifications')
@statsd(namespace="tasks")
def send_scheduled_notifications():
    """Send every notification returned by ``dao_get_scheduled_notifications``.

    Each notification is passed to ``send_notification_to_queue`` together
    with its service's ``research_mode`` flag and is then marked as
    processed. A single summary line with the total is logged at the end.

    Raises:
        SQLAlchemyError: logged with its traceback and then re-raised.
    """
    try:
        due_notifications = dao_get_scheduled_notifications()
        for due in due_notifications:
            research_mode = due.service.research_mode
            send_notification_to_queue(due, research_mode)
            set_scheduled_notification_to_processed(due.id)

        summary = "Sent {} scheduled notifications to the provider queue".format(len(due_notifications))
        current_app.logger.info(summary)
    except SQLAlchemyError:
        current_app.logger.exception("Failed to send scheduled notifications")
        raise
|
|
|
|
|
|
|
|
|
|
|
2016-06-20 13:33:53 +01:00
|
|
|
@notify_celery.task(name="delete-verify-codes")
@statsd(namespace="tasks")
def delete_verify_codes():
    """Run ``delete_codes_older_created_more_than_a_day_ago`` and log the outcome.

    The log line records the UTC start time, the UTC finish time and the
    value returned by the delete helper.

    Raises:
        SQLAlchemyError: logged with its traceback and then re-raised.
    """
    try:
        started_at = datetime.utcnow()
        removed = delete_codes_older_created_more_than_a_day_ago()
        finished_at = datetime.utcnow()
        current_app.logger.info(
            "Delete job started {} finished {} deleted {} verify codes".format(started_at, finished_at, removed)
        )
    except SQLAlchemyError:
        current_app.logger.exception("Failed to delete verify codes")
        raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@notify_celery.task(name="delete-invitations")
@statsd(namespace="tasks")
def delete_invitations():
    """Delete old user invitations and old organisation invitations.

    Calls ``delete_invitations_created_more_than_two_days_ago`` followed by
    ``delete_org_invitations_created_more_than_two_days_ago`` and logs the
    UTC start time, UTC finish time and the combined result of both calls.

    Raises:
        SQLAlchemyError: logged with its traceback and then re-raised.
    """
    try:
        started_at = datetime.utcnow()
        user_invites_removed = delete_invitations_created_more_than_two_days_ago()
        org_invites_removed = delete_org_invitations_created_more_than_two_days_ago()
        total_removed = user_invites_removed + org_invites_removed
        finished_at = datetime.utcnow()
        current_app.logger.info(
            "Delete job started {} finished {} deleted {} invitations".format(started_at, finished_at, total_removed)
        )
    except SQLAlchemyError:
        current_app.logger.exception("Failed to delete invitations")
        raise
|
|
|
|
|
|
|
|
|
|
|
2017-02-13 15:05:39 +00:00
|
|
|
@notify_celery.task(name='switch-current-sms-provider-on-slow-delivery')
@statsd(namespace="tasks")
def switch_current_sms_provider_on_slow_delivery():
    """
    Lower the priority of an SMS provider whose deliveries are slow.

    ``is_delivery_slow_for_providers`` is asked about notifications created in
    the last ten minutes, with a threshold of 0.3 and a delivery time of four
    minutes. Its result is treated as a mapping of provider name to a "slow"
    flag. Nothing is changed when every provider has the same flag (all fast
    or all slow). Otherwise each slow provider is logged and handed to
    ``dao_reduce_sms_provider_priority`` with a ten minute ``time_threshold``
    (presumably what stops a provider being adjusted twice within ten
    minutes - confirm against the DAO).
    """
    ten_minutes = timedelta(minutes=10)
    slowness_by_provider = is_delivery_slow_for_providers(
        threshold=0.3,
        created_at=datetime.utcnow() - ten_minutes,
        delivery_time=timedelta(minutes=4),
    )

    # A single distinct flag means the providers are indistinguishable
    # (all fast or all slow), so there is nothing useful to adjust.
    distinct_flags = set(slowness_by_provider.values())
    if len(distinct_flags) == 1:
        return

    for provider_name, is_slow in slowness_by_provider.items():
        if not is_slow:
            continue
        current_app.logger.warning('Slow delivery notifications detected for provider {}'.format(provider_name))
        dao_reduce_sms_provider_priority(provider_name, time_threshold=timedelta(minutes=10))
|
2017-05-11 15:22:57 +01:00
|
|
|
|
|
|
|
|
|
2017-10-12 16:21:08 +01:00
|
|
|
@notify_celery.task(name='check-job-status')
@statsd(namespace="tasks")
def check_job_status():
    """
    Detect jobs that appear to be stuck and hand them off for re-processing.

    Selects jobs whose status is ``JOB_STATUS_IN_PROGRESS`` and whose
    ``processing_started`` is more than 30 but less than 35 minutes ago,
    ordered by ``processing_started``.

    Each matching job is set to ``JOB_STATUS_ERROR`` and its id is collected.
    If any ids were collected, a ``TaskNames.PROCESS_INCOMPLETE_JOBS`` task is
    sent to ``QueueNames.JOBS`` with the ids; the missing CSV rows are
    processed by that task, not here.

    Raises:
        JobIncompleteError: when at least one such job was found, after the
            follow-up task has been sent.
    """
    # Read the clock once so both bounds are measured from the same instant
    # and the window is exactly five minutes wide.
    now = datetime.utcnow()
    thirty_minutes_ago = now - timedelta(minutes=30)
    thirty_five_minutes_ago = now - timedelta(minutes=35)

    jobs_not_complete_after_30_minutes = Job.query.filter(
        Job.job_status == JOB_STATUS_IN_PROGRESS,
        and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago)
    ).order_by(Job.processing_started).all()

    # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
    # if they haven't been re-processed in time.
    job_ids = []
    for job in jobs_not_complete_after_30_minutes:
        job.job_status = JOB_STATUS_ERROR
        dao_update_job(job)
        job_ids.append(str(job.id))

    if job_ids:
        notify_celery.send_task(
            name=TaskNames.PROCESS_INCOMPLETE_JOBS,
            args=(job_ids,),
            queue=QueueNames.JOBS
        )
        raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids))
|
2017-11-09 10:32:39 +00:00
|
|
|
|
|
|
|
|
|
2018-03-23 15:38:35 +00:00
|
|
|
@notify_celery.task(name='replay-created-notifications')
@statsd(namespace="tasks")
def replay_created_notifications():
    """Re-queue old email/SMS notifications and regenerate PDFs for old letters.

    For each of ``EMAIL_TYPE`` and ``SMS_TYPE``, everything returned by
    ``notifications_not_yet_sent`` is passed to ``send_notification_to_queue``
    again. Then, for every letter returned by
    ``letters_missing_from_sending_bucket``, a ``create_letters_pdf`` task is
    queued on ``QueueNames.LETTERS``.
    """
    # 60 * 60: a one hour age threshold (presumably in seconds - confirm
    # against the DAO functions), shared by both lookups below.
    age_threshold = (60 * 60)

    for notification_type in (EMAIL_TYPE, SMS_TYPE):
        stale_notifications = notifications_not_yet_sent(
            age_threshold,
            notification_type
        )
        stale_count = len(stale_notifications)

        if stale_count > 0:
            current_app.logger.info(
                "Sending {} {} notifications "
                "to the delivery queue because the notification "
                "status was created.".format(stale_count, notification_type)
            )

        for stale in stale_notifications:
            send_notification_to_queue(notification=stale, research_mode=stale.service.research_mode)

    # Letters past the same age threshold get their PDF creation task queued again.
    stale_letters = letters_missing_from_sending_bucket(age_threshold)

    if len(stale_letters) > 0:
        stale_letter_ids = [stale_letter.id for stale_letter in stale_letters]
        msg = (
            "{} letters were created over an hour ago, "
            "but do not have an updated_at timestamp or billable units. "
            "\n Creating app.celery.letters_pdf_tasks.create_letters tasks to upload letter to S3 "
            "and update notifications for the following notification ids: "
            "\n {}"
        ).format(len(stale_letters), stale_letter_ids)

        current_app.logger.info(msg)
        for stale_letter_id in stale_letter_ids:
            create_letters_pdf.apply_async([stale_letter_id], queue=QueueNames.LETTERS)
|
2019-11-13 16:39:59 +00:00
|
|
|
|
2019-06-11 13:16:34 +01:00
|
|
|
|
|
|
|
|
@notify_celery.task(name='check-precompiled-letter-state')
@statsd(namespace="tasks")
def check_precompiled_letter_state():
    """Report precompiled letters that are still pending their virus check.

    When ``dao_precompiled_letters_still_pending_virus_check`` returns any
    letters, their count and ids are logged at error level. In the 'live',
    'production' and 'test' environments a Zendesk incident ticket carrying
    the same message is created as well. Nothing happens when no letters
    are returned.
    """
    letters = dao_precompiled_letters_still_pending_virus_check()

    if len(letters) > 0:
        letter_ids = [str(letter.id) for letter in letters]

        msg = "{} precompiled letters have been pending-virus-check for over 90 minutes. " \
              "Notifications: {}".format(len(letters), letter_ids)

        # logger.error rather than logger.exception: there is no active
        # exception here, so .exception() would append a meaningless
        # "NoneType: None" traceback to the log record.
        current_app.logger.error(msg)

        if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
            zendesk_client.create_ticket(
                subject="[{}] Letters still pending virus check".format(current_app.config['NOTIFY_ENVIRONMENT']),
                message=msg,
                ticket_type=zendesk_client.TYPE_INCIDENT
            )
|
2019-06-11 15:13:06 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@notify_celery.task(name='check-templated-letter-state')
@statsd(namespace="tasks")
def check_templated_letter_state():
    """Report templated letters that are still in 'created' status.

    When ``dao_old_letters_with_created_status`` returns any letters, their
    count and ids are logged at error level. In the 'live', 'production' and
    'test' environments a Zendesk incident ticket carrying the same message
    is created as well. Nothing happens when no letters are returned.
    """
    letters = dao_old_letters_with_created_status()

    if len(letters) > 0:
        letter_ids = [str(letter.id) for letter in letters]

        msg = "{} letters were created before 17.30 yesterday and still have 'created' status. " \
              "Notifications: {}".format(len(letters), letter_ids)

        # logger.error rather than logger.exception: there is no active
        # exception here, so .exception() would append a meaningless
        # "NoneType: None" traceback to the log record.
        current_app.logger.error(msg)

        if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
            zendesk_client.create_ticket(
                subject="[{}] Letters still in 'created' status".format(current_app.config['NOTIFY_ENVIRONMENT']),
                message=msg,
                ticket_type=zendesk_client.TYPE_INCIDENT
            )
|
2019-11-05 15:14:23 +00:00
|
|
|
|
2019-11-05 16:47:00 +00:00
|
|
|
|
|
|
|
|
@notify_celery.task(name='check-for-missing-rows-in-completed-jobs')
def check_for_missing_rows_in_completed_jobs():
    """Process CSV rows that are missing from jobs.

    For every result of ``find_jobs_with_missing_rows`` the job is taken from
    position 1 of the result, and ``find_missing_row_for_job`` supplies the
    missing row numbers. The job's recipient CSV, template and sender id are
    then loaded once, and each CSV row whose ``index`` is one of the missing
    row numbers is logged and passed to ``process_row``.

    Rows are handled in the order they appear in the CSV, and the CSV is not
    loaded at all for a job with no missing rows.
    """
    jobs_and_job_size = find_jobs_with_missing_rows()
    for x in jobs_and_job_size:
        job = x[1]

        # Collect the missing row numbers up front so the CSV only has to be
        # fetched and scanned once per job instead of once per missing row.
        missing_row_numbers = {
            row_to_process.missing_row
            for row_to_process in find_missing_row_for_job(job.id, job.notification_count)
        }
        if not missing_row_numbers:
            continue

        recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
        for row in recipient_csv.get_rows():
            if row.index in missing_row_numbers:
                current_app.logger.info(
                    "Processing missing row: {} for job: {}".format(row.index, job.id))
                process_row(row, template, job, job.service, sender_id=sender_id)
|
2019-11-29 21:24:17 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@notify_celery.task(name='check-for-services-with-high-failure-rates-or-sending-to-tv-numbers')
@statsd(namespace="tasks")
def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
    """Report services with high SMS permanent-failure rates.

    When ``get_services_with_high_failure_rates`` returns any services, a
    message listing each service's ``id`` and ``permanent_failure_rate`` is
    logged at error level. In the 'live', 'production' and 'test'
    environments a Zendesk incident ticket carrying the same message is
    created as well.

    Despite the task name, only failure rates are checked here; the
    TV-number lookup is not implemented (see the commented-out line below).
    """
    services_with_failures = get_services_with_high_failure_rates()
    # services_sending_to_tv_numbers = dao_find_services_sending_to_tv_numbers(number=100)

    if services_with_failures:
        header = "{} service(s) have had high permanent-failure rates for sms messages in last 24 hours: ".format(
            len(services_with_failures)
        )
        # Each entry keeps its trailing ", " so the resulting text is the
        # same as appending the entries to the header one by one.
        message = header + "".join(
            "service id: {} failure rate: {}, ".format(service["id"], service["permanent_failure_rate"])
            for service in services_with_failures
        )

        # logger.error rather than logger.exception: there is no active
        # exception here, so .exception() would append a meaningless
        # "NoneType: None" traceback to the log record.
        current_app.logger.error(message)

        if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
            zendesk_client.create_ticket(
                subject="[{}] High failure rates for sms spotted for services".format(
                    current_app.config['NOTIFY_ENVIRONMENT']
                ),
                message=message,
                ticket_type=zendesk_client.TYPE_INCIDENT
            )
|