Integrate inbound history insert into delete inbound sms function

This commit is contained in:
Pea Tyczynska
2019-12-20 11:19:36 +00:00
parent a6b4675ae7
commit 448cd1e94e
2 changed files with 59 additions and 150 deletions

View File

@@ -63,93 +63,11 @@ def dao_count_inbound_sms_for_service(service_id, limit_days):
).count() ).count()
def _delete_inbound_sms(datetime_to_delete_from, query_filter): def _insert_update_notification_history(subquery, query_limit=10000):
query_limit = 10000
subquery = db.session.query(
InboundSms.id
).filter(
InboundSms.created_at < datetime_to_delete_from,
*query_filter
).limit(
query_limit
).subquery()
deleted = 0
# set to nonzero just to enter the loop
number_deleted = 1
while number_deleted > 0:
offset = 0
inbound_sms_query = db.session.query(
*[x.name for x in InboundSmsHistory.__table__.c]
).filter(InboundSms.id.in_(subquery))
inbound_sms_count = inbound_sms_query.count()
while offset < inbound_sms_count:
statement = insert(InboundSmsHistory).from_select(
InboundSmsHistory.__table__.c,
inbound_sms_query.limit(query_limit).offset(offset)
)
statement = statement.on_conflict_do_update(
constraint="inbound_sms_history_pkey",
set_={
"created_at": statement.excluded.created_at,
"service_id": statement.excluded.service_id,
"notify_number": statement.excluded.notify_number,
"provider_date": statement.excluded.provider_date,
"provider_reference": statement.excluded.provider_reference,
"provider": statement.excluded.provider
}
)
db.session.connection().execute(statement)
offset += query_limit
number_deleted = InboundSms.query.filter(InboundSms.id.in_(subquery)).delete(synchronize_session='fetch')
deleted += number_deleted
return deleted
@statsd(namespace="dao")
@transactional
def delete_inbound_sms_older_than_retention():
current_app.logger.info('Deleting inbound sms for services with flexible data retention')
flexible_data_retention = ServiceDataRetention.query.join(
ServiceDataRetention.service,
Service.inbound_number
).filter(
ServiceDataRetention.notification_type == SMS_TYPE
).all()
deleted = 0
for f in flexible_data_retention:
n_days_ago = midnight_n_days_ago(f.days_of_retention)
current_app.logger.info("Deleting inbound sms for service id: {}".format(f.service_id))
deleted += _delete_inbound_sms(n_days_ago, query_filter=[InboundSms.service_id == f.service_id])
current_app.logger.info('Deleting inbound sms for services without flexible data retention')
seven_days_ago = midnight_n_days_ago(7)
deleted += _delete_inbound_sms(seven_days_ago, query_filter=[
InboundSms.service_id.notin_(x.service_id for x in flexible_data_retention),
])
current_app.logger.info('Deleted {} inbound sms'.format(deleted))
return deleted
def insert_update_inbound_sms_history(date_to_delete_from, service_id, query_limit=10000):
offset = 0 offset = 0
inbound_sms_query = db.session.query( inbound_sms_query = db.session.query(
*[x.name for x in InboundSmsHistory.__table__.c] *[x.name for x in InboundSmsHistory.__table__.c]
).filter( ).filter(InboundSms.id.in_(subquery))
InboundSms.service_id == service_id,
InboundSms.created_at < date_to_delete_from,
)
inbound_sms_count = inbound_sms_query.count() inbound_sms_count = inbound_sms_query.count()
while offset < inbound_sms_count: while offset < inbound_sms_count:
@@ -174,6 +92,63 @@ def insert_update_inbound_sms_history(date_to_delete_from, service_id, query_lim
offset += query_limit offset += query_limit
def _delete_inbound_sms(datetime_to_delete_from, query_filter):
query_limit = 10000
subquery = db.session.query(
InboundSms.id
).filter(
InboundSms.created_at < datetime_to_delete_from,
*query_filter
).limit(
query_limit
).subquery()
deleted = 0
# set to nonzero just to enter the loop
number_deleted = 1
while number_deleted > 0:
_insert_update_notification_history(subquery, query_limit=query_limit)
number_deleted = InboundSms.query.filter(InboundSms.id.in_(subquery)).delete(synchronize_session='fetch')
deleted += number_deleted
return deleted
@statsd(namespace="dao")
@transactional
def delete_inbound_sms_older_than_retention():
current_app.logger.info('Deleting inbound sms for services with flexible data retention')
flexible_data_retention = ServiceDataRetention.query.join(
ServiceDataRetention.service,
Service.inbound_number
).filter(
ServiceDataRetention.notification_type == SMS_TYPE
).all()
deleted = 0
for f in flexible_data_retention:
n_days_ago = midnight_n_days_ago(f.days_of_retention)
current_app.logger.info("Deleting inbound sms for service id: {}".format(f.service_id))
deleted += _delete_inbound_sms(n_days_ago, query_filter=[InboundSms.service_id == f.service_id])
current_app.logger.info('Deleting inbound sms for services without flexible data retention')
seven_days_ago = midnight_n_days_ago(7)
deleted += _delete_inbound_sms(seven_days_ago, query_filter=[
InboundSms.service_id.notin_(x.service_id for x in flexible_data_retention),
])
current_app.logger.info('Deleted {} inbound sms'.format(deleted))
return deleted
def dao_get_inbound_sms_by_id(service_id, inbound_id): def dao_get_inbound_sms_by_id(service_id, inbound_id):
return InboundSms.query.filter_by( return InboundSms.query.filter_by(
id=inbound_id, id=inbound_id,

View File

@@ -10,14 +10,10 @@ from app.dao.inbound_sms_dao import (
dao_get_inbound_sms_by_id, dao_get_inbound_sms_by_id,
dao_get_paginated_inbound_sms_for_service_for_public_api, dao_get_paginated_inbound_sms_for_service_for_public_api,
dao_get_paginated_most_recent_inbound_sms_by_user_number_for_service, dao_get_paginated_most_recent_inbound_sms_by_user_number_for_service,
db,
insert_update_inbound_sms_history
) )
from app.models import InboundSmsHistory from app.models import InboundSmsHistory
from app.utils import midnight_n_days_ago
from tests.conftest import set_config from tests.conftest import set_config
from tests.app.db import create_inbound_sms, create_service, create_service_data_retention from tests.app.db import create_inbound_sms, create_service, create_service_data_retention
@@ -281,65 +277,3 @@ def test_most_recent_inbound_sms_only_returns_values_within_7_days(sample_servic
assert len(res.items) == 1 assert len(res.items) == 1
assert res.items[0].content == 'new' assert res.items[0].content == 'new'
def test_insert_update_inbound_sms_history_limits_by_query_by_date(sample_service):
# becoming history
inbound_1 = create_inbound_sms(
sample_service, user_number='1', content='old', created_at=datetime(2017, 4, 2, 22, 59, 59)
)
# not becoming history yet
create_inbound_sms(sample_service, user_number='2', content='new', created_at=datetime(2017, 4, 2, 23, 0, 0))
with freeze_time('Monday 10th April 2017 12:00:00'):
insert_update_inbound_sms_history(
date_to_delete_from=midnight_n_days_ago(7),
service_id=sample_service.id)
history = InboundSmsHistory.query.all()
assert len(history) == 1
assert history[0].id == inbound_1.id
def test_insert_update_inbound_sms_history_above_query_limit(sample_service, mocker):
# becoming history
create_inbound_sms(
sample_service, user_number='1', content='old', created_at=datetime(2017, 4, 2, 22, 59, 59)
)
create_inbound_sms(
sample_service, user_number='1', content='old', created_at=datetime(2017, 4, 2, 20, 59, 59)
)
# not becoming history yet
create_inbound_sms(sample_service, user_number='2', content='new', created_at=datetime(2017, 4, 2, 23, 0, 0))
db_connection_spy = mocker.spy(db.session, 'connection')
db_commit_spy = mocker.spy(db.session, 'commit')
with freeze_time('Monday 10th April 2017 12:00:00'):
insert_update_inbound_sms_history(
date_to_delete_from=midnight_n_days_ago(7),
service_id=sample_service.id,
query_limit=1
)
history = InboundSmsHistory.query.all()
assert len(history) == 2
assert db_connection_spy.call_count == 2
assert db_commit_spy.call_count == 2
def test_insert_update_inbound_sms_history_only_updates_given_service(sample_service):
inbound_1 = create_inbound_sms(
sample_service, user_number='1', content='old', created_at=datetime(2017, 4, 2, 22, 59, 59)
)
other_service = create_service(service_name='another service')
create_inbound_sms(other_service, user_number='2', content='new', created_at=datetime(2017, 4, 2, 22, 0, 0))
with freeze_time('Monday 10th April 2017 12:00:00'):
insert_update_inbound_sms_history(
date_to_delete_from=midnight_n_days_ago(7),
service_id=sample_service.id)
history = InboundSmsHistory.query.all()
assert len(history) == 1
assert history[0].id == inbound_1.id