Function to delete high volume notifications for a service

This command will allow us to explore the timing of deleting notifications, which is why it's written as a command. The intention is to try this strategy out then lift and shift the code into a task.

The next commit will do the insert into NotificationHistory and drop the back up table.
This commit is contained in:
Rebecca Law
2020-03-22 10:18:21 +00:00
parent 3a95fba9b0
commit fa1128a3b6

View File

@@ -11,7 +11,7 @@ from click_datetime import Datetime as click_dt
from flask import current_app, json
from notifications_utils.recipients import RecipientCSV
from notifications_utils.template import SMSMessageTemplate
from sqlalchemy.exc import IntegrityError
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm.exc import NoResultFound
from notifications_utils.statsd_decorators import statsd
@@ -45,7 +45,6 @@ from app.dao.templates_dao import dao_get_template_by_id
from app.dao.users_dao import delete_model_user, delete_user_verify_codes, get_user_by_email
from app.models import (
PROVIDERS,
NOTIFICATION_CREATED,
KEY_TYPE_TEST,
SMS_TYPE,
EMAIL_TYPE,
@@ -57,6 +56,10 @@ from app.models import (
Service,
EmailBranding,
LetterBranding,
NOTIFICATION_CREATED,
NOTIFICATION_DELIVERED,
NOTIFICATION_PERMANENT_FAILURE,
NOTIFICATION_TEMPORARY_FAILURE,
)
from app.performance_platform.processing_time import send_processing_time_for_start_and_end
from app.utils import get_london_midnight_in_utc, get_midnight_for_day_before
@@ -901,3 +904,105 @@ def process_row_from_job(job_id, job_row_number):
notification_id = process_row(row, template, job, job.service)
current_app.logger.info("Process row {} for job {} created notification_id: {}".format(
job_row_number, job_id, notification_id))
@notify_command(name='delete-high-volume-service-data')
@click.option('-i', '--service_id', required=True, help='Service id of the high volume service')
@click.option('-s', '--start_date', required=True, type=click_dt(format='%Y-%m-%d %H'),
help='Start date of process YYYY-MM-DD HH:mm')
@click.option('-e', '--end_date', required=True, type=click_dt(format='%Y-%m-%d %H'),
help='End date of process YYYY-MM-DD HH:MM')
@click.option('-t', '--notification_type', required=False, default='email',
help='Notification type of the data to delete')
def delete_high_volume_service_data(service_id, start_date, end_date, notification_type):
str_date = start_date.strftime('%Y_%m_%d_%H')
str_end_date = end_date.strftime('%Y_%m_%d_%H')
bkup_tble_name = f'back_up_notifications_{str_date}_to_{str_end_date}'
print(f"""Creating back up {bkup_tble_name} starting at: {datetime.utcnow()}
for {notification_type} notifications for service: {service_id}, starting at {start_date} and {end_date}
""")
_create_notification_bkup_table(bkup_tble_name, end_date, notification_type, service_id, start_date)
hour_start = start_date
hour_end = hour_start + timedelta(hours=1)
terminate_statuses = [NOTIFICATION_DELIVERED, NOTIFICATION_TEMPORARY_FAILURE, NOTIFICATION_PERMANENT_FAILURE]
delete_query = Notification.query.filter(
Notification.notification_type == notification_type,
Notification.service_id == service_id,
Notification.created_at >= hour_start,
Notification.created_at <= hour_end,
Notification.status.in_(terminate_statuses)
)
# Iterate hour by hour
del_count = 0
while hour_start < end_date:
del_count += delete_query.delete(synchronize_session=False)
db.session.commit()
# print(hour_start, hour_end)
# increment hour
hour_end = hour_end + timedelta(hours=1)
hour_start = hour_start + timedelta(hours=1)
delete_query = Notification.query.filter(
Notification.notification_type == notification_type,
Notification.service_id == service_id,
Notification.created_at >= hour_start,
Notification.created_at <= hour_end,
Notification.status.in_(terminate_statuses)
)
print(f"""Completed deleting {del_count} from notifications
for {notification_type} notifications for
service: {service_id}, starting at {start_date} and {end_date}
""")
def _create_notification_bkup_table(bkup_tble_name, end_date, notification_type, service_id, start_date):
try:
create_tbl_sql = f"""
CREATE TABLE {bkup_tble_name} AS
SELECT *
FROM notifications
WHERE service_id = :service_id
AND notification_type = :notification_type
AND created_at >= :start_date
AND created_at <= :end_date
AND key_type = 'normal'
AND notification_status in ('delivered', 'permanent-failure', 'temporary-failure')
"""
input_params = {
"service_id": service_id,
"notification_type": notification_type,
"start_date": start_date,
"end_date": end_date
}
db.session.execute(create_tbl_sql, input_params)
db.session.commit()
except SQLAlchemyError as e:
db.session.commit() # terminate previous transaction
# This query isn't quite right yet, if the notifications are already deleted still get 0.
# however it doesn't cause any harm, because there is nothing to delete.
# But it will also return 0 if the rows exist in notifications
qry = f""" Select count(*) from notifications
WHERE service_id = :service_id
AND notification_type = :notification_type
AND created_at >= :start_date
AND created_at <= :end_date
AND key_type = 'normal'
UNION
SELECT count(*)
FROM {bkup_tble_name}
"""
result = db.session.execute(qry, input_params).fetchall()
if result[0][0] == result[1][0]:
print("Table and data already exists, keep going")
return
else:
# This will throw the exception if the data is already deleted or partically deleted...
# but gives us a chance to see what's happending.
print(f"Table already exists but row counts are inconsistent is missing bail out. "
f"There are {result[0][0]} rows in notifications and {result[1][0]} rows in {bkup_tble_name}")
raise e