mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-01 07:35:34 -05:00
separate nightly tasks and other scheduled tasks.
other tasks is anything that is run on a different frequency than nightly
This commit is contained in:
317
app/celery/nightly_tasks.py
Normal file
317
app/celery/nightly_tasks.py
Normal file
@@ -0,0 +1,317 @@
|
||||
from datetime import (
|
||||
datetime,
|
||||
timedelta
|
||||
)
|
||||
|
||||
import pytz
|
||||
from flask import current_app
|
||||
from notifications_utils.statsd_decorators import statsd
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from app import notify_celery, performance_platform_client, zendesk_client
|
||||
from app.aws import s3
|
||||
from app.celery.service_callback_tasks import (
|
||||
send_delivery_status_to_service,
|
||||
create_delivery_status_callback_data,
|
||||
)
|
||||
from app.config import QueueNames
|
||||
from app.dao.inbound_sms_dao import delete_inbound_sms_created_more_than_a_week_ago
|
||||
from app.dao.jobs_dao import (
|
||||
dao_get_jobs_older_than_data_retention,
|
||||
dao_archive_job
|
||||
)
|
||||
from app.dao.notifications_dao import (
|
||||
dao_timeout_notifications,
|
||||
delete_notifications_created_more_than_a_week_ago_by_type,
|
||||
)
|
||||
from app.dao.service_callback_api_dao import get_service_delivery_status_callback_api_for_service
|
||||
from app.exceptions import NotificationTechnicalFailureException
|
||||
from app.models import (
|
||||
Notification,
|
||||
NOTIFICATION_SENDING,
|
||||
LETTER_TYPE,
|
||||
KEY_TYPE_NORMAL
|
||||
)
|
||||
from app.performance_platform import total_sent_notifications, processing_time
|
||||
|
||||
|
||||
@notify_celery.task(name="remove_csv_files")
|
||||
@statsd(namespace="tasks")
|
||||
def remove_csv_files(job_types):
|
||||
jobs = dao_get_jobs_older_than_data_retention(notification_types=job_types)
|
||||
for job in jobs:
|
||||
s3.remove_job_from_s3(job.service_id, job.id)
|
||||
dao_archive_job(job)
|
||||
current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-sms-notifications")
|
||||
@statsd(namespace="tasks")
|
||||
def delete_sms_notifications_older_than_seven_days():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_notifications_created_more_than_a_week_ago_by_type('sms')
|
||||
current_app.logger.info(
|
||||
"Delete {} job started {} finished {} deleted {} sms notifications".format(
|
||||
'sms',
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("Failed to delete sms notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-email-notifications")
|
||||
@statsd(namespace="tasks")
|
||||
def delete_email_notifications_older_than_seven_days():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_notifications_created_more_than_a_week_ago_by_type('email')
|
||||
current_app.logger.info(
|
||||
"Delete {} job started {} finished {} deleted {} email notifications".format(
|
||||
'email',
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("Failed to delete email notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-letter-notifications")
|
||||
@statsd(namespace="tasks")
|
||||
def delete_letter_notifications_older_than_seven_days():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_notifications_created_more_than_a_week_ago_by_type('letter')
|
||||
current_app.logger.info(
|
||||
"Delete {} job started {} finished {} deleted {} letter notifications".format(
|
||||
'letter',
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("Failed to delete letter notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name='timeout-sending-notifications')
|
||||
@statsd(namespace="tasks")
|
||||
def timeout_notifications():
|
||||
technical_failure_notifications, temporary_failure_notifications = \
|
||||
dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
|
||||
|
||||
notifications = technical_failure_notifications + temporary_failure_notifications
|
||||
for notification in notifications:
|
||||
# queue callback task only if the service_callback_api exists
|
||||
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
|
||||
if service_callback_api:
|
||||
encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api)
|
||||
send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification],
|
||||
queue=QueueNames.CALLBACKS)
|
||||
|
||||
current_app.logger.info(
|
||||
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
|
||||
if technical_failure_notifications:
|
||||
message = "{} notifications have been updated to technical-failure because they " \
|
||||
"have timed out and are still in created.Notification ids: {}".format(
|
||||
len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications])
|
||||
raise NotificationTechnicalFailureException(message)
|
||||
|
||||
|
||||
@notify_celery.task(name='send-daily-performance-platform-stats')
|
||||
@statsd(namespace="tasks")
|
||||
def send_daily_performance_platform_stats():
|
||||
if performance_platform_client.active:
|
||||
yesterday = datetime.utcnow() - timedelta(days=1)
|
||||
send_total_sent_notifications_to_performance_platform(yesterday)
|
||||
processing_time.send_processing_time_to_performance_platform()
|
||||
|
||||
|
||||
def send_total_sent_notifications_to_performance_platform(day):
|
||||
count_dict = total_sent_notifications.get_total_sent_notifications_for_day(day)
|
||||
email_sent_count = count_dict.get('email').get('count')
|
||||
sms_sent_count = count_dict.get('sms').get('count')
|
||||
letter_sent_count = count_dict.get('letter').get('count')
|
||||
start_date = count_dict.get('start_date')
|
||||
|
||||
current_app.logger.info(
|
||||
"Attempting to update Performance Platform for {} with {} emails, {} text messages and {} letters"
|
||||
.format(start_date, email_sent_count, sms_sent_count, letter_sent_count)
|
||||
)
|
||||
|
||||
total_sent_notifications.send_total_notifications_sent_for_day_stats(
|
||||
start_date,
|
||||
'sms',
|
||||
sms_sent_count
|
||||
)
|
||||
|
||||
total_sent_notifications.send_total_notifications_sent_for_day_stats(
|
||||
start_date,
|
||||
'email',
|
||||
email_sent_count
|
||||
)
|
||||
|
||||
total_sent_notifications.send_total_notifications_sent_for_day_stats(
|
||||
start_date,
|
||||
'letter',
|
||||
letter_sent_count
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-inbound-sms")
|
||||
@statsd(namespace="tasks")
|
||||
def delete_inbound_sms_older_than_seven_days():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_inbound_sms_created_more_than_a_week_ago()
|
||||
current_app.logger.info(
|
||||
"Delete inbound sms job started {} finished {} deleted {} inbound sms notifications".format(
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("Failed to delete inbound sms notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="remove_transformed_dvla_files")
|
||||
@statsd(namespace="tasks")
|
||||
def remove_transformed_dvla_files():
|
||||
jobs = dao_get_jobs_older_than_data_retention(notification_types=[LETTER_TYPE])
|
||||
for job in jobs:
|
||||
s3.remove_transformed_dvla_file(job.id)
|
||||
current_app.logger.info("Transformed dvla file for job {} has been removed from s3.".format(job.id))
|
||||
|
||||
|
||||
@notify_celery.task(name="delete_dvla_response_files")
|
||||
@statsd(namespace="tasks")
|
||||
def delete_dvla_response_files_older_than_seven_days():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
bucket_objects = s3.get_s3_bucket_objects(
|
||||
current_app.config['DVLA_RESPONSE_BUCKET_NAME'],
|
||||
'root/dispatch'
|
||||
)
|
||||
older_than_seven_days = s3.filter_s3_bucket_objects_within_date_range(bucket_objects)
|
||||
|
||||
for f in older_than_seven_days:
|
||||
s3.remove_s3_object(current_app.config['DVLA_RESPONSE_BUCKET_NAME'], f['Key'])
|
||||
|
||||
current_app.logger.info(
|
||||
"Delete dvla response files started {} finished {} deleted {} files".format(
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
len(older_than_seven_days)
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("Failed to delete dvla response files")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="raise-alert-if-letter-notifications-still-sending")
|
||||
@statsd(namespace="tasks")
|
||||
def raise_alert_if_letter_notifications_still_sending():
|
||||
today = datetime.utcnow().date()
|
||||
|
||||
# Do nothing on the weekend
|
||||
if today.isoweekday() in [6, 7]:
|
||||
return
|
||||
|
||||
if today.isoweekday() in [1, 2]:
|
||||
offset_days = 4
|
||||
else:
|
||||
offset_days = 2
|
||||
still_sending = Notification.query.filter(
|
||||
Notification.notification_type == LETTER_TYPE,
|
||||
Notification.status == NOTIFICATION_SENDING,
|
||||
Notification.key_type == KEY_TYPE_NORMAL,
|
||||
func.date(Notification.sent_at) <= today - timedelta(days=offset_days)
|
||||
).count()
|
||||
|
||||
if still_sending:
|
||||
message = "There are {} letters in the 'sending' state from {}".format(
|
||||
still_sending,
|
||||
(today - timedelta(days=offset_days)).strftime('%A %d %B')
|
||||
)
|
||||
# Only send alerts in production
|
||||
if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
|
||||
zendesk_client.create_ticket(
|
||||
subject="[{}] Letters still sending".format(current_app.config['NOTIFY_ENVIRONMENT']),
|
||||
message=message,
|
||||
ticket_type=zendesk_client.TYPE_INCIDENT
|
||||
)
|
||||
else:
|
||||
current_app.logger.info(message)
|
||||
|
||||
|
||||
@notify_celery.task(name='raise-alert-if-no-letter-ack-file')
|
||||
@statsd(namespace="tasks")
|
||||
def letter_raise_alert_if_no_ack_file_for_zip():
|
||||
# get a list of zip files since yesterday
|
||||
zip_file_set = set()
|
||||
|
||||
for key in s3.get_list_of_files_by_suffix(bucket_name=current_app.config['LETTERS_PDF_BUCKET_NAME'],
|
||||
subfolder=datetime.utcnow().strftime('%Y-%m-%d') + '/zips_sent',
|
||||
suffix='.TXT'):
|
||||
subname = key.split('/')[-1] # strip subfolder in name
|
||||
zip_file_set.add(subname.upper().rstrip('.TXT'))
|
||||
|
||||
# get acknowledgement file
|
||||
ack_file_set = set()
|
||||
|
||||
yesterday = datetime.now(tz=pytz.utc) - timedelta(days=1) # AWS datetime format
|
||||
|
||||
for key in s3.get_list_of_files_by_suffix(bucket_name=current_app.config['DVLA_RESPONSE_BUCKET_NAME'],
|
||||
subfolder='root/dispatch', suffix='.ACK.txt', last_modified=yesterday):
|
||||
ack_file_set.add(key)
|
||||
|
||||
today_str = datetime.utcnow().strftime('%Y%m%d')
|
||||
|
||||
ack_content_set = set()
|
||||
for key in ack_file_set:
|
||||
if today_str in key:
|
||||
content = s3.get_s3_file(current_app.config['DVLA_RESPONSE_BUCKET_NAME'], key)
|
||||
for zip_file in content.split('\n'): # each line
|
||||
s = zip_file.split('|')
|
||||
ack_content_set.add(s[0].upper())
|
||||
|
||||
message = (
|
||||
"Letter ack file does not contain all zip files sent. "
|
||||
"Missing ack for zip files: {}, "
|
||||
"pdf bucket: {}, subfolder: {}, "
|
||||
"ack bucket: {}"
|
||||
).format(
|
||||
str(sorted(zip_file_set - ack_content_set)),
|
||||
current_app.config['LETTERS_PDF_BUCKET_NAME'],
|
||||
datetime.utcnow().strftime('%Y-%m-%d') + '/zips_sent',
|
||||
current_app.config['DVLA_RESPONSE_BUCKET_NAME']
|
||||
)
|
||||
# strip empty element before comparison
|
||||
ack_content_set.discard('')
|
||||
zip_file_set.discard('')
|
||||
|
||||
if len(zip_file_set - ack_content_set) > 0:
|
||||
if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
|
||||
zendesk_client.create_ticket(
|
||||
subject="Letter acknowledge error",
|
||||
message=message,
|
||||
ticket_type=zendesk_client.TYPE_INCIDENT
|
||||
)
|
||||
current_app.logger.error(message)
|
||||
|
||||
if len(ack_content_set - zip_file_set) > 0:
|
||||
current_app.logger.info(
|
||||
"letter ack contains zip that is not for today: {}".format(ack_content_set - zip_file_set)
|
||||
)
|
||||
@@ -3,34 +3,20 @@ from datetime import (
|
||||
timedelta
|
||||
)
|
||||
|
||||
import pytz
|
||||
from flask import current_app
|
||||
from notifications_utils.statsd_decorators import statsd
|
||||
from sqlalchemy import and_, func
|
||||
from sqlalchemy import and_
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from app import notify_celery
|
||||
from app import performance_platform_client, zendesk_client
|
||||
from app.aws import s3
|
||||
from app.celery.service_callback_tasks import (
|
||||
send_delivery_status_to_service,
|
||||
create_delivery_status_callback_data,
|
||||
)
|
||||
from app.celery.tasks import process_job
|
||||
from app.config import QueueNames, TaskNames
|
||||
from app.dao.inbound_sms_dao import delete_inbound_sms_created_more_than_a_week_ago
|
||||
from app.dao.invited_org_user_dao import delete_org_invitations_created_more_than_two_days_ago
|
||||
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
|
||||
from app.dao.jobs_dao import (
|
||||
dao_set_scheduled_jobs_to_pending,
|
||||
dao_get_jobs_older_than_data_retention,
|
||||
dao_archive_job
|
||||
)
|
||||
from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending
|
||||
from app.dao.jobs_dao import dao_update_job
|
||||
from app.dao.notifications_dao import (
|
||||
dao_timeout_notifications,
|
||||
is_delivery_slow_for_provider,
|
||||
delete_notifications_created_more_than_a_week_ago_by_type,
|
||||
dao_get_scheduled_notifications,
|
||||
set_scheduled_notification_to_processed,
|
||||
notifications_not_yet_sent
|
||||
@@ -39,35 +25,18 @@ from app.dao.provider_details_dao import (
|
||||
get_current_provider,
|
||||
dao_toggle_sms_provider
|
||||
)
|
||||
from app.dao.service_callback_api_dao import get_service_delivery_status_callback_api_for_service
|
||||
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
||||
from app.exceptions import NotificationTechnicalFailureException
|
||||
from app.models import (
|
||||
Job,
|
||||
Notification,
|
||||
NOTIFICATION_SENDING,
|
||||
LETTER_TYPE,
|
||||
JOB_STATUS_IN_PROGRESS,
|
||||
JOB_STATUS_ERROR,
|
||||
SMS_TYPE,
|
||||
EMAIL_TYPE,
|
||||
KEY_TYPE_NORMAL
|
||||
)
|
||||
from app.notifications.process_notifications import send_notification_to_queue
|
||||
from app.performance_platform import total_sent_notifications, processing_time
|
||||
from app.v2.errors import JobIncompleteError
|
||||
|
||||
|
||||
@notify_celery.task(name="remove_csv_files")
|
||||
@statsd(namespace="tasks")
|
||||
def remove_csv_files(job_types):
|
||||
jobs = dao_get_jobs_older_than_data_retention(notification_types=job_types)
|
||||
for job in jobs:
|
||||
s3.remove_job_from_s3(job.service_id, job.id)
|
||||
dao_archive_job(job)
|
||||
current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))
|
||||
|
||||
|
||||
@notify_celery.task(name="run-scheduled-jobs")
|
||||
@statsd(namespace="tasks")
|
||||
def run_scheduled_jobs():
|
||||
@@ -109,63 +78,6 @@ def delete_verify_codes():
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-sms-notifications")
|
||||
@statsd(namespace="tasks")
|
||||
def delete_sms_notifications_older_than_seven_days():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_notifications_created_more_than_a_week_ago_by_type('sms')
|
||||
current_app.logger.info(
|
||||
"Delete {} job started {} finished {} deleted {} sms notifications".format(
|
||||
'sms',
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("Failed to delete sms notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-email-notifications")
|
||||
@statsd(namespace="tasks")
|
||||
def delete_email_notifications_older_than_seven_days():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_notifications_created_more_than_a_week_ago_by_type('email')
|
||||
current_app.logger.info(
|
||||
"Delete {} job started {} finished {} deleted {} email notifications".format(
|
||||
'email',
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("Failed to delete email notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-letter-notifications")
|
||||
@statsd(namespace="tasks")
|
||||
def delete_letter_notifications_older_than_seven_days():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_notifications_created_more_than_a_week_ago_by_type('letter')
|
||||
current_app.logger.info(
|
||||
"Delete {} job started {} finished {} deleted {} letter notifications".format(
|
||||
'letter',
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("Failed to delete letter notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-invitations")
|
||||
@statsd(namespace="tasks")
|
||||
def delete_invitations():
|
||||
@@ -181,70 +93,6 @@ def delete_invitations():
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name='timeout-sending-notifications')
|
||||
@statsd(namespace="tasks")
|
||||
def timeout_notifications():
|
||||
technical_failure_notifications, temporary_failure_notifications = \
|
||||
dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
|
||||
|
||||
notifications = technical_failure_notifications + temporary_failure_notifications
|
||||
for notification in notifications:
|
||||
# queue callback task only if the service_callback_api exists
|
||||
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
|
||||
if service_callback_api:
|
||||
encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api)
|
||||
send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification],
|
||||
queue=QueueNames.CALLBACKS)
|
||||
|
||||
current_app.logger.info(
|
||||
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
|
||||
if technical_failure_notifications:
|
||||
message = "{} notifications have been updated to technical-failure because they " \
|
||||
"have timed out and are still in created.Notification ids: {}".format(
|
||||
len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications])
|
||||
raise NotificationTechnicalFailureException(message)
|
||||
|
||||
|
||||
@notify_celery.task(name='send-daily-performance-platform-stats')
|
||||
@statsd(namespace="tasks")
|
||||
def send_daily_performance_platform_stats():
|
||||
if performance_platform_client.active:
|
||||
yesterday = datetime.utcnow() - timedelta(days=1)
|
||||
send_total_sent_notifications_to_performance_platform(yesterday)
|
||||
processing_time.send_processing_time_to_performance_platform()
|
||||
|
||||
|
||||
def send_total_sent_notifications_to_performance_platform(day):
|
||||
count_dict = total_sent_notifications.get_total_sent_notifications_for_day(day)
|
||||
email_sent_count = count_dict.get('email').get('count')
|
||||
sms_sent_count = count_dict.get('sms').get('count')
|
||||
letter_sent_count = count_dict.get('letter').get('count')
|
||||
start_date = count_dict.get('start_date')
|
||||
|
||||
current_app.logger.info(
|
||||
"Attempting to update Performance Platform for {} with {} emails, {} text messages and {} letters"
|
||||
.format(start_date, email_sent_count, sms_sent_count, letter_sent_count)
|
||||
)
|
||||
|
||||
total_sent_notifications.send_total_notifications_sent_for_day_stats(
|
||||
start_date,
|
||||
'sms',
|
||||
sms_sent_count
|
||||
)
|
||||
|
||||
total_sent_notifications.send_total_notifications_sent_for_day_stats(
|
||||
start_date,
|
||||
'email',
|
||||
email_sent_count
|
||||
)
|
||||
|
||||
total_sent_notifications.send_total_notifications_sent_for_day_stats(
|
||||
start_date,
|
||||
'letter',
|
||||
letter_sent_count
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(name='switch-current-sms-provider-on-slow-delivery')
|
||||
@statsd(namespace="tasks")
|
||||
def switch_current_sms_provider_on_slow_delivery():
|
||||
@@ -273,95 +121,6 @@ def switch_current_sms_provider_on_slow_delivery():
|
||||
dao_toggle_sms_provider(current_provider.identifier)
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-inbound-sms")
|
||||
@statsd(namespace="tasks")
|
||||
def delete_inbound_sms_older_than_seven_days():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_inbound_sms_created_more_than_a_week_ago()
|
||||
current_app.logger.info(
|
||||
"Delete inbound sms job started {} finished {} deleted {} inbound sms notifications".format(
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("Failed to delete inbound sms notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="remove_transformed_dvla_files")
|
||||
@statsd(namespace="tasks")
|
||||
def remove_transformed_dvla_files():
|
||||
jobs = dao_get_jobs_older_than_data_retention(notification_types=[LETTER_TYPE])
|
||||
for job in jobs:
|
||||
s3.remove_transformed_dvla_file(job.id)
|
||||
current_app.logger.info("Transformed dvla file for job {} has been removed from s3.".format(job.id))
|
||||
|
||||
|
||||
@notify_celery.task(name="delete_dvla_response_files")
|
||||
@statsd(namespace="tasks")
|
||||
def delete_dvla_response_files_older_than_seven_days():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
bucket_objects = s3.get_s3_bucket_objects(
|
||||
current_app.config['DVLA_RESPONSE_BUCKET_NAME'],
|
||||
'root/dispatch'
|
||||
)
|
||||
older_than_seven_days = s3.filter_s3_bucket_objects_within_date_range(bucket_objects)
|
||||
|
||||
for f in older_than_seven_days:
|
||||
s3.remove_s3_object(current_app.config['DVLA_RESPONSE_BUCKET_NAME'], f['Key'])
|
||||
|
||||
current_app.logger.info(
|
||||
"Delete dvla response files started {} finished {} deleted {} files".format(
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
len(older_than_seven_days)
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("Failed to delete dvla response files")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="raise-alert-if-letter-notifications-still-sending")
|
||||
@statsd(namespace="tasks")
|
||||
def raise_alert_if_letter_notifications_still_sending():
|
||||
today = datetime.utcnow().date()
|
||||
|
||||
# Do nothing on the weekend
|
||||
if today.isoweekday() in [6, 7]:
|
||||
return
|
||||
|
||||
if today.isoweekday() in [1, 2]:
|
||||
offset_days = 4
|
||||
else:
|
||||
offset_days = 2
|
||||
still_sending = Notification.query.filter(
|
||||
Notification.notification_type == LETTER_TYPE,
|
||||
Notification.status == NOTIFICATION_SENDING,
|
||||
Notification.key_type == KEY_TYPE_NORMAL,
|
||||
func.date(Notification.sent_at) <= today - timedelta(days=offset_days)
|
||||
).count()
|
||||
|
||||
if still_sending:
|
||||
message = "There are {} letters in the 'sending' state from {}".format(
|
||||
still_sending,
|
||||
(today - timedelta(days=offset_days)).strftime('%A %d %B')
|
||||
)
|
||||
# Only send alerts in production
|
||||
if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
|
||||
zendesk_client.create_ticket(
|
||||
subject="[{}] Letters still sending".format(current_app.config['NOTIFY_ENVIRONMENT']),
|
||||
message=message,
|
||||
ticket_type=zendesk_client.TYPE_INCIDENT
|
||||
)
|
||||
else:
|
||||
current_app.logger.info(message)
|
||||
|
||||
|
||||
@notify_celery.task(name='check-job-status')
|
||||
@statsd(namespace="tasks")
|
||||
def check_job_status():
|
||||
@@ -401,67 +160,6 @@ def check_job_status():
|
||||
raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids))
|
||||
|
||||
|
||||
@notify_celery.task(name='raise-alert-if-no-letter-ack-file')
|
||||
@statsd(namespace="tasks")
|
||||
def letter_raise_alert_if_no_ack_file_for_zip():
|
||||
# get a list of zip files since yesterday
|
||||
zip_file_set = set()
|
||||
|
||||
for key in s3.get_list_of_files_by_suffix(bucket_name=current_app.config['LETTERS_PDF_BUCKET_NAME'],
|
||||
subfolder=datetime.utcnow().strftime('%Y-%m-%d') + '/zips_sent',
|
||||
suffix='.TXT'):
|
||||
subname = key.split('/')[-1] # strip subfolder in name
|
||||
zip_file_set.add(subname.upper().rstrip('.TXT'))
|
||||
|
||||
# get acknowledgement file
|
||||
ack_file_set = set()
|
||||
|
||||
yesterday = datetime.now(tz=pytz.utc) - timedelta(days=1) # AWS datetime format
|
||||
|
||||
for key in s3.get_list_of_files_by_suffix(bucket_name=current_app.config['DVLA_RESPONSE_BUCKET_NAME'],
|
||||
subfolder='root/dispatch', suffix='.ACK.txt', last_modified=yesterday):
|
||||
ack_file_set.add(key)
|
||||
|
||||
today_str = datetime.utcnow().strftime('%Y%m%d')
|
||||
|
||||
ack_content_set = set()
|
||||
for key in ack_file_set:
|
||||
if today_str in key:
|
||||
content = s3.get_s3_file(current_app.config['DVLA_RESPONSE_BUCKET_NAME'], key)
|
||||
for zip_file in content.split('\n'): # each line
|
||||
s = zip_file.split('|')
|
||||
ack_content_set.add(s[0].upper())
|
||||
|
||||
message = (
|
||||
"Letter ack file does not contain all zip files sent. "
|
||||
"Missing ack for zip files: {}, "
|
||||
"pdf bucket: {}, subfolder: {}, "
|
||||
"ack bucket: {}"
|
||||
).format(
|
||||
str(sorted(zip_file_set - ack_content_set)),
|
||||
current_app.config['LETTERS_PDF_BUCKET_NAME'],
|
||||
datetime.utcnow().strftime('%Y-%m-%d') + '/zips_sent',
|
||||
current_app.config['DVLA_RESPONSE_BUCKET_NAME']
|
||||
)
|
||||
# strip empty element before comparison
|
||||
ack_content_set.discard('')
|
||||
zip_file_set.discard('')
|
||||
|
||||
if len(zip_file_set - ack_content_set) > 0:
|
||||
if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
|
||||
zendesk_client.create_ticket(
|
||||
subject="Letter acknowledge error",
|
||||
message=message,
|
||||
ticket_type=zendesk_client.TYPE_INCIDENT
|
||||
)
|
||||
current_app.logger.error(message)
|
||||
|
||||
if len(ack_content_set - zip_file_set) > 0:
|
||||
current_app.logger.info(
|
||||
"letter ack contains zip that is not for today: {}".format(ack_content_set - zip_file_set)
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(name='replay-created-notifications')
|
||||
@statsd(namespace="tasks")
|
||||
def replay_created_notifications():
|
||||
|
||||
Reference in New Issue
Block a user