2017-11-06 18:19:02 +00:00
|
|
|
from flask import current_app
|
2018-02-06 09:35:33 +00:00
|
|
|
from notifications_utils.statsd_decorators import statsd
|
2018-04-04 16:59:48 +01:00
|
|
|
from sqlalchemy import desc, and_
|
|
|
|
|
from sqlalchemy.orm import aliased
|
2019-12-17 18:04:22 +00:00
|
|
|
from sqlalchemy.dialects.postgresql import insert
|
2017-06-02 12:21:12 +01:00
|
|
|
|
2017-05-22 11:26:47 +01:00
|
|
|
from app import db
|
|
|
|
|
from app.dao.dao_utils import transactional
|
2019-12-17 18:04:22 +00:00
|
|
|
from app.models import InboundSms, InboundSmsHistory, Service, ServiceDataRetention, SMS_TYPE
|
2019-02-26 17:57:55 +00:00
|
|
|
from app.utils import midnight_n_days_ago
|
2017-05-22 11:26:47 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@transactional
def dao_create_inbound_sms(inbound_sms):
    """Persist a new InboundSms row; commit/rollback is handled by @transactional."""
    db.session.add(inbound_sms)
|
2017-05-31 14:49:14 +01:00
|
|
|
|
|
|
|
|
|
2019-03-28 15:36:12 +00:00
|
|
|
def dao_get_inbound_sms_for_service(service_id, user_number=None, *, limit_days=None, limit=None):
    """Fetch a service's inbound SMS, newest first.

    Optional narrowing:
      * ``limit_days`` — only messages created since midnight N days ago
      * ``user_number`` — only messages from this sender
      * ``limit`` — cap on the number of rows returned
    """
    query = InboundSms.query.filter(InboundSms.service_id == service_id)

    if limit_days is not None:
        query = query.filter(InboundSms.created_at >= midnight_n_days_ago(limit_days))

    if user_number:
        query = query.filter(InboundSms.user_number == user_number)

    query = query.order_by(InboundSms.created_at.desc())

    if limit:
        query = query.limit(limit)

    return query.all()
|
|
|
|
|
|
|
|
|
|
|
2018-03-22 12:41:17 +00:00
|
|
|
def dao_get_paginated_inbound_sms_for_service_for_public_api(
    service_id,
    older_than=None,
    page_size=None
):
    """Return one page of a service's inbound SMS, newest first.

    ``older_than`` is an InboundSms id used as a keyset cursor: only rows
    created strictly before that row are returned. ``page_size`` falls back
    to the app's configured PAGE_SIZE.
    """
    if page_size is None:
        page_size = current_app.config['PAGE_SIZE']

    query = InboundSms.query.filter(InboundSms.service_id == service_id)

    if older_than:
        # Scalar subquery: the created_at of the cursor row.
        cursor_created_at = db.session.query(
            InboundSms.created_at
        ).filter(
            InboundSms.id == older_than
        ).as_scalar()
        query = query.filter(InboundSms.created_at < cursor_created_at)

    page = query.order_by(desc(InboundSms.created_at)).paginate(per_page=page_size)
    return page.items
|
|
|
|
|
|
|
|
|
|
|
2019-03-27 17:44:53 +00:00
|
|
|
def dao_count_inbound_sms_for_service(service_id, limit_days):
    """Count inbound SMS received by a service since midnight ``limit_days`` days ago."""
    since = midnight_n_days_ago(limit_days)
    query = InboundSms.query.filter(
        InboundSms.service_id == service_id,
        InboundSms.created_at >= since,
    )
    return query.count()
|
2017-06-02 12:21:12 +01:00
|
|
|
|
|
|
|
|
|
2019-02-26 17:57:55 +00:00
|
|
|
def _delete_inbound_sms(datetime_to_delete_from, query_filter):
    """Copy matching InboundSms rows into InboundSmsHistory, then delete them.

    Processes ids in batches of ``query_limit`` until no row older than
    ``datetime_to_delete_from`` (and matching the extra ``query_filter``
    clauses) remains. Returns the total number of rows deleted. Commit is
    the caller's responsibility (see delete_inbound_sms_older_than_retention).
    """
    query_limit = 10000

    # Ids of the next batch of rows to archive-and-delete. This is a lazy
    # subquery: each execution inside the loop below picks up a fresh batch
    # after the previous batch has been deleted.
    subquery = db.session.query(
        InboundSms.id
    ).filter(
        InboundSms.created_at < datetime_to_delete_from,
        *query_filter
    ).limit(
        query_limit
    ).subquery()

    deleted = 0
    # set to nonzero just to enter the loop
    number_deleted = 1
    while number_deleted > 0:
        offset = 0
        # Select, by name, the columns InboundSmsHistory has, for every row
        # in the current batch, so the batch can be copied into history
        # before it is deleted.
        inbound_sms_query = db.session.query(
            *[x.name for x in InboundSmsHistory.__table__.c]
        ).filter(InboundSms.id.in_(subquery))
        inbound_sms_count = inbound_sms_query.count()

        # Copy the batch into the history table in query_limit-sized pages.
        # NOTE(review): the paged SELECT has no explicit ORDER BY, so page
        # boundaries are database-dependent — presumed safe because the
        # batch is not modified while it is being copied; confirm.
        while offset < inbound_sms_count:
            statement = insert(InboundSmsHistory).from_select(
                InboundSmsHistory.__table__.c,
                inbound_sms_query.limit(query_limit).offset(offset)
            )

            # Upsert on the history primary key: a row may already exist
            # from an earlier, partially-completed run.
            statement = statement.on_conflict_do_update(
                constraint="inbound_sms_history_pkey",
                set_={
                    "created_at": statement.excluded.created_at,
                    "service_id": statement.excluded.service_id,
                    "notify_number": statement.excluded.notify_number,
                    "provider_date": statement.excluded.provider_date,
                    "provider_reference": statement.excluded.provider_reference,
                    "provider": statement.excluded.provider
                }
            )

            db.session.connection().execute(statement)

            offset += query_limit
        # Delete the archived batch; 'fetch' makes the session aware of
        # which rows the bulk delete removed.
        number_deleted = InboundSms.query.filter(InboundSms.id.in_(subquery)).delete(synchronize_session='fetch')
        deleted += number_deleted

    return deleted
|
|
|
|
|
|
|
|
|
|
|
2017-06-02 12:21:12 +01:00
|
|
|
@statsd(namespace="dao")
@transactional
def delete_inbound_sms_older_than_retention():
    """Archive-and-delete inbound SMS that have passed their retention period.

    Services with an SMS-type ServiceDataRetention row (and an inbound
    number) use their configured days_of_retention; all remaining services
    fall back to a seven-day retention. Returns the total rows deleted.
    """
    current_app.logger.info('Deleting inbound sms for services with flexible data retention')

    retention_rows = ServiceDataRetention.query.join(
        ServiceDataRetention.service,
        Service.inbound_number
    ).filter(
        ServiceDataRetention.notification_type == SMS_TYPE
    ).all()

    deleted = 0

    for retention in retention_rows:
        cutoff = midnight_n_days_ago(retention.days_of_retention)
        current_app.logger.info("Deleting inbound sms for service id: {}".format(retention.service_id))
        deleted += _delete_inbound_sms(cutoff, query_filter=[InboundSms.service_id == retention.service_id])

    current_app.logger.info('Deleting inbound sms for services without flexible data retention')

    default_cutoff = midnight_n_days_ago(7)
    deleted += _delete_inbound_sms(default_cutoff, query_filter=[
        InboundSms.service_id.notin_(row.service_id for row in retention_rows),
    ])

    current_app.logger.info('Deleted {} inbound sms'.format(deleted))

    return deleted
|
2017-06-06 17:11:59 +01:00
|
|
|
|
|
|
|
|
|
2019-12-17 18:04:22 +00:00
|
|
|
def insert_update_inbound_sms_history(date_to_delete_from, service_id, query_limit=10000):
    """Copy a service's inbound SMS older than ``date_to_delete_from`` into
    InboundSmsHistory, upserting in pages of ``query_limit`` rows.

    Deletes nothing and does not commit — the caller owns the transaction.
    """
    offset = 0
    # Select, by name, the columns InboundSmsHistory has, for the service's
    # rows older than the cut-off.
    inbound_sms_query = db.session.query(
        *[x.name for x in InboundSmsHistory.__table__.c]
    ).filter(
        InboundSms.service_id == service_id,
        InboundSms.created_at < date_to_delete_from,
    )
    inbound_sms_count = inbound_sms_query.count()

    # Page through the matching rows, inserting each page into history.
    # NOTE(review): no explicit ORDER BY on the paged SELECT — page
    # boundaries are database-dependent; presumed safe because rows this
    # old are not expected to change mid-run; confirm.
    while offset < inbound_sms_count:
        statement = insert(InboundSmsHistory).from_select(
            InboundSmsHistory.__table__.c,
            inbound_sms_query.limit(query_limit).offset(offset)
        )

        # Upsert on the history primary key so re-running over the same
        # rows updates them instead of failing.
        statement = statement.on_conflict_do_update(
            constraint="inbound_sms_history_pkey",
            set_={
                "created_at": statement.excluded.created_at,
                "service_id": statement.excluded.service_id,
                "notify_number": statement.excluded.notify_number,
                "provider_date": statement.excluded.provider_date,
                "provider_reference": statement.excluded.provider_reference,
                "provider": statement.excluded.provider
            }
        )

        db.session.connection().execute(statement)

        offset += query_limit
|
|
|
|
|
|
|
|
|
|
|
2017-06-07 14:23:31 +01:00
|
|
|
def dao_get_inbound_sms_by_id(service_id, inbound_id):
    """Return the single InboundSms with this id belonging to this service.

    Raises if no matching row exists (standard ``Query.one()`` contract).
    """
    return InboundSms.query.filter(
        InboundSms.id == inbound_id,
        InboundSms.service_id == service_id,
    ).one()
|
2018-04-04 16:59:48 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def dao_get_paginated_most_recent_inbound_sms_by_user_number_for_service(
    service_id,
    page,
    limit_days
):
    """Page through the latest inbound SMS per sender for a service.

    For each distinct user_number heard from in the last ``limit_days``
    days, only the most recent message is returned; results are ordered
    newest first and paginated at the configured PAGE_SIZE.

    Implemented as a "greatest-n-per-group" anti-join: every row is
    outer-joined to any newer row from the same number and service, and
    only rows with no newer match (newer.id IS NULL) survive. Equivalent
    SQL:

        SELECT t1.*
        FROM inbound_sms t1
        LEFT OUTER JOIN inbound_sms AS t2 ON (
            -- identifying
            t1.user_number = t2.user_number AND
            t1.service_id = t2.service_id AND
            -- ordering
            t1.created_at < t2.created_at
        )
        WHERE t2.id IS NULL AND t1.service_id = :service_id
        ORDER BY t1.created_at DESC
        LIMIT 50 OFFSET :page
    """
    newer = aliased(InboundSms)

    join_condition = and_(
        InboundSms.user_number == newer.user_number,
        InboundSms.service_id == newer.service_id,
        InboundSms.created_at < newer.created_at,
    )

    query = db.session.query(
        InboundSms
    ).outerjoin(
        newer, join_condition
    ).filter(
        newer.id == None,  # noqa
        InboundSms.service_id == service_id,
        InboundSms.created_at >= midnight_n_days_ago(limit_days)
    ).order_by(
        InboundSms.created_at.desc()
    )

    return query.paginate(page=page, per_page=current_app.config['PAGE_SIZE'])
|