From 6bbec9f103180761a5f483d0ff1f9df7967161e8 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Wed, 1 Dec 2021 14:28:08 +0000 Subject: [PATCH] make delete notification tasks parallel by notification type we used to do this until apr 2020. Let's try doing it again. Back then, we had problems with timing. We did two things in spring 2020: We moved to using an intermediary temp table [1] We stopped the tasks being parallelised [2] However, it turned out the real time saving was from changing what services we delete for [3]. The task was actually CPU-bound rather than DB-bound, so that's probably why having the tasks in parallel wasn't helping, since they were all competing for the same CPU. It's worth trying the parallel steps again now that we're no longer CPU bound. Note: Temporary tables are in their own postgres schema, and are only viewable by the current session (session == connection. Each celery worker process has its own db connection). We don't need to worry about separate workers both trying to use the same table at once. I've also added an "ON COMMIT DROP" clause to the table definition just to ensure it doesn't persist past the task even if there's an exception. (This also drops on rollback). Cronitor looks at the three functions separately so we don't need to worry about the main task taking milliseconds where it used to take hours as it isn't monitored itself. I've also removed some redundant exception logs. 
[1] https://github.com/alphagov/notifications-api/pull/2767 [2] https://github.com/alphagov/notifications-api/pull/2798 [3] https://github.com/alphagov/notifications-api/pull/3381 --- app/celery/nightly_tasks.py | 72 +++++++++++++++--------------------- app/dao/notifications_dao.py | 25 +++++++++---- 2 files changed, 47 insertions(+), 50 deletions(-) diff --git a/app/celery/nightly_tasks.py b/app/celery/nightly_tasks.py index beb484f89..ae8c68992 100644 --- a/app/celery/nightly_tasks.py +++ b/app/celery/nightly_tasks.py @@ -65,66 +65,54 @@ def _remove_csv_files(job_types): @notify_celery.task(name="delete-notifications-older-than-retention") def delete_notifications_older_than_retention(): - delete_email_notifications_older_than_retention() - delete_sms_notifications_older_than_retention() - delete_letter_notifications_older_than_retention() + delete_email_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC) + delete_sms_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC) + delete_letter_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC) @notify_celery.task(name="delete-sms-notifications") @cronitor("delete-sms-notifications") def delete_sms_notifications_older_than_retention(): - try: - start = datetime.utcnow() - deleted = delete_notifications_older_than_retention_by_type('sms') - current_app.logger.info( - "Delete {} job started {} finished {} deleted {} sms notifications".format( - 'sms', - start, - datetime.utcnow(), - deleted - ) + start = datetime.utcnow() + deleted = delete_notifications_older_than_retention_by_type('sms') + current_app.logger.info( + "Delete {} job started {} finished {} deleted {} sms notifications".format( + 'sms', + start, + datetime.utcnow(), + deleted ) - except SQLAlchemyError: - current_app.logger.exception("Failed to delete sms notifications") - raise + ) @notify_celery.task(name="delete-email-notifications") @cronitor("delete-email-notifications") def 
delete_email_notifications_older_than_retention(): - try: - start = datetime.utcnow() - deleted = delete_notifications_older_than_retention_by_type('email') - current_app.logger.info( - "Delete {} job started {} finished {} deleted {} email notifications".format( - 'email', - start, - datetime.utcnow(), - deleted - ) + start = datetime.utcnow() + deleted = delete_notifications_older_than_retention_by_type('email') + current_app.logger.info( + "Delete {} job started {} finished {} deleted {} email notifications".format( + 'email', + start, + datetime.utcnow(), + deleted ) - except SQLAlchemyError: - current_app.logger.exception("Failed to delete email notifications") - raise + ) @notify_celery.task(name="delete-letter-notifications") @cronitor("delete-letter-notifications") def delete_letter_notifications_older_than_retention(): - try: - start = datetime.utcnow() - deleted = delete_notifications_older_than_retention_by_type('letter') - current_app.logger.info( - "Delete {} job started {} finished {} deleted {} letter notifications".format( - 'letter', - start, - datetime.utcnow(), - deleted - ) + start = datetime.utcnow() + deleted = delete_notifications_older_than_retention_by_type('letter') + current_app.logger.info( + "Delete {} job started {} finished {} deleted {} letter notifications".format( + 'letter', + start, + datetime.utcnow(), + deleted ) - except SQLAlchemyError: - current_app.logger.exception("Failed to delete letter notifications") - raise + ) @notify_celery.task(name='timeout-sending-notifications') diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 0b4ba58dc..44f2bddee 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -357,13 +357,25 @@ def delete_notifications_older_than_retention_by_type(notification_type, qry_lim def insert_notification_history_delete_notifications( notification_type, service_id, timestamp_to_delete_backwards_from, qry_limit=50000 ): + """ + Delete up to 50,000 
notifications that are past retention for a notification type and service. + + + Steps are as follows: + + Create a temporary notifications table + Populate that table with up to 50k notifications that are to be deleted. (Note: no specified order) + Insert everything in the temp table into notification history + Delete from notifications if notification id is in the temp table + Drop the temp table (automatically when the transaction commits) + + Temporary tables are in a separate postgres schema, and only visible to the current session (db connection, + in a celery task there's one connection per thread.) + """ # Setting default query limit to 50,000 which take about 48 seconds on current table size # 10, 000 took 11s and 100,000 took 1 min 30 seconds. - drop_table_if_exists = """ - DROP TABLE if exists NOTIFICATION_ARCHIVE - """ select_into_temp_table = """ - CREATE TEMP TABLE NOTIFICATION_ARCHIVE AS + CREATE TEMP TABLE NOTIFICATION_ARCHIVE ON COMMIT DROP AS SELECT id, job_id, job_row_number, service_id, template_id, template_version, api_key_id, key_type, notification_type, created_at, sent_at, sent_by, updated_at, reference, billable_units, client_reference, international, phone_prefix, rate_multiplier, notification_status, @@ -376,7 +388,7 @@ def insert_notification_history_delete_notifications( limit :qry_limit """ select_into_temp_table_for_letters = """ - CREATE TEMP TABLE NOTIFICATION_ARCHIVE AS + CREATE TEMP TABLE NOTIFICATION_ARCHIVE ON COMMIT DROP AS SELECT id, job_id, job_row_number, service_id, template_id, template_version, api_key_id, key_type, notification_type, created_at, sent_at, sent_by, updated_at, reference, billable_units, client_reference, international, phone_prefix, rate_multiplier, notification_status, @@ -407,8 +419,6 @@ def insert_notification_history_delete_notifications( "qry_limit": qry_limit } - db.session.execute(drop_table_if_exists) - select_to_use = select_into_temp_table_for_letters if notification_type == 'letter' else 
select_into_temp_table db.session.execute(select_to_use, input_params) @@ -418,7 +428,6 @@ def insert_notification_history_delete_notifications( db.session.execute(delete_query) - db.session.execute("DROP TABLE NOTIFICATION_ARCHIVE") return result