split delete task up into per-service tasks

We really don't gain anything by running each service's delete in
sequence - we just get the services and then loop through them,
deleting per service. By deleting per service in separate tasks, we can
take advantage of parallelism. The only thing we lose is some log
lines, but I don't think we're that interested in them.
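
As a rough sketch of the shape this moves us to (the apply_async call, the
reporting queue and the DAO helpers are named as in the diff below; the import
paths for notify_celery and QueueNames are assumptions, and fan_out_deletes is
just a stand-in name for the loop inside
_delete_notifications_older_than_retention_by_type):

    # sketch only - notify_celery / QueueNames import paths are assumed,
    # everything else is named as in the diff below
    from datetime import datetime, timedelta

    from notifications_utils.timezones import convert_utc_to_bst

    from app import notify_celery
    from app.config import QueueNames
    from app.dao.service_data_retention_dao import (
        fetch_service_data_retention_for_all_services_by_notification_type,
    )
    from app.utils import get_london_midnight_in_utc


    @notify_celery.task(name='delete-notifications-for-service-and-type')
    def delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before):
        ...  # moves one service's old rows to notification_history (see the DAO change below)


    def fan_out_deletes(notification_type):
        # one task per service, so the deletes run concurrently on the reporting
        # workers instead of one after another inside a single task
        for retention in fetch_service_data_retention_for_all_services_by_notification_type(notification_type):
            cutoff = get_london_midnight_in_utc(
                convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=retention.days_of_retention)
            )
            delete_notifications_for_service_and_type.apply_async(
                queue=QueueNames.REPORTING,
                kwargs={
                    'service_id': retention.service_id,
                    'notification_type': notification_type,
                    'datetime_to_delete_before': cutoff,
                },
            )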

Only set the query limit in the move_notifications DAO function - the
task doesn't really care about the technical detail of how the
notifications get deleted.
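
Roughly what that split of responsibilities looks like, sketched from the DAO
change below (simplified - the real function also handles the letter/S3
cleanup and test-key rows; insert_notification_history_delete_notifications is
the existing batching helper):

    # the task says *what* to delete; the DAO decides *how* it is chunked
    def move_notifications_to_notification_history(
        notification_type,
        service_id,
        timestamp_to_delete_backwards_from,
        qry_limit=50000,  # batch size is an implementation detail of the DAO layer
    ):
        deleted = 0
        delete_count_per_call = 1
        while delete_count_per_call > 0:
            # copy up to qry_limit rows into notification_history, delete them, and
            # return how many were handled; stop once a pass finds nothing to do
            delete_count_per_call = insert_notification_history_delete_notifications(
                notification_type=notification_type,
                service_id=service_id,
                timestamp_to_delete_backwards_from=timestamp_to_delete_backwards_from,
                qry_limit=qry_limit,
            )
            deleted += delete_count_per_call
        return deleted


    def delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before):
        # the task never passes qry_limit - the default lives with the DAO
        return move_notifications_to_notification_history(
            notification_type, service_id, datetime_to_delete_before,
        )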
Leo Hemsted
2021-12-06 09:30:48 +00:00
parent bbc68293bb
commit 49cc1b643f
4 changed files with 197 additions and 90 deletions


@@ -5,6 +5,7 @@ from flask import current_app
from notifications_utils.clients.zendesk.zendesk_client import (
NotifySupportTicket,
)
from notifications_utils.timezones import convert_utc_to_bst
from sqlalchemy import func
from sqlalchemy.exc import SQLAlchemyError
@@ -21,7 +22,11 @@ from app.dao.jobs_dao import (
from app.dao.notifications_dao import (
dao_get_notifications_processing_time_stats,
dao_timeout_notifications,
delete_notifications_older_than_retention_by_type,
get_service_ids_that_have_notifications_from_before_timestamp,
move_notifications_to_notification_history,
)
from app.dao.service_data_retention_dao import (
fetch_service_data_retention_for_all_services_by_notification_type,
)
from app.models import (
EMAIL_TYPE,
@@ -68,45 +73,74 @@ def delete_notifications_older_than_retention():
@notify_celery.task(name="delete-sms-notifications")
@cronitor("delete-sms-notifications")
def delete_sms_notifications_older_than_retention():
start = datetime.utcnow()
deleted = delete_notifications_older_than_retention_by_type('sms')
current_app.logger.info(
"Delete {} job started {} finished {} deleted {} sms notifications".format(
'sms',
start,
datetime.utcnow(),
deleted
)
)
_delete_notifications_older_than_retention_by_type('sms')
@notify_celery.task(name="delete-email-notifications")
@cronitor("delete-email-notifications")
def delete_email_notifications_older_than_retention():
start = datetime.utcnow()
deleted = delete_notifications_older_than_retention_by_type('email')
current_app.logger.info(
"Delete {} job started {} finished {} deleted {} email notifications".format(
'email',
start,
datetime.utcnow(),
deleted
)
)
_delete_notifications_older_than_retention_by_type('email')
@notify_celery.task(name="delete-letter-notifications")
@cronitor("delete-letter-notifications")
def delete_letter_notifications_older_than_retention():
start = datetime.utcnow()
deleted = delete_notifications_older_than_retention_by_type('letter')
_delete_notifications_older_than_retention_by_type('letter')
def _delete_notifications_older_than_retention_by_type(notification_type):
current_app.logger.info(
"Delete {} job started {} finished {} deleted {} letter notifications".format(
'letter',
start,
datetime.utcnow(),
deleted
'Deleting {} notifications for services with flexible data retention'.format(notification_type))
flexible_data_retention = fetch_service_data_retention_for_all_services_by_notification_type(notification_type)
for f in flexible_data_retention:
day_to_delete_backwards_from = get_london_midnight_in_utc(
convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=f.days_of_retention)
)
delete_notifications_for_service_and_type.apply_async(queue=QueueNames.REPORTING, kwargs={
'service_id': f.service_id,
'notification_type': notification_type,
'datetime_to_delete_before': day_to_delete_backwards_from
})
seven_days_ago = get_london_midnight_in_utc(convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=7))
service_ids_with_data_retention = {x.service_id for x in flexible_data_retention}
# get a list of all service ids that we'll need to delete for. Typically that might only be 5% of services.
# This query takes a couple of mins to run.
service_ids_that_have_sent_notifications_recently = get_service_ids_that_have_notifications_from_before_timestamp(
notification_type,
seven_days_ago
)
service_ids_to_purge = service_ids_that_have_sent_notifications_recently - service_ids_with_data_retention
current_app.logger.info('Deleting {} notifications for {} services without flexible data retention'.format(
notification_type,
len(service_ids_to_purge)
))
for service_id in service_ids_to_purge:
delete_notifications_for_service_and_type.apply_async(queue=QueueNames.REPORTING, kwargs={
'service_id': service_id,
'notification_type': notification_type,
'datetime_to_delete_before': seven_days_ago
})
@notify_celery.task(name='delete-notifications-for-service-and-type')
def delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before):
start = datetime.utcnow()
num_deleted = move_notifications_to_notification_history(
notification_type,
service_id,
datetime_to_delete_before,
)
end = datetime.utcnow()
current_app.logger.info(
f'Deleted {num_deleted} {notification_type} notifications for '
f'service id: {service_id} in {(end - start).seconds} seconds'
)


@@ -45,13 +45,8 @@ from app.models import (
Notification,
NotificationHistory,
ProviderDetails,
ServiceDataRetention,
)
from app.utils import (
escape_special_characters,
get_london_midnight_in_utc,
midnight_n_days_ago,
)
from app.utils import escape_special_characters, midnight_n_days_ago
def dao_get_last_date_template_was_used(template_id, service_id):
@@ -304,55 +299,6 @@ def _filter_query(query, filter_dict=None):
return query
def delete_notifications_older_than_retention_by_type(notification_type, qry_limit=50000):
current_app.logger.info(
'Deleting {} notifications for services with flexible data retention'.format(notification_type))
flexible_data_retention = ServiceDataRetention.query.filter(
ServiceDataRetention.notification_type == notification_type
).all()
deleted = 0
for f in flexible_data_retention:
current_app.logger.info(
"Deleting {} notifications for service id: {}".format(notification_type, f.service_id))
day_to_delete_backwards_from = get_london_midnight_in_utc(
convert_utc_to_bst(datetime.utcnow()).date()) - timedelta(days=f.days_of_retention)
deleted += _move_notifications_to_notification_history(
notification_type, f.service_id, day_to_delete_backwards_from, qry_limit)
seven_days_ago = get_london_midnight_in_utc(convert_utc_to_bst(datetime.utcnow()).date()) - timedelta(days=7)
service_ids_with_data_retention = {x.service_id for x in flexible_data_retention}
# get a list of all service ids that we'll need to delete for. Typically that might only be 5% of services.
# This query takes a couple of mins to run.
service_ids_that_have_sent_notifications_recently = {
row.service_id
for row in db.session.query(
Notification.service_id
).filter(
Notification.notification_type == notification_type,
Notification.created_at < seven_days_ago
).distinct()
}
service_ids_to_purge = service_ids_that_have_sent_notifications_recently - service_ids_with_data_retention
current_app.logger.info('Deleting {} notifications for {} services without flexible data retention'.format(
notification_type,
len(service_ids_to_purge)
))
for service_id in service_ids_to_purge:
deleted += _move_notifications_to_notification_history(
notification_type, service_id, seven_days_ago, qry_limit)
current_app.logger.info('Finished deleting {} notifications'.format(notification_type))
return deleted
@autocommit
def insert_notification_history_delete_notifications(
notification_type, service_id, timestamp_to_delete_backwards_from, qry_limit=50000
@@ -431,18 +377,23 @@ def insert_notification_history_delete_notifications(
return result
def _move_notifications_to_notification_history(notification_type, service_id, day_to_delete_backwards_from, qry_limit):
def move_notifications_to_notification_history(
notification_type,
service_id,
timestamp_to_delete_backwards_from,
qry_limit=50000
):
deleted = 0
if notification_type == LETTER_TYPE:
_delete_letters_from_s3(
notification_type, service_id, day_to_delete_backwards_from, qry_limit
notification_type, service_id, timestamp_to_delete_backwards_from, qry_limit
)
delete_count_per_call = 1
while delete_count_per_call > 0:
delete_count_per_call = insert_notification_history_delete_notifications(
notification_type=notification_type,
service_id=service_id,
timestamp_to_delete_backwards_from=day_to_delete_backwards_from,
timestamp_to_delete_backwards_from=timestamp_to_delete_backwards_from,
qry_limit=qry_limit
)
deleted += delete_count_per_call
@@ -451,7 +402,7 @@ def _move_notifications_to_notification_history(notification_type, service_id, d
Notification.query.filter(
Notification.notification_type == notification_type,
Notification.service_id == service_id,
Notification.created_at < day_to_delete_backwards_from,
Notification.created_at < timestamp_to_delete_backwards_from,
Notification.key_type == KEY_TYPE_TEST
).delete(synchronize_session=False)
db.session.commit()
@@ -840,3 +791,15 @@ def _duplicate_update_warning(notification, status):
sent_by=notification.sent_by
)
)
def get_service_ids_that_have_notifications_from_before_timestamp(notification_type, timestamp):
return {
row.service_id
for row in db.session.query(
Notification.service_id
).filter(
Notification.notification_type == notification_type,
Notification.created_at < timestamp
).distinct()
}


@@ -50,3 +50,7 @@ def update_service_data_retention(service_data_retention_id, service_id, days_of
}
)
return updated_count
def fetch_service_data_retention_for_all_services_by_notification_type(notification_type):
return ServiceDataRetention.query.filter(ServiceDataRetention.notification_type == notification_type).all()