Merge pull request #2768 from alphagov/optimise-delete-notifications-task

Refactor delete_notifications_older_than_retention_by_type
This commit is contained in:
Rebecca Law
2020-03-24 15:04:48 +00:00
committed by GitHub
3 changed files with 120 additions and 255 deletions

View File

@@ -21,7 +21,6 @@ from sqlalchemy.orm import joinedload
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql import functions
from sqlalchemy.sql.expression import case
from sqlalchemy.dialects.postgresql import insert
from werkzeug.datastructures import MultiDict
from app import db, create_uuid
@@ -335,31 +334,33 @@ def delete_notifications_older_than_retention_by_type(notification_type, qry_lim
@statsd(namespace="dao")
@transactional
def insert_notification_history_delete_notifications(
notification_type, service_id, start_time, end_time, qry_limit=10000
notification_type, service_id, timestamp_to_delete_backwards_from, qry_limit=50000
):
# Setting default query limit to 50,000 which take about 48 seconds on current table size
# 10, 000 took 11s and 100,000 took 1 min 30 seconds.
drop_table_if_exists = """
DROP TABLE if exists NOTIFICATION_ARCHIVE
"""
select_into_temp_table = """
CREATE TEMP TABLE NOTIFICATION_ARCHIVE AS
SELECT id
FROM notifications
WHERE service_id = :service_id
AND notification_type = :notification_type
AND created_at >= :start_time
AND created_at <= :end_time
AND key_type = 'normal'
AND notification_status in ('delivered', 'permanent-failure', 'temporary-failure')
limit :qry_limit
"""
insert_query = """
insert into notification_history
SELECT id, job_id, job_row_number, service_id, template_id, template_version, api_key_id,
key_type, notification_type, created_at, sent_at, sent_by, updated_at, reference, billable_units,
client_reference, international, phone_prefix, rate_multiplier, notification_status,
created_by_id, postage, document_download_count
FROM notifications
WHERE id in (select id from NOTIFICATION_ARCHIVE)
FROM notifications
WHERE service_id = :service_id
AND notification_type = :notification_type
AND created_at < :timestamp_to_delete_backwards_from
AND key_type = 'normal'
AND notification_status in ('delivered', 'permanent-failure', 'temporary-failure')
limit :qry_limit
"""
# Insert into NotificationHistory if the row already exists do nothing.
insert_query = """
insert into notification_history
SELECT * from NOTIFICATION_ARCHIVE
ON CONFLICT ON CONSTRAINT notification_history_pkey
DO NOTHING
"""
delete_query = """
DELETE FROM notifications
@@ -368,11 +369,12 @@ def insert_notification_history_delete_notifications(
input_params = {
"service_id": service_id,
"notification_type": notification_type,
"start_time": start_time,
"end_time": end_time,
"timestamp_to_delete_backwards_from": timestamp_to_delete_backwards_from,
"qry_limit": qry_limit
}
current_app.logger.info(f"Start insert_notification_history_delete_notifications for input params {input_params}")
current_app.logger.info(
f"Start executing insert_notification_history_delete_notifications for input params {input_params}"
)
db.session.execute(drop_table_if_exists)
current_app.logger.info('Start executing select into temp table')
db.session.execute(select_into_temp_table, input_params)
@@ -382,7 +384,7 @@ def insert_notification_history_delete_notifications(
current_app.logger.info('Start executing insert into history')
db.session.execute(insert_query)
current_app.logger.info('Start deleting notifications')
current_app.logger.info('Start executing deleting notifications')
db.session.execute(delete_query)
db.session.execute("DROP TABLE NOTIFICATION_ARCHIVE")
@@ -395,116 +397,28 @@ def _move_notifications_to_notification_history(notification_type, service_id, d
_delete_letters_from_s3(
notification_type, service_id, day_to_delete_backwards_from, qry_limit
)
stop = -1 # exclusive, we want to include 0
step = -1
for hour_delta in range(23, stop, step):
# We find the timestamp we want to delete all notifications backwards from
# We then start 23 hours ago, and do an insert notification history before deleting all notifications older
# We then look 22 hours ago, do an insert notifications history before deleting all notifications older
# We continue this until we reach the original timestamp we wanted to delete notifications backwardsfrom
# This enables us to break this into smaller database queries
timestamp_to_delete_backwards_from = day_to_delete_backwards_from - timedelta(hours=hour_delta)
if service_id == '539d63a1-701d-400d-ab11-f3ee2319d4d4':
current_app.logger.info(
"Beginning insert_update_notification_history for GOV.UK Email from {} backwards".format(
timestamp_to_delete_backwards_from
)
)
insert_update_notification_history(notification_type, timestamp_to_delete_backwards_from, service_id, qry_limit)
if service_id == '539d63a1-701d-400d-ab11-f3ee2319d4d4':
current_app.logger.info(
"Beginning _delete_notifications for GOV.UK Email {} backwards".format(
timestamp_to_delete_backwards_from
)
)
deleted += _delete_notifications(
notification_type, timestamp_to_delete_backwards_from, service_id, qry_limit
delete_count_per_call = 1
while delete_count_per_call > 0:
delete_count_per_call = insert_notification_history_delete_notifications(
notification_type=notification_type,
service_id=service_id,
timestamp_to_delete_backwards_from=day_to_delete_backwards_from,
qry_limit=qry_limit
)
deleted += delete_count_per_call
return deleted
def _delete_notifications(notification_type, date_to_delete_from, service_id, query_limit):
subquery = db.session.query(
Notification.id
).join(NotificationHistory, NotificationHistory.id == Notification.id).filter(
# Deleting test Notifications, test notifications are not persisted to NotificationHistory
Notification.query.filter(
Notification.notification_type == notification_type,
Notification.service_id == service_id,
Notification.created_at < date_to_delete_from,
).limit(query_limit).subquery()
deleted = _delete_for_query(subquery)
subquery_for_test_keys = db.session.query(
Notification.id
).filter(
Notification.notification_type == notification_type,
Notification.service_id == service_id,
Notification.created_at < date_to_delete_from,
Notification.created_at < day_to_delete_backwards_from,
Notification.key_type == KEY_TYPE_TEST
).limit(query_limit).subquery()
deleted += _delete_for_query(subquery_for_test_keys)
return deleted
def _delete_for_query(subquery):
number_deleted = db.session.query(Notification).filter(
Notification.id.in_(subquery)).delete(synchronize_session='fetch')
deleted = number_deleted
).delete(synchronize_session=False)
db.session.commit()
while number_deleted > 0:
number_deleted = db.session.query(Notification).filter(
Notification.id.in_(subquery)).delete(synchronize_session='fetch')
deleted += number_deleted
db.session.commit()
return deleted
def insert_update_notification_history(notification_type, date_to_delete_from, service_id, query_limit=10000):
offset = 0
notification_query = db.session.query(
*[x.name for x in NotificationHistory.__table__.c]
).filter(
Notification.notification_type == notification_type,
Notification.service_id == service_id,
Notification.created_at < date_to_delete_from,
Notification.key_type != KEY_TYPE_TEST
).order_by(
Notification.created_at
)
notifications_count = notification_query.count()
while offset < notifications_count:
stmt = insert(NotificationHistory).from_select(
NotificationHistory.__table__.c,
notification_query.limit(query_limit).offset(offset)
)
stmt = stmt.on_conflict_do_update(
constraint="notification_history_pkey",
set_={
"notification_status": stmt.excluded.status,
"reference": stmt.excluded.reference,
"billable_units": stmt.excluded.billable_units,
"updated_at": stmt.excluded.updated_at,
"sent_at": stmt.excluded.sent_at,
"sent_by": stmt.excluded.sent_by
}
)
db.session.connection().execute(stmt)
db.session.commit()
offset += query_limit
def _delete_letters_from_s3(
notification_type, service_id, date_to_delete_from, query_limit
):

View File

@@ -9,16 +9,15 @@ from freezegun import freeze_time
from app.dao.notifications_dao import (
delete_notifications_older_than_retention_by_type,
db,
insert_update_notification_history,
insert_notification_history_delete_notifications)
insert_notification_history_delete_notifications
)
from app.models import Notification, NotificationHistory
from tests.app.db import (
create_template,
create_notification,
create_service_data_retention,
create_service
)
create_service,
create_notification_history)
def create_test_data(notification_type, sample_service, days_of_retention=3):
@@ -157,17 +156,17 @@ def test_delete_notifications_inserts_notification_history(sample_service):
assert NotificationHistory.query.count() == 2
def test_delete_notifications_updates_notification_history(sample_email_template, mocker):
def test_delete_notifications_does_nothing_if_notification_history_row_already_exists(
sample_email_template, mocker
):
mocker.patch("app.dao.notifications_dao.get_s3_bucket_objects")
notification = create_notification(template=sample_email_template, created_at=datetime.utcnow() - timedelta(days=8))
Notification.query.filter_by(id=notification.id).update(
{"status": "delivered",
"reference": "ses_reference",
"billable_units": 1, # I know we don't update this for emails but this is a unit test
"updated_at": datetime.utcnow(),
"sent_at": datetime.utcnow(),
"sent_by": "ses"
}
notification = create_notification(
template=sample_email_template, created_at=datetime.utcnow() - timedelta(days=8),
status='temporary-failure'
)
create_notification_history(
id=notification.id, template=sample_email_template,
created_at=datetime.utcnow() - timedelta(days=8), status='delivered'
)
delete_notifications_older_than_retention_by_type("email")
@@ -175,10 +174,6 @@ def test_delete_notifications_updates_notification_history(sample_email_template
history = NotificationHistory.query.all()
assert len(history) == 1
assert history[0].status == 'delivered'
assert history[0].reference == 'ses_reference'
assert history[0].billable_units == 1
assert history[0].updated_at
assert history[0].sent_by == 'ses'
def test_delete_notifications_keep_data_for_days_of_retention_is_longer(sample_service):
@@ -227,25 +222,14 @@ def test_delete_notifications_does_try_to_delete_from_s3_when_letter_has_not_bee
mock_get_s3.assert_not_called()
@freeze_time("2016-01-10 12:00:00.000000")
def test_should_not_delete_notification_if_history_does_not_exist(sample_service, mocker):
mocker.patch("app.dao.notifications_dao.get_s3_bucket_objects")
mocker.patch("app.dao.notifications_dao.insert_update_notification_history")
with freeze_time('2016-01-01 12:00'):
email_template, letter_template, sms_template = _create_templates(sample_service)
create_notification(template=email_template, status='permanent-failure')
create_notification(template=sms_template, status='delivered')
create_notification(template=letter_template, status='temporary-failure')
assert Notification.query.count() == 3
delete_notifications_older_than_retention_by_type('sms')
assert Notification.query.count() == 3
assert NotificationHistory.query.count() == 0
@freeze_time('2020-03-25 00:01')
def test_delete_notifications_calls_subquery_multiple_times(sample_template):
create_notification(template=sample_template, created_at=datetime.now() - timedelta(days=8))
create_notification(template=sample_template, created_at=datetime.now() - timedelta(days=8))
create_notification(template=sample_template, created_at=datetime.now() - timedelta(days=8))
create_notification(template=sample_template, created_at=datetime.now() - timedelta(days=7, minutes=3),
status='delivered')
create_notification(template=sample_template, created_at=datetime.now() - timedelta(days=7, minutes=3),
status='delivered')
create_notification(template=sample_template, created_at=datetime.now() - timedelta(days=7, minutes=3),
status='delivered')
assert Notification.query.count() == 3
delete_notifications_older_than_retention_by_type('sms', qry_limit=1)
@@ -253,101 +237,18 @@ def test_delete_notifications_calls_subquery_multiple_times(sample_template):
def test_delete_notifications_returns_sum_correctly(sample_template):
create_notification(template=sample_template, created_at=datetime.now() - timedelta(days=8))
create_notification(template=sample_template, created_at=datetime.now() - timedelta(days=8))
create_notification(template=sample_template, created_at=datetime.now() - timedelta(days=8), status='delivered')
create_notification(template=sample_template, created_at=datetime.now() - timedelta(days=8), status='delivered')
s2 = create_service(service_name='s2')
t2 = create_template(s2, template_type='sms')
create_notification(template=t2, created_at=datetime.now() - timedelta(days=8))
create_notification(template=t2, created_at=datetime.now() - timedelta(days=8))
create_notification(template=t2, created_at=datetime.now() - timedelta(days=8), status='delivered')
create_notification(template=t2, created_at=datetime.now() - timedelta(days=8), status='delivered')
ret = delete_notifications_older_than_retention_by_type('sms', qry_limit=1)
assert ret == 4
def test_insert_update_notification_history(sample_service):
template = create_template(sample_service, template_type='sms')
notification_1 = create_notification(template=template, created_at=datetime.utcnow() - timedelta(days=3))
notification_2 = create_notification(template=template, created_at=datetime.utcnow() - timedelta(days=8))
notification_3 = create_notification(template=template, created_at=datetime.utcnow() - timedelta(days=9))
other_types = ['email', 'letter']
for template_type in other_types:
t = create_template(service=sample_service, template_type=template_type)
create_notification(template=t, created_at=datetime.utcnow() - timedelta(days=3))
create_notification(template=t, created_at=datetime.utcnow() - timedelta(days=8))
insert_update_notification_history(
notification_type='sms', date_to_delete_from=datetime.utcnow() - timedelta(days=7),
service_id=sample_service.id)
history = NotificationHistory.query.all()
assert len(history) == 2
history_ids = [x.id for x in history]
assert notification_1.id not in history_ids
assert notification_2.id in history_ids
assert notification_3.id in history_ids
def test_insert_update_notification_history_with_more_notifications_than_query_limit(mocker, sample_service):
template = create_template(sample_service, template_type='sms')
notification_1 = create_notification(template=template, created_at=datetime.utcnow() - timedelta(days=3))
notification_2 = create_notification(template=template, created_at=datetime.utcnow() - timedelta(days=8))
notification_3 = create_notification(template=template, created_at=datetime.utcnow() - timedelta(days=9))
other_types = ['email', 'letter']
for template_type in other_types:
t = create_template(service=sample_service, template_type=template_type)
create_notification(template=t, created_at=datetime.utcnow() - timedelta(days=3))
create_notification(template=t, created_at=datetime.utcnow() - timedelta(days=8))
db_connection_spy = mocker.spy(db.session, 'connection')
db_commit_spy = mocker.spy(db.session, 'commit')
insert_update_notification_history(
notification_type='sms', date_to_delete_from=datetime.utcnow() - timedelta(days=7),
service_id=sample_service.id, query_limit=1)
history = NotificationHistory.query.all()
assert len(history) == 2
history_ids = [x.id for x in history]
assert notification_1.id not in history_ids
assert notification_2.id in history_ids
assert notification_3.id in history_ids
assert db_connection_spy.call_count == 2
assert db_commit_spy.call_count == 2
def test_insert_update_notification_history_only_insert_update_given_service(sample_service):
other_service = create_service(service_name='another service')
other_template = create_template(service=other_service)
template = create_template(service=sample_service)
notification_1 = create_notification(template=template, created_at=datetime.utcnow() - timedelta(days=3))
notification_2 = create_notification(template=template, created_at=datetime.utcnow() - timedelta(days=8))
notification_3 = create_notification(template=other_template, created_at=datetime.utcnow() - timedelta(days=3))
notification_4 = create_notification(template=other_template, created_at=datetime.utcnow() - timedelta(days=8))
insert_update_notification_history('sms', datetime.utcnow() - timedelta(days=7), sample_service.id)
history = NotificationHistory.query.all()
assert len(history) == 1
history_ids = [x.id for x in history]
assert notification_1.id not in history_ids
assert notification_2.id in history_ids
assert notification_3.id not in history_ids
assert notification_4.id not in history_ids
def test_insert_update_notification_history_updates_history_with_new_status(sample_template):
notification_1 = create_notification(template=sample_template, created_at=datetime.utcnow() - timedelta(days=3))
notification_2 = create_notification(template=sample_template, created_at=datetime.utcnow() - timedelta(days=8),
status='delivered')
insert_update_notification_history(
'sms', datetime.utcnow() - timedelta(days=7), sample_template.service_id)
history = NotificationHistory.query.get(notification_2.id)
assert history.status == 'delivered'
assert not NotificationHistory.query.get(notification_1.id)
@freeze_time('2020-03-20 14:00')
def test_insert_notification_history_delete_notifications(sample_email_template):
# should be deleted
@@ -357,29 +258,78 @@ def test_insert_notification_history_delete_notifications(sample_email_template)
created_at=datetime.utcnow() + timedelta(minutes=20), status='permanent-failure')
create_notification(template=sample_email_template,
created_at=datetime.utcnow() + timedelta(minutes=30), status='temporary-failure')
create_notification(template=sample_email_template,
created_at=datetime.utcnow() + timedelta(minutes=59), status='temporary-failure')
# should NOT be deleted
create_notification(template=sample_email_template,
created_at=datetime.utcnow() - timedelta(minutes=59), status='temporary-failure')
created_at=datetime.utcnow() + timedelta(hours=1), status='delivered')
create_notification(template=sample_email_template,
created_at=datetime.utcnow() + timedelta(minutes=61), status='temporary-failure')
create_notification(template=sample_email_template,
created_at=datetime.utcnow() + timedelta(hours=1, seconds=1), status='temporary-failure')
create_notification(template=sample_email_template,
created_at=datetime.utcnow() - timedelta(hours=1), status='delivered')
create_notification(template=sample_email_template,
created_at=datetime.utcnow() + timedelta(minutes=20), status='created')
# should NOT be deleted - wrong status
create_notification(template=sample_email_template,
created_at=datetime.utcnow() - timedelta(days=1), status='sending')
create_notification(template=sample_email_template,
created_at=datetime.utcnow() - timedelta(days=1), status='technical-failure')
create_notification(template=sample_email_template,
created_at=datetime.utcnow() - timedelta(hours=1), status='created')
del_count = insert_notification_history_delete_notifications(
notification_type=sample_email_template.template_type,
service_id=sample_email_template.service_id,
start_time=datetime.utcnow(),
end_time=datetime.utcnow() + timedelta(hours=1))
timestamp_to_delete_backwards_from=datetime.utcnow() + timedelta(hours=1))
assert del_count == 3
assert del_count == 4
notifications = Notification.query.all()
history_rows = NotificationHistory.query.all()
assert len(history_rows) == 3
assert len(notifications) == 6
assert len(history_rows) == 4
assert len(notifications) == 7
def test_insert_notification_history_delete_notifications_more_notifications_than_query_limit(sample_template):
create_notification(template=sample_template,
created_at=datetime.utcnow() + timedelta(minutes=4), status='delivered')
create_notification(template=sample_template,
created_at=datetime.utcnow() + timedelta(minutes=20), status='permanent-failure')
create_notification(template=sample_template,
created_at=datetime.utcnow() + timedelta(minutes=30), status='temporary-failure')
del_count = insert_notification_history_delete_notifications(
notification_type=sample_template.template_type,
service_id=sample_template.service_id,
timestamp_to_delete_backwards_from=datetime.utcnow() + timedelta(hours=1),
qry_limit=1
)
assert del_count == 1
notifications = Notification.query.all()
history_rows = NotificationHistory.query.all()
assert len(history_rows) == 1
assert len(notifications) == 2
def test_insert_notification_history_delete_notifications_only_insert_delete_for_given_service(sample_email_template):
notification_to_move = create_notification(template=sample_email_template,
created_at=datetime.utcnow() + timedelta(minutes=4), status='delivered')
another_service = create_service(service_name='Another service')
another_template = create_template(service=another_service, template_type='email')
notification_to_stay = create_notification(template=another_template,
created_at=datetime.utcnow() + timedelta(minutes=4), status='delivered')
del_count = insert_notification_history_delete_notifications(
notification_type=sample_email_template.template_type,
service_id=sample_email_template.service_id,
timestamp_to_delete_backwards_from=datetime.utcnow() + timedelta(hours=1)
)
assert del_count == 1
notifications = Notification.query.all()
history_rows = NotificationHistory.query.all()
assert len(notifications) == 1
assert len(history_rows) == 1
assert notifications[0].id == notification_to_stay.id
assert history_rows[0], id == notification_to_move.id

View File

@@ -324,7 +324,8 @@ def create_notification_history(
international=False,
phone_prefix=None,
created_by_id=None,
postage=None
postage=None,
id=None
):
assert job or template
if job:
@@ -341,7 +342,7 @@ def create_notification_history(
postage = 'second'
data = {
'id': uuid.uuid4(),
'id': id or uuid.uuid4(),
'job_id': job and job.id,
'job': job,
'service_id': template.service.id,