Files
notifications-api/app/celery/scheduled_tasks.py

320 lines
12 KiB
Python
Raw Normal View History

2023-11-28 12:30:53 -08:00
import os
2021-03-10 13:55:06 +00:00
from datetime import datetime, timedelta
from flask import current_app
from notifications_utils.clients.zendesk.zendesk_client import NotifySupportTicket
from sqlalchemy import between
from sqlalchemy.exc import SQLAlchemyError
2023-11-28 12:30:53 -08:00
from app import notify_celery, redis_store, zendesk_client
from app.celery.tasks import (
2019-11-08 10:30:26 +00:00
get_recipient_csv_and_template_and_sender_id,
2021-03-10 13:55:06 +00:00
process_incomplete_jobs,
process_job,
process_row,
2021-03-10 13:55:06 +00:00
)
from app.config import QueueNames
2021-03-10 13:55:06 +00:00
from app.dao.invited_org_user_dao import (
delete_org_invitations_created_more_than_two_days_ago,
)
from app.dao.invited_user_dao import expire_invitations_created_more_than_two_days_ago
from app.dao.jobs_dao import (
dao_set_scheduled_jobs_to_pending,
2021-03-10 13:55:06 +00:00
dao_update_job,
find_jobs_with_missing_rows,
2021-03-10 13:55:06 +00:00
find_missing_row_for_job,
)
2023-11-28 12:30:53 -08:00
from app.dao.notifications_dao import (
dao_get_failed_notification_count,
notifications_not_yet_sent,
)
2021-03-10 13:55:06 +00:00
from app.dao.services_dao import (
dao_find_services_sending_to_tv_numbers,
dao_find_services_with_high_failure_rates,
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
2023-11-28 12:30:53 -08:00
from app.delivery.send_to_providers import provider_to_use
from app.models import (
2021-03-10 13:55:06 +00:00
EMAIL_TYPE,
JOB_STATUS_ERROR,
2021-03-10 13:55:06 +00:00
JOB_STATUS_IN_PROGRESS,
JOB_STATUS_PENDING,
SMS_TYPE,
2021-03-10 13:55:06 +00:00
Job,
)
from app.notifications.process_notifications import send_notification_to_queue
2023-11-29 09:04:34 -08:00
MAX_NOTIFICATION_FAILS = 10000
@notify_celery.task(name="run-scheduled-jobs")
def run_scheduled_jobs():
try:
for job in dao_set_scheduled_jobs_to_pending():
process_job.apply_async([str(job.id)], queue=QueueNames.JOBS)
2023-08-29 14:54:30 -07:00
current_app.logger.info(
"Job ID {} added to process job queue".format(job.id)
)
except SQLAlchemyError:
current_app.logger.exception("Failed to run scheduled jobs")
raise
@notify_celery.task(name="delete-verify-codes")
def delete_verify_codes():
try:
start = datetime.utcnow()
deleted = delete_codes_older_created_more_than_a_day_ago()
current_app.logger.info(
2023-08-29 14:54:30 -07:00
"Delete job started {} finished {} deleted {} verify codes".format(
start, datetime.utcnow(), deleted
)
)
except SQLAlchemyError:
current_app.logger.exception("Failed to delete verify codes")
raise
@notify_celery.task(name="expire-or-delete-invitations")
def expire_or_delete_invitations():
try:
start = datetime.utcnow()
expired_invites = expire_invitations_created_more_than_two_days_ago()
current_app.logger.info(
f"Expire job started {start} finished {datetime.utcnow()} expired {expired_invites} invitations"
)
except SQLAlchemyError:
current_app.logger.exception("Failed to expire invitations")
raise
try:
start = datetime.utcnow()
deleted_invites = delete_org_invitations_created_more_than_two_days_ago()
current_app.logger.info(
f"Delete job started {start} finished {datetime.utcnow()} deleted {deleted_invites} invitations"
)
except SQLAlchemyError:
current_app.logger.exception("Failed to delete invitations")
raise
2024-01-12 14:28:14 -08:00
# TODO THIS IS ACTUALLY DEPRECATED, WE ARE REMOVING PHONE NUMBERS FROM THE DB
# SO THERE WILL BE NO REASON TO KEEP TRACK OF THIS COUNT
2023-11-28 12:30:53 -08:00
@notify_celery.task(name="check-db-notification-fails")
def check_db_notification_fails():
2023-11-29 08:14:28 -08:00
"""
We are going to use redis to keep track of the previous fail count.
2023-11-28 12:30:53 -08:00
2023-11-29 08:14:28 -08:00
If the number of fails is more than 100% of the limit, we want to send an alert every time this
runs, because it is urgent to fix it.
If the number is more than 25%, 50% or 75% of the limit, we only want to send an alert
on a breach. I.e., if the last number was at 23% and the current number is 27%, send an email.
But if the last number was 26% and the current is 27%, don't.
"""
2024-01-12 15:08:59 -08:00
last_value = redis_store.get("LAST_DB_NOTIFICATION_COUNT")
2023-11-28 12:30:53 -08:00
if not last_value:
last_value = 0
2024-01-12 15:08:59 -08:00
else:
last_value = int(last_value.decode("utf-8"))
2023-11-29 08:14:28 -08:00
2023-11-28 12:30:53 -08:00
failed_count = dao_get_failed_notification_count()
if failed_count > last_value:
redis_store.set("LAST_DB_NOTIFICATION_COUNT", failed_count)
message = ""
2023-11-29 09:04:34 -08:00
curr_env = os.getenv("ENVIRONMENT")
if failed_count >= MAX_NOTIFICATION_FAILS:
message = f"We are over 100% in the db for failed notifications on {curr_env}"
elif (
failed_count >= MAX_NOTIFICATION_FAILS * 0.9
and last_value < MAX_NOTIFICATION_FAILS * 0.9
):
message = (
"tts-notify-alerts@gsa.gov",
2023-12-06 07:32:03 -08:00
f"We crossed above 90% in the db for failed notifications on {curr_env}",
2023-11-29 09:04:34 -08:00
)
elif (
failed_count >= MAX_NOTIFICATION_FAILS * 0.75
and last_value < MAX_NOTIFICATION_FAILS * 0.75
):
2023-11-28 12:30:53 -08:00
message = (
"tts-notify-alerts@gsa.gov",
2023-11-29 09:04:34 -08:00
f"We crossed above 75% in the db for failed notifications on {curr_env}",
2023-11-28 12:30:53 -08:00
)
2023-11-29 09:04:34 -08:00
elif (
failed_count >= MAX_NOTIFICATION_FAILS * 0.5
and last_value < MAX_NOTIFICATION_FAILS * 0.5
):
2023-11-28 12:30:53 -08:00
message = (
"tts-notify-alerts@gsa.gov",
2023-11-29 09:04:34 -08:00
f"We crossed above 50% in the db for failed notifications on {curr_env}",
2023-11-28 12:30:53 -08:00
)
2023-11-29 09:04:34 -08:00
elif (
failed_count >= MAX_NOTIFICATION_FAILS * 0.25
and last_value < MAX_NOTIFICATION_FAILS * 0.25
):
2023-11-28 12:30:53 -08:00
message = (
"tts-notify-alerts@gsa.gov",
2023-11-29 09:04:34 -08:00
f"We crossed above 25% in the db for failed notifications on {curr_env}",
2023-11-28 12:30:53 -08:00
)
2023-11-29 08:14:28 -08:00
# suppress any spam coming from development tier
2023-11-29 09:04:34 -08:00
if message and curr_env != "development":
2023-11-28 12:30:53 -08:00
provider = provider_to_use(EMAIL_TYPE, False)
2023-11-29 08:14:28 -08:00
from_address = '"{}" <{}@{}>'.format(
"Failed Notification Count Alert",
"test_sender",
current_app.config["NOTIFY_EMAIL_DOMAIN"],
)
2023-11-28 12:30:53 -08:00
provider.send_email(
2023-11-29 08:14:28 -08:00
from_address,
"tts-notify-alerts@gsa.gov",
2023-11-28 12:30:53 -08:00
"DB Notification Failures Level Breached",
body=str(message),
)
2023-08-29 14:54:30 -07:00
@notify_celery.task(name="check-job-status")
def check_job_status():
"""
every x minutes do this check
select
from jobs
where job_status == 'in progress'
and processing started between 30 and 35 minutes ago
OR where the job_status == 'pending'
and the job scheduled_for timestamp is between 30 and 35 minutes ago.
if any results then
update the job_status to 'error'
2017-10-12 16:23:28 +01:00
process the rows in the csv that are missing (in another task) just do the check here.
"""
thirty_minutes_ago = datetime.utcnow() - timedelta(minutes=30)
thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35)
incomplete_in_progress_jobs = Job.query.filter(
Job.job_status == JOB_STATUS_IN_PROGRESS,
2023-08-29 14:54:30 -07:00
between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago),
)
incomplete_pending_jobs = Job.query.filter(
Job.job_status == JOB_STATUS_PENDING,
Job.scheduled_for.isnot(None),
2023-08-29 14:54:30 -07:00
between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago),
)
2023-08-29 14:54:30 -07:00
jobs_not_complete_after_30_minutes = (
incomplete_in_progress_jobs.union(incomplete_pending_jobs)
.order_by(Job.processing_started, Job.scheduled_for)
.all()
)
# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
# if they haven't been re-processed in time.
job_ids = []
for job in jobs_not_complete_after_30_minutes:
job.job_status = JOB_STATUS_ERROR
dao_update_job(job)
job_ids.append(str(job.id))
if job_ids:
current_app.logger.info("Job(s) {} have not completed.".format(job_ids))
2023-08-29 14:54:30 -07:00
process_incomplete_jobs.apply_async([job_ids], queue=QueueNames.JOBS)
2023-08-29 14:54:30 -07:00
@notify_celery.task(name="replay-created-notifications")
def replay_created_notifications():
# if the notification has not be send after 1 hour, then try to resend.
2023-08-29 14:54:30 -07:00
resend_created_notifications_older_than = 60 * 60
for notification_type in (EMAIL_TYPE, SMS_TYPE):
notifications_to_resend = notifications_not_yet_sent(
2023-08-29 14:54:30 -07:00
resend_created_notifications_older_than, notification_type
)
2018-12-21 13:57:35 +00:00
if len(notifications_to_resend) > 0:
2023-08-29 14:54:30 -07:00
current_app.logger.info(
"Sending {} {} notifications "
"to the delivery queue because the notification "
"status was created.".format(
len(notifications_to_resend), notification_type
)
)
for n in notifications_to_resend:
2023-08-25 12:09:00 -07:00
send_notification_to_queue(notification=n)
2023-08-29 14:54:30 -07:00
@notify_celery.task(name="check-for-missing-rows-in-completed-jobs")
def check_for_missing_rows_in_completed_jobs():
2020-09-26 12:09:54 +01:00
jobs = find_jobs_with_missing_rows()
for job in jobs:
2023-08-29 14:54:30 -07:00
(
recipient_csv,
template,
sender_id,
) = get_recipient_csv_and_template_and_sender_id(job)
missing_rows = find_missing_row_for_job(job.id, job.notification_count)
for row_to_process in missing_rows:
row = recipient_csv[row_to_process.missing_row]
current_app.logger.info(
2023-08-29 14:54:30 -07:00
"Processing missing row: {} for job: {}".format(
row_to_process.missing_row, job.id
)
)
process_row(row, template, job, job.service, sender_id=sender_id)
2023-08-29 14:54:30 -07:00
@notify_celery.task(
name="check-for-services-with-high-failure-rates-or-sending-to-tv-numbers"
)
def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
2023-08-29 14:54:30 -07:00
start_date = datetime.utcnow() - timedelta(days=1)
end_date = datetime.utcnow()
message = ""
2023-08-29 14:54:30 -07:00
services_with_failures = dao_find_services_with_high_failure_rates(
start_date=start_date, end_date=end_date
)
services_sending_to_tv_numbers = dao_find_services_sending_to_tv_numbers(
start_date=start_date, end_date=end_date
)
if services_with_failures:
message += "{} service(s) have had high permanent-failure rates for sms messages in last 24 hours:\n".format(
len(services_with_failures)
)
for service in services_with_failures:
service_dashboard = "{}/services/{}".format(
2023-08-29 14:54:30 -07:00
current_app.config["ADMIN_BASE_URL"],
str(service.service_id),
)
2023-08-29 14:54:30 -07:00
message += "service: {} failure rate: {},\n".format(
service_dashboard, service.permanent_failure_rate
)
elif services_sending_to_tv_numbers:
message += "{} service(s) have sent over 500 sms messages to tv numbers in last 24 hours:\n".format(
len(services_sending_to_tv_numbers)
)
for service in services_sending_to_tv_numbers:
service_dashboard = "{}/services/{}".format(
2023-08-29 14:54:30 -07:00
current_app.config["ADMIN_BASE_URL"],
str(service.service_id),
)
message += "service: {} count of sms to tv numbers: {},\n".format(
service_dashboard, service.notification_count
)
if services_with_failures or services_sending_to_tv_numbers:
current_app.logger.warning(message)
2023-08-29 14:54:30 -07:00
if current_app.config["NOTIFY_ENVIRONMENT"] in ["live", "production", "test"]:
message += (
"\nYou can find instructions for this ticket in our manual:\n"
"https://github.com/alphagov/notifications-manuals/wiki/Support-Runbook#Deal-with-services-with-high-failure-rates-or-sending-sms-to-tv-numbers" # noqa
)
ticket = NotifySupportTicket(
subject=f"[{current_app.config['NOTIFY_ENVIRONMENT']}] High failure rates for sms spotted for services",
message=message,
ticket_type=NotifySupportTicket.TYPE_INCIDENT,
2023-08-29 14:54:30 -07:00
technical_ticket=True,
)
zendesk_client.send_ticket_to_zendesk(ticket)