Files
notifications-api/app/dao/inbound_sms_dao.py

221 lines
6.4 KiB
Python
Raw Normal View History

from flask import current_app
2024-10-18 08:12:22 -07:00
from sqlalchemy import and_, delete, desc, func, select
from sqlalchemy.dialects.postgresql import insert
2024-10-17 12:27:17 -07:00
from sqlalchemy.orm import aliased
2017-05-22 11:26:47 +01:00
from app import db
from app.dao.dao_utils import autocommit
from app.enums import NotificationType
2024-04-24 16:27:20 -04:00
from app.models import InboundSms, InboundSmsHistory, ServiceDataRetention
from app.utils import midnight_n_days_ago
2017-05-22 11:26:47 +01:00
@autocommit
2017-05-22 11:26:47 +01:00
def dao_create_inbound_sms(inbound_sms):
db.session.add(inbound_sms)
2023-08-29 14:54:30 -07:00
def dao_get_inbound_sms_for_service(
service_id, user_number=None, *, limit_days=None, limit=None
):
2024-10-17 12:24:23 -07:00
q = (
select(InboundSms)
.filter(InboundSms.service_id == service_id)
.order_by(InboundSms.created_at.desc())
)
if limit_days is not None:
start_date = midnight_n_days_ago(limit_days)
q = q.filter(InboundSms.created_at >= start_date)
if user_number:
q = q.filter(InboundSms.user_number == user_number)
if limit:
q = q.limit(limit)
2024-10-17 12:24:23 -07:00
return db.session.execute(q).scalars().all()
2018-03-22 12:41:17 +00:00
def dao_get_paginated_inbound_sms_for_service_for_public_api(
2023-08-29 14:54:30 -07:00
service_id, older_than=None, page_size=None
):
if page_size is None:
2023-08-29 14:54:30 -07:00
page_size = current_app.config["PAGE_SIZE"]
filters = [InboundSms.service_id == service_id]
if older_than:
2023-08-29 14:54:30 -07:00
older_than_created_at = (
db.session.query(InboundSms.created_at)
.filter(InboundSms.id == older_than)
.as_scalar()
)
filters.append(InboundSms.created_at < older_than_created_at)
2024-10-18 08:45:31 -07:00
# As part of the move to sqlalchemy 2.0, we do this manual pagination
# 1.4 had a paginate() method which assumed 'page' was 1 if it was not specified,
# so we set page to 1 here to mimic that.
page = 1
query = db.session.query(InboundSms).filter(*filters)
paginated_items = (
query.order_by(desc(InboundSms.created_at))
.offset((page - 1) * page_size)
.limit(page_size)
2023-08-29 14:54:30 -07:00
)
2024-10-18 08:45:31 -07:00
return paginated_items
def dao_count_inbound_sms_for_service(service_id, limit_days):
2024-10-17 12:58:35 -07:00
stmt = (
select(func.count())
.select_from(InboundSms)
.filter(
InboundSms.service_id == service_id,
InboundSms.created_at >= midnight_n_days_ago(limit_days),
)
2024-10-17 12:24:23 -07:00
)
2024-10-17 12:44:28 -07:00
result = db.session.execute(stmt).scalar()
2024-10-17 12:58:35 -07:00
return result
def _insert_inbound_sms_history(subquery, query_limit=10000):
offset = 0
inbound_sms_query = db.session.query(
InboundSms.id,
InboundSms.created_at,
InboundSms.service_id,
InboundSms.notify_number,
InboundSms.provider_date,
InboundSms.provider_reference,
2023-08-29 14:54:30 -07:00
InboundSms.provider,
).filter(InboundSms.id.in_(subquery))
inbound_sms_count = inbound_sms_query.count()
while offset < inbound_sms_count:
statement = insert(InboundSmsHistory).from_select(
InboundSmsHistory.__table__.c,
2023-08-29 14:54:30 -07:00
inbound_sms_query.limit(query_limit).offset(offset),
)
statement = statement.on_conflict_do_nothing(
constraint="inbound_sms_history_pkey"
)
db.session.connection().execute(statement)
offset += query_limit
def _delete_inbound_sms(datetime_to_delete_from, query_filter):
query_limit = 10000
2023-08-29 14:54:30 -07:00
subquery = (
2024-10-18 08:12:22 -07:00
select(InboundSms.id)
2023-08-29 14:54:30 -07:00
.filter(InboundSms.created_at < datetime_to_delete_from, *query_filter)
.limit(query_limit)
.subquery()
)
deleted = 0
# set to nonzero just to enter the loop
number_deleted = 1
while number_deleted > 0:
_insert_inbound_sms_history(subquery, query_limit=query_limit)
2024-10-18 08:12:22 -07:00
stmt = delete(InboundSms).filter(InboundSms.id.in_(subquery))
number_deleted = db.session.execute(stmt).rowcount
db.session.commit()
deleted += number_deleted
return deleted
@autocommit
def delete_inbound_sms_older_than_retention():
2023-08-29 14:54:30 -07:00
current_app.logger.info(
"Deleting inbound sms for services with flexible data retention"
)
stmt = (
select(ServiceDataRetention)
.join(ServiceDataRetention.service)
.filter(ServiceDataRetention.notification_type == NotificationType.SMS)
2023-08-29 14:54:30 -07:00
)
flexible_data_retention = db.session.execute(stmt).scalars().all()
deleted = 0
for f in flexible_data_retention:
n_days_ago = midnight_n_days_ago(f.days_of_retention)
2023-08-29 14:54:30 -07:00
current_app.logger.info(
"Deleting inbound sms for service id: {}".format(f.service_id)
)
deleted += _delete_inbound_sms(
n_days_ago, query_filter=[InboundSms.service_id == f.service_id]
)
2023-08-29 14:54:30 -07:00
current_app.logger.info(
"Deleting inbound sms for services without flexible data retention"
)
seven_days_ago = midnight_n_days_ago(7)
2023-08-29 14:54:30 -07:00
deleted += _delete_inbound_sms(
seven_days_ago,
query_filter=[
InboundSms.service_id.notin_(x.service_id for x in flexible_data_retention),
],
)
2017-06-06 17:11:59 +01:00
2023-08-29 14:54:30 -07:00
current_app.logger.info("Deleted {} inbound sms".format(deleted))
2017-06-06 17:11:59 +01:00
return deleted
def dao_get_inbound_sms_by_id(service_id, inbound_id):
stmt = select(InboundSms).filter_by(id=inbound_id, service_id=service_id)
return db.session.execute(stmt).scalars().one()
2018-04-04 16:59:48 +01:00
def dao_get_paginated_most_recent_inbound_sms_by_user_number_for_service(
2023-08-29 14:54:30 -07:00
service_id, page, limit_days
2018-04-04 16:59:48 +01:00
):
"""
This query starts from inbound_sms and joins on to itself to find the most recent row for each user_number.
2018-04-04 16:59:48 +01:00
Equivalent sql:
SELECT t1.*
FROM inbound_sms t1
LEFT OUTER JOIN inbound_sms AS t2 ON (
-- identifying
t1.user_number = t2.user_number AND
t1.service_id = t2.service_id AND
-- ordering
t1.created_at < t2.created_at
)
WHERE t2.id IS NULL AND t1.service_id = :service_id
ORDER BY t1.created_at DESC;
LIMIT 50 OFFSET :page
"""
t2 = aliased(InboundSms)
2024-10-17 14:28:22 -07:00
q = (
db.session.query(InboundSms)
2023-08-29 14:54:30 -07:00
.outerjoin(
t2,
and_(
InboundSms.user_number == t2.user_number,
InboundSms.service_id == t2.service_id,
InboundSms.created_at < t2.created_at,
),
2018-04-04 16:59:48 +01:00
)
2023-08-29 14:54:30 -07:00
.filter(
t2.id == None, # noqa
InboundSms.service_id == service_id,
InboundSms.created_at >= midnight_n_days_ago(limit_days),
)
.order_by(InboundSms.created_at.desc())
2018-04-04 16:59:48 +01:00
)
2024-10-17 14:28:22 -07:00
return q.paginate(page=page, per_page=current_app.config["PAGE_SIZE"])