From 49cc1b643f499b5f2d40fe47aaf7fab892c51e2d Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Mon, 6 Dec 2021 09:30:48 +0000 Subject: [PATCH] split delete task up into per service we really don't gain anything by running each service delete in sequence - we get the services, and then just loop through them deleting per service. By deleting per service in separate tasks, we can take advantage of parallelism. the only thing we lose is some log lines but I don't think we're that interested in them. only set query limit at the move_notifications dao function - the task doesn't really care about the technical implementation of how it deletes the notifications --- app/celery/nightly_tasks.py | 90 +++++++++++++------- app/dao/notifications_dao.py | 81 +++++------------- app/dao/service_data_retention_dao.py | 4 + tests/app/celery/test_nightly_tasks.py | 112 ++++++++++++++++++++++++- 4 files changed, 197 insertions(+), 90 deletions(-) diff --git a/app/celery/nightly_tasks.py b/app/celery/nightly_tasks.py index 5479baff3..ba20791f9 100644 --- a/app/celery/nightly_tasks.py +++ b/app/celery/nightly_tasks.py @@ -5,6 +5,7 @@ from flask import current_app from notifications_utils.clients.zendesk.zendesk_client import ( NotifySupportTicket, ) +from notifications_utils.timezones import convert_utc_to_bst from sqlalchemy import func from sqlalchemy.exc import SQLAlchemyError @@ -21,7 +22,11 @@ from app.dao.jobs_dao import ( from app.dao.notifications_dao import ( dao_get_notifications_processing_time_stats, dao_timeout_notifications, - delete_notifications_older_than_retention_by_type, + get_service_ids_that_have_notifications_from_before_timestamp, + move_notifications_to_notification_history, +) +from app.dao.service_data_retention_dao import ( + fetch_service_data_retention_for_all_services_by_notification_type, ) from app.models import ( EMAIL_TYPE, @@ -68,45 +73,74 @@ def delete_notifications_older_than_retention(): @notify_celery.task(name="delete-sms-notifications") @cronitor("delete-sms-notifications") def delete_sms_notifications_older_than_retention(): - start = datetime.utcnow() - deleted = delete_notifications_older_than_retention_by_type('sms') - current_app.logger.info( - "Delete {} job started {} finished {} deleted {} sms notifications".format( - 'sms', - start, - datetime.utcnow(), - deleted - ) - ) + _delete_notifications_older_than_retention_by_type('sms') @notify_celery.task(name="delete-email-notifications") @cronitor("delete-email-notifications") def delete_email_notifications_older_than_retention(): - start = datetime.utcnow() - deleted = delete_notifications_older_than_retention_by_type('email') - current_app.logger.info( - "Delete {} job started {} finished {} deleted {} email notifications".format( - 'email', - start, - datetime.utcnow(), - deleted - ) - ) + _delete_notifications_older_than_retention_by_type('email') @notify_celery.task(name="delete-letter-notifications") @cronitor("delete-letter-notifications") def delete_letter_notifications_older_than_retention(): - start = datetime.utcnow() - deleted = delete_notifications_older_than_retention_by_type('letter') + _delete_notifications_older_than_retention_by_type('letter') + + +def _delete_notifications_older_than_retention_by_type(notification_type): current_app.logger.info( - "Delete {} job started {} finished {} deleted {} letter notifications".format( - 'letter', - start, - datetime.utcnow(), - deleted + 'Deleting {} notifications for services with flexible data retention'.format(notification_type)) + + flexible_data_retention = fetch_service_data_retention_for_all_services_by_notification_type(notification_type) + for f in flexible_data_retention: + day_to_delete_backwards_from = get_london_midnight_in_utc( + convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=f.days_of_retention) ) + + delete_notifications_for_service_and_type.apply_async(queue=QueueNames.REPORTING, kwargs={ + 'service_id': f.service_id, + 'notification_type': notification_type, + 'datetime_to_delete_before': day_to_delete_backwards_from + }) + + seven_days_ago = get_london_midnight_in_utc(convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=7)) + service_ids_with_data_retention = {x.service_id for x in flexible_data_retention} + + # get a list of all service ids that we'll need to delete for. Typically that might only be 5% of services. + # This query takes a couple of mins to run. + service_ids_that_have_sent_notifications_recently = get_service_ids_that_have_notifications_from_before_timestamp( + notification_type, + seven_days_ago + ) + + service_ids_to_purge = service_ids_that_have_sent_notifications_recently - service_ids_with_data_retention + + current_app.logger.info('Deleting {} notifications for {} services without flexible data retention'.format( + notification_type, + len(service_ids_to_purge) + )) + + for service_id in service_ids_to_purge: + delete_notifications_for_service_and_type.apply_async(queue=QueueNames.REPORTING, kwargs={ + 'service_id': service_id, + 'notification_type': notification_type, + 'datetime_to_delete_before': seven_days_ago + }) + + +@notify_celery.task(name='delete-notifications-for-service-and-type') +def delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before): + start = datetime.utcnow() + num_deleted = move_notifications_to_notification_history( + notification_type, + service_id, + datetime_to_delete_before, + ) + end = datetime.utcnow() + current_app.logger.info( + f'Deleted {num_deleted} {notification_type} notifications for ' + f'service id: {service_id} in {(end - start).seconds} seconds' ) diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 39cac60cd..eb1561e5b 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -45,13 +45,8 @@ from app.models import ( Notification, NotificationHistory, ProviderDetails, - ServiceDataRetention, -) -from app.utils import ( - escape_special_characters, - get_london_midnight_in_utc, - midnight_n_days_ago, ) +from app.utils import escape_special_characters, midnight_n_days_ago def dao_get_last_date_template_was_used(template_id, service_id): @@ -304,55 +299,6 @@ def _filter_query(query, filter_dict=None): return query -def delete_notifications_older_than_retention_by_type(notification_type, qry_limit=50000): - current_app.logger.info( - 'Deleting {} notifications for services with flexible data retention'.format(notification_type)) - - flexible_data_retention = ServiceDataRetention.query.filter( - ServiceDataRetention.notification_type == notification_type - ).all() - deleted = 0 - for f in flexible_data_retention: - current_app.logger.info( - "Deleting {} notifications for service id: {}".format(notification_type, f.service_id)) - - day_to_delete_backwards_from = get_london_midnight_in_utc( - convert_utc_to_bst(datetime.utcnow()).date()) - timedelta(days=f.days_of_retention) - - deleted += _move_notifications_to_notification_history( - notification_type, f.service_id, day_to_delete_backwards_from, qry_limit) - - seven_days_ago = get_london_midnight_in_utc(convert_utc_to_bst(datetime.utcnow()).date()) - timedelta(days=7) - service_ids_with_data_retention = {x.service_id for x in flexible_data_retention} - - # get a list of all service ids that we'll need to delete for. Typically that might only be 5% of services. - # This query takes a couple of mins to run. - service_ids_that_have_sent_notifications_recently = { - row.service_id - for row in db.session.query( - Notification.service_id - ).filter( - Notification.notification_type == notification_type, - Notification.created_at < seven_days_ago - ).distinct() - } - - service_ids_to_purge = service_ids_that_have_sent_notifications_recently - service_ids_with_data_retention - - current_app.logger.info('Deleting {} notifications for {} services without flexible data retention'.format( - notification_type, - len(service_ids_to_purge) - )) - - for service_id in service_ids_to_purge: - deleted += _move_notifications_to_notification_history( - notification_type, service_id, seven_days_ago, qry_limit) - - current_app.logger.info('Finished deleting {} notifications'.format(notification_type)) - - return deleted - - @autocommit def insert_notification_history_delete_notifications( notification_type, service_id, timestamp_to_delete_backwards_from, qry_limit=50000 @@ -431,18 +377,23 @@ def insert_notification_history_delete_notifications( return result -def _move_notifications_to_notification_history(notification_type, service_id, day_to_delete_backwards_from, qry_limit): +def move_notifications_to_notification_history( + notification_type, + service_id, + timestamp_to_delete_backwards_from, + qry_limit=50000 +): deleted = 0 if notification_type == LETTER_TYPE: _delete_letters_from_s3( - notification_type, service_id, day_to_delete_backwards_from, qry_limit + notification_type, service_id, timestamp_to_delete_backwards_from, qry_limit ) delete_count_per_call = 1 while delete_count_per_call > 0: delete_count_per_call = insert_notification_history_delete_notifications( notification_type=notification_type, service_id=service_id, - timestamp_to_delete_backwards_from=day_to_delete_backwards_from, + timestamp_to_delete_backwards_from=timestamp_to_delete_backwards_from, qry_limit=qry_limit ) deleted += delete_count_per_call @@ -451,7 +402,7 @@ def _move_notifications_to_notification_history(notification_type, service_id, d Notification.query.filter( Notification.notification_type == notification_type, Notification.service_id == service_id, - Notification.created_at < day_to_delete_backwards_from, + Notification.created_at < timestamp_to_delete_backwards_from, Notification.key_type == KEY_TYPE_TEST ).delete(synchronize_session=False) db.session.commit() @@ -840,3 +791,15 @@ def _duplicate_update_warning(notification, status): sent_by=notification.sent_by ) ) + + +def get_service_ids_that_have_notifications_from_before_timestamp(notification_type, timestamp): + return { + row.service_id + for row in db.session.query( + Notification.service_id + ).filter( + Notification.notification_type == notification_type, + Notification.created_at < timestamp + ).distinct() + } diff --git a/app/dao/service_data_retention_dao.py b/app/dao/service_data_retention_dao.py index cf6e7116d..c77def87e 100644 --- a/app/dao/service_data_retention_dao.py +++ b/app/dao/service_data_retention_dao.py @@ -50,3 +50,7 @@ def update_service_data_retention(service_data_retention_id, service_id, days_of } ) return updated_count + + +def fetch_service_data_retention_for_all_services_by_notification_type(notification_type): + return ServiceDataRetention.query.filter(ServiceDataRetention.notification_type == notification_type).all() diff --git a/tests/app/celery/test_nightly_tasks.py b/tests/app/celery/test_nightly_tasks.py index ade428b5f..43c1395b3 100644 --- a/tests/app/celery/test_nightly_tasks.py +++ b/tests/app/celery/test_nightly_tasks.py @@ -11,6 +11,7 @@ from notifications_utils.clients.zendesk.zendesk_client import ( from app.celery import nightly_tasks from app.celery.nightly_tasks import ( + _delete_notifications_older_than_retention_by_type, delete_email_notifications_older_than_retention, delete_inbound_sms, delete_letter_notifications_older_than_retention, @@ -143,24 +144,35 @@ def test_remove_csv_files_filters_by_type(mocker, sample_service): def test_delete_sms_notifications_older_than_retention_calls_child_task(notify_api, mocker): - mocked = mocker.patch('app.celery.nightly_tasks.delete_notifications_older_than_retention_by_type') + mocked = mocker.patch('app.celery.nightly_tasks._delete_notifications_older_than_retention_by_type') delete_sms_notifications_older_than_retention() mocked.assert_called_once_with('sms') def test_delete_email_notifications_older_than_retentions_calls_child_task(notify_api, mocker): mocked_notifications = mocker.patch( - 'app.celery.nightly_tasks.delete_notifications_older_than_retention_by_type') + 'app.celery.nightly_tasks._delete_notifications_older_than_retention_by_type') delete_email_notifications_older_than_retention() mocked_notifications.assert_called_once_with('email') def test_delete_letter_notifications_older_than_retention_calls_child_task(notify_api, mocker): - mocked = mocker.patch('app.celery.nightly_tasks.delete_notifications_older_than_retention_by_type') + mocked = mocker.patch('app.celery.nightly_tasks._delete_notifications_older_than_retention_by_type') delete_letter_notifications_older_than_retention() mocked.assert_called_once_with('letter') +def test_should_not_update_status_of_letter_notifications(client, sample_letter_template): + created_at = datetime.utcnow() - timedelta(days=5) + not1 = create_notification(template=sample_letter_template, status='sending', created_at=created_at) + not2 = create_notification(template=sample_letter_template, status='created', created_at=created_at) + + timeout_notifications() + + assert not1.status == 'sending' + assert not2.status == 'created' + + @freeze_time("2021-12-13T10:00") def test_timeout_notifications(mocker, sample_notification): mock_update = mocker.patch('app.celery.nightly_tasks.check_and_queue_callback_task') @@ -427,3 +439,97 @@ def test_save_daily_notification_processing_time_when_in_bst(mocker, sample_temp assert persisted_to_db[0].bst_date == date(2021, 4, 17) assert persisted_to_db[0].messages_total == 2 assert persisted_to_db[0].messages_within_10_secs == 2 + + +@freeze_time('2021-06-05 03:00') +def test_delete_notifications_task_calls_task_for_services_with_data_retention_of_same_type(notify_db_session, mocker): + sms_service = create_service(service_name='a') + email_service = create_service(service_name='b') + letter_service = create_service(service_name='c') + + create_service_data_retention(sms_service, notification_type='sms') + create_service_data_retention(email_service, notification_type='email') + create_service_data_retention(letter_service, notification_type='letter') + + mock_subtask = mocker.patch('app.celery.nightly_tasks.delete_notifications_for_service_and_type') + + _delete_notifications_older_than_retention_by_type('sms') + + mock_subtask.apply_async.assert_called_once_with(queue='reporting-tasks', kwargs={ + 'service_id': sms_service.id, + 'notification_type': 'sms', + # three days of retention, its morn of 5th, so we want to keep all messages from 4th, 3rd and 2nd. + 'datetime_to_delete_before': datetime(2021, 6, 1, 23, 0), + }) + + +@freeze_time('2021-04-05 03:00') +def test_delete_notifications_task_calls_task_for_services_with_data_retention_by_looking_at_retention( + notify_db_session, + mocker +): + service_14_days = create_service(service_name='a') + service_3_days = create_service(service_name='b') + create_service_data_retention(service_14_days, days_of_retention=14) + create_service_data_retention(service_3_days, days_of_retention=3) + + mock_subtask = mocker.patch('app.celery.nightly_tasks.delete_notifications_for_service_and_type') + + _delete_notifications_older_than_retention_by_type('sms') + + assert mock_subtask.apply_async.call_count == 2 + mock_subtask.apply_async.assert_has_calls(any_order=True, calls=[ + call(queue=ANY, kwargs={ + 'service_id': service_14_days.id, + 'notification_type': 'sms', + 'datetime_to_delete_before': datetime(2021, 3, 22, 0, 0) + }), + call(queue=ANY, kwargs={ + 'service_id': service_3_days.id, + 'notification_type': 'sms', + 'datetime_to_delete_before': datetime(2021, 4, 1, 23, 0) + }), + ]) + + +@freeze_time('2021-04-03 03:00') +def test_delete_notifications_task_calls_task_for_services_that_have_sent_notifications_recently( + notify_db_session, + mocker +): + + service_will_delete_1 = create_service(service_name='a') + service_will_delete_2 = create_service(service_name='b') + service_nothing_to_delete = create_service(service_name='c') + + create_template(service_will_delete_1) + create_template(service_will_delete_2) + nothing_to_delete_sms_template = create_template(service_nothing_to_delete, template_type='sms') + nothing_to_delete_email_template = create_template(service_nothing_to_delete, template_type='email') + + # will be deleted as service has no custom retention, but past our default 7 days + create_notification(service_will_delete_1.templates[0], created_at=datetime.now() - timedelta(days=8)) + create_notification(service_will_delete_2.templates[0], created_at=datetime.now() - timedelta(days=8)) + + # will be kept as it's recent, and we won't run delete_notifications_for_service_and_type + create_notification(nothing_to_delete_sms_template, created_at=datetime.now() - timedelta(days=2)) + # this is an old notification, but for email not sms, so we won't run delete_notifications_for_service_and_type + create_notification(nothing_to_delete_email_template, created_at=datetime.now() - timedelta(days=8)) + + mock_subtask = mocker.patch('app.celery.nightly_tasks.delete_notifications_for_service_and_type') + + _delete_notifications_older_than_retention_by_type('sms') + + assert mock_subtask.apply_async.call_count == 2 + mock_subtask.apply_async.assert_has_calls(any_order=True, calls=[ + call(queue=ANY, kwargs={ + 'service_id': service_will_delete_1.id, + 'notification_type': 'sms', + 'datetime_to_delete_before': datetime(2021, 3, 27, 0, 0) + }), + call(queue=ANY, kwargs={ + 'service_id': service_will_delete_2.id, + 'notification_type': 'sms', + 'datetime_to_delete_before': datetime(2021, 3, 27, 0, 0) + }), + ])