Refactor delete_notifications_older_than_retention_by_type to use a new strategy.

The insert_notification_history_delete_notifications function uses a temp table to store the data to insert and delete. This will save extra queries while performing the insert and delete operations.
The function is written in such a way that if the task is stopped while processing, then when it's started up again it will just pick up where it left off.

I've made a decision to delete all test data in one query; I don't anticipate a problem with that.

The performance of this might also be better than last night's test because the temp table now holds everything we need for the NotificationHistory insert, so we don't need the join to Notifications to perform the insert.
This commit is contained in:
Rebecca Law
2020-03-24 12:21:28 +00:00
parent b243257906
commit f04210ba7d
3 changed files with 111 additions and 243 deletions

View File

@@ -21,7 +21,6 @@ from sqlalchemy.orm import joinedload
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql import functions
from sqlalchemy.sql.expression import case
from sqlalchemy.dialects.postgresql import insert
from werkzeug.datastructures import MultiDict
from app import db, create_uuid
@@ -335,31 +334,33 @@ def delete_notifications_older_than_retention_by_type(notification_type, qry_lim
@statsd(namespace="dao")
@transactional
def insert_notification_history_delete_notifications(
notification_type, service_id, start_time, end_time, qry_limit=10000
notification_type, service_id, timestamp_to_delete_backwards_from, qry_limit=50000
):
# Setting default query limit to 50,000 which take about 48 seconds on current table size
# 10, 000 took 11s and 100,000 took 1 min 30 seconds.
drop_table_if_exists = """
DROP TABLE if exists NOTIFICATION_ARCHIVE
"""
select_into_temp_table = """
CREATE TEMP TABLE NOTIFICATION_ARCHIVE AS
SELECT id
FROM notifications
WHERE service_id = :service_id
AND notification_type = :notification_type
AND created_at >= :start_time
AND created_at <= :end_time
AND key_type = 'normal'
AND notification_status in ('delivered', 'permanent-failure', 'temporary-failure')
limit :qry_limit
"""
insert_query = """
insert into notification_history
SELECT id, job_id, job_row_number, service_id, template_id, template_version, api_key_id,
key_type, notification_type, created_at, sent_at, sent_by, updated_at, reference, billable_units,
client_reference, international, phone_prefix, rate_multiplier, notification_status,
created_by_id, postage, document_download_count
FROM notifications
WHERE id in (select id from NOTIFICATION_ARCHIVE)
FROM notifications
WHERE service_id = :service_id
AND notification_type = :notification_type
AND created_at <= :timestamp_to_delete_backwards_from
AND key_type = 'normal'
AND notification_status in ('delivered', 'permanent-failure', 'temporary-failure')
limit :qry_limit
"""
# Insert into NotificationHistory if the row already exists do nothing.
insert_query = """
insert into notification_history
SELECT * from NOTIFICATION_ARCHIVE
ON CONFLICT ON CONSTRAINT notification_history_pkey
DO NOTHING
"""
delete_query = """
DELETE FROM notifications
@@ -368,11 +369,12 @@ def insert_notification_history_delete_notifications(
input_params = {
"service_id": service_id,
"notification_type": notification_type,
"start_time": start_time,
"end_time": end_time,
"timestamp_to_delete_backwards_from": timestamp_to_delete_backwards_from,
"qry_limit": qry_limit
}
current_app.logger.info(f"Start insert_notification_history_delete_notifications for input params {input_params}")
current_app.logger.info(
f"Start executing insert_notification_history_delete_notifications for input params {input_params}"
)
db.session.execute(drop_table_if_exists)
current_app.logger.info('Start executing select into temp table')
db.session.execute(select_into_temp_table, input_params)
@@ -382,7 +384,7 @@ def insert_notification_history_delete_notifications(
current_app.logger.info('Start executing insert into history')
db.session.execute(insert_query)
current_app.logger.info('Start deleting notifications')
current_app.logger.info('Start executing deleting notifications')
db.session.execute(delete_query)
db.session.execute("DROP TABLE NOTIFICATION_ARCHIVE")
@@ -406,105 +408,24 @@ def _move_notifications_to_notification_history(notification_type, service_id, d
# This enables us to break this into smaller database queries
timestamp_to_delete_backwards_from = day_to_delete_backwards_from - timedelta(hours=hour_delta)
if service_id == '539d63a1-701d-400d-ab11-f3ee2319d4d4':
current_app.logger.info(
"Beginning insert_update_notification_history for GOV.UK Email from {} backwards".format(
timestamp_to_delete_backwards_from
)
)
insert_update_notification_history(notification_type, timestamp_to_delete_backwards_from, service_id, qry_limit)
if service_id == '539d63a1-701d-400d-ab11-f3ee2319d4d4':
current_app.logger.info(
"Beginning _delete_notifications for GOV.UK Email {} backwards".format(
timestamp_to_delete_backwards_from
)
)
deleted += _delete_notifications(
notification_type, timestamp_to_delete_backwards_from, service_id, qry_limit
deleted += insert_notification_history_delete_notifications(
notification_type=notification_type,
service_id=service_id,
timestamp_to_delete_backwards_from=timestamp_to_delete_backwards_from,
qry_limit=qry_limit
)
return deleted
def _delete_notifications(notification_type, date_to_delete_from, service_id, query_limit):
subquery = db.session.query(
Notification.id
).join(NotificationHistory, NotificationHistory.id == Notification.id).filter(
Notification.query.filter(
Notification.notification_type == notification_type,
Notification.service_id == service_id,
Notification.created_at < date_to_delete_from,
).limit(query_limit).subquery()
deleted = _delete_for_query(subquery)
subquery_for_test_keys = db.session.query(
Notification.id
).filter(
Notification.notification_type == notification_type,
Notification.service_id == service_id,
Notification.created_at < date_to_delete_from,
Notification.created_at < day_to_delete_backwards_from,
Notification.key_type == KEY_TYPE_TEST
).limit(query_limit).subquery()
deleted += _delete_for_query(subquery_for_test_keys)
return deleted
def _delete_for_query(subquery):
number_deleted = db.session.query(Notification).filter(
Notification.id.in_(subquery)).delete(synchronize_session='fetch')
deleted = number_deleted
).delete(synchronize_session=False)
db.session.commit()
while number_deleted > 0:
number_deleted = db.session.query(Notification).filter(
Notification.id.in_(subquery)).delete(synchronize_session='fetch')
deleted += number_deleted
db.session.commit()
return deleted
def insert_update_notification_history(notification_type, date_to_delete_from, service_id, query_limit=10000):
offset = 0
notification_query = db.session.query(
*[x.name for x in NotificationHistory.__table__.c]
).filter(
Notification.notification_type == notification_type,
Notification.service_id == service_id,
Notification.created_at < date_to_delete_from,
Notification.key_type != KEY_TYPE_TEST
).order_by(
Notification.created_at
)
notifications_count = notification_query.count()
while offset < notifications_count:
stmt = insert(NotificationHistory).from_select(
NotificationHistory.__table__.c,
notification_query.limit(query_limit).offset(offset)
)
stmt = stmt.on_conflict_do_update(
constraint="notification_history_pkey",
set_={
"notification_status": stmt.excluded.status,
"reference": stmt.excluded.reference,
"billable_units": stmt.excluded.billable_units,
"updated_at": stmt.excluded.updated_at,
"sent_at": stmt.excluded.sent_at,
"sent_by": stmt.excluded.sent_by
}
)
db.session.connection().execute(stmt)
db.session.commit()
offset += query_limit
def _delete_letters_from_s3(
notification_type, service_id, date_to_delete_from, query_limit
):