mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-02 17:31:14 -05:00
Merge pull request #3388 from alphagov/parallel-deletes
make delete notification tasks parallel by notification type
This commit is contained in:
@@ -65,66 +65,54 @@ def _remove_csv_files(job_types):
|
|||||||
|
|
||||||
@notify_celery.task(name="delete-notifications-older-than-retention")
|
@notify_celery.task(name="delete-notifications-older-than-retention")
|
||||||
def delete_notifications_older_than_retention():
|
def delete_notifications_older_than_retention():
|
||||||
delete_email_notifications_older_than_retention()
|
delete_email_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC)
|
||||||
delete_sms_notifications_older_than_retention()
|
delete_sms_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC)
|
||||||
delete_letter_notifications_older_than_retention()
|
delete_letter_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC)
|
||||||
|
|
||||||
|
|
||||||
@notify_celery.task(name="delete-sms-notifications")
|
@notify_celery.task(name="delete-sms-notifications")
|
||||||
@cronitor("delete-sms-notifications")
|
@cronitor("delete-sms-notifications")
|
||||||
def delete_sms_notifications_older_than_retention():
|
def delete_sms_notifications_older_than_retention():
|
||||||
try:
|
start = datetime.utcnow()
|
||||||
start = datetime.utcnow()
|
deleted = delete_notifications_older_than_retention_by_type('sms')
|
||||||
deleted = delete_notifications_older_than_retention_by_type('sms')
|
current_app.logger.info(
|
||||||
current_app.logger.info(
|
"Delete {} job started {} finished {} deleted {} sms notifications".format(
|
||||||
"Delete {} job started {} finished {} deleted {} sms notifications".format(
|
'sms',
|
||||||
'sms',
|
start,
|
||||||
start,
|
datetime.utcnow(),
|
||||||
datetime.utcnow(),
|
deleted
|
||||||
deleted
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
except SQLAlchemyError:
|
)
|
||||||
current_app.logger.exception("Failed to delete sms notifications")
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
@notify_celery.task(name="delete-email-notifications")
|
@notify_celery.task(name="delete-email-notifications")
|
||||||
@cronitor("delete-email-notifications")
|
@cronitor("delete-email-notifications")
|
||||||
def delete_email_notifications_older_than_retention():
|
def delete_email_notifications_older_than_retention():
|
||||||
try:
|
start = datetime.utcnow()
|
||||||
start = datetime.utcnow()
|
deleted = delete_notifications_older_than_retention_by_type('email')
|
||||||
deleted = delete_notifications_older_than_retention_by_type('email')
|
current_app.logger.info(
|
||||||
current_app.logger.info(
|
"Delete {} job started {} finished {} deleted {} email notifications".format(
|
||||||
"Delete {} job started {} finished {} deleted {} email notifications".format(
|
'email',
|
||||||
'email',
|
start,
|
||||||
start,
|
datetime.utcnow(),
|
||||||
datetime.utcnow(),
|
deleted
|
||||||
deleted
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
except SQLAlchemyError:
|
)
|
||||||
current_app.logger.exception("Failed to delete email notifications")
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
@notify_celery.task(name="delete-letter-notifications")
|
@notify_celery.task(name="delete-letter-notifications")
|
||||||
@cronitor("delete-letter-notifications")
|
@cronitor("delete-letter-notifications")
|
||||||
def delete_letter_notifications_older_than_retention():
|
def delete_letter_notifications_older_than_retention():
|
||||||
try:
|
start = datetime.utcnow()
|
||||||
start = datetime.utcnow()
|
deleted = delete_notifications_older_than_retention_by_type('letter')
|
||||||
deleted = delete_notifications_older_than_retention_by_type('letter')
|
current_app.logger.info(
|
||||||
current_app.logger.info(
|
"Delete {} job started {} finished {} deleted {} letter notifications".format(
|
||||||
"Delete {} job started {} finished {} deleted {} letter notifications".format(
|
'letter',
|
||||||
'letter',
|
start,
|
||||||
start,
|
datetime.utcnow(),
|
||||||
datetime.utcnow(),
|
deleted
|
||||||
deleted
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
except SQLAlchemyError:
|
)
|
||||||
current_app.logger.exception("Failed to delete letter notifications")
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
@notify_celery.task(name='timeout-sending-notifications')
|
@notify_celery.task(name='timeout-sending-notifications')
|
||||||
|
|||||||
@@ -357,13 +357,25 @@ def delete_notifications_older_than_retention_by_type(notification_type, qry_lim
|
|||||||
def insert_notification_history_delete_notifications(
|
def insert_notification_history_delete_notifications(
|
||||||
notification_type, service_id, timestamp_to_delete_backwards_from, qry_limit=50000
|
notification_type, service_id, timestamp_to_delete_backwards_from, qry_limit=50000
|
||||||
):
|
):
|
||||||
|
"""
|
||||||
|
Delete up to 50,000 notifications that are past retention for a notification type and service.
|
||||||
|
|
||||||
|
|
||||||
|
Steps are as follows:
|
||||||
|
|
||||||
|
Create a temporary notifications table
|
||||||
|
Populate that table with up to 50k notifications that are to be deleted. (Note: no specified order)
|
||||||
|
Insert everything in the temp table into notification history
|
||||||
|
Delete from notifications if notification id is in the temp table
|
||||||
|
Drop the temp table (automatically when the transaction commits)
|
||||||
|
|
||||||
|
Temporary tables are in a separate postgres schema, and only visible to the current session (db connection,
|
||||||
|
in a celery task there's one connection per thread.)
|
||||||
|
"""
|
||||||
# Setting default query limit to 50,000 which take about 48 seconds on current table size
|
# Setting default query limit to 50,000 which take about 48 seconds on current table size
|
||||||
# 10, 000 took 11s and 100,000 took 1 min 30 seconds.
|
# 10, 000 took 11s and 100,000 took 1 min 30 seconds.
|
||||||
drop_table_if_exists = """
|
|
||||||
DROP TABLE if exists NOTIFICATION_ARCHIVE
|
|
||||||
"""
|
|
||||||
select_into_temp_table = """
|
select_into_temp_table = """
|
||||||
CREATE TEMP TABLE NOTIFICATION_ARCHIVE AS
|
CREATE TEMP TABLE NOTIFICATION_ARCHIVE ON COMMIT DROP AS
|
||||||
SELECT id, job_id, job_row_number, service_id, template_id, template_version, api_key_id,
|
SELECT id, job_id, job_row_number, service_id, template_id, template_version, api_key_id,
|
||||||
key_type, notification_type, created_at, sent_at, sent_by, updated_at, reference, billable_units,
|
key_type, notification_type, created_at, sent_at, sent_by, updated_at, reference, billable_units,
|
||||||
client_reference, international, phone_prefix, rate_multiplier, notification_status,
|
client_reference, international, phone_prefix, rate_multiplier, notification_status,
|
||||||
@@ -376,7 +388,7 @@ def insert_notification_history_delete_notifications(
|
|||||||
limit :qry_limit
|
limit :qry_limit
|
||||||
"""
|
"""
|
||||||
select_into_temp_table_for_letters = """
|
select_into_temp_table_for_letters = """
|
||||||
CREATE TEMP TABLE NOTIFICATION_ARCHIVE AS
|
CREATE TEMP TABLE NOTIFICATION_ARCHIVE ON COMMIT DROP AS
|
||||||
SELECT id, job_id, job_row_number, service_id, template_id, template_version, api_key_id,
|
SELECT id, job_id, job_row_number, service_id, template_id, template_version, api_key_id,
|
||||||
key_type, notification_type, created_at, sent_at, sent_by, updated_at, reference, billable_units,
|
key_type, notification_type, created_at, sent_at, sent_by, updated_at, reference, billable_units,
|
||||||
client_reference, international, phone_prefix, rate_multiplier, notification_status,
|
client_reference, international, phone_prefix, rate_multiplier, notification_status,
|
||||||
@@ -407,8 +419,6 @@ def insert_notification_history_delete_notifications(
|
|||||||
"qry_limit": qry_limit
|
"qry_limit": qry_limit
|
||||||
}
|
}
|
||||||
|
|
||||||
db.session.execute(drop_table_if_exists)
|
|
||||||
|
|
||||||
select_to_use = select_into_temp_table_for_letters if notification_type == 'letter' else select_into_temp_table
|
select_to_use = select_into_temp_table_for_letters if notification_type == 'letter' else select_into_temp_table
|
||||||
db.session.execute(select_to_use, input_params)
|
db.session.execute(select_to_use, input_params)
|
||||||
|
|
||||||
@@ -418,7 +428,6 @@ def insert_notification_history_delete_notifications(
|
|||||||
|
|
||||||
db.session.execute(delete_query)
|
db.session.execute(delete_query)
|
||||||
|
|
||||||
db.session.execute("DROP TABLE NOTIFICATION_ARCHIVE")
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user