mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-02 09:26:08 -05:00
Merge pull request #3425 from alphagov/parallelise-ft-status-180693991
Parallelise status aggregation by service and day
This commit is contained in:
@@ -3,7 +3,7 @@ from datetime import datetime, timedelta
|
||||
from flask import current_app
|
||||
from notifications_utils.timezones import convert_utc_to_bst
|
||||
|
||||
from app import notify_celery
|
||||
from app import db, notify_celery
|
||||
from app.config import QueueNames
|
||||
from app.cronitor import cronitor
|
||||
from app.dao.fact_billing_dao import (
|
||||
@@ -11,10 +11,10 @@ from app.dao.fact_billing_dao import (
|
||||
update_fact_billing,
|
||||
)
|
||||
from app.dao.fact_notification_status_dao import (
|
||||
fetch_notification_status_for_day,
|
||||
fetch_status_data_for_service_and_day,
|
||||
update_fact_notification_status,
|
||||
)
|
||||
from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE
|
||||
from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE, Service
|
||||
|
||||
|
||||
@notify_celery.task(name="create-nightly-billing")
|
||||
@@ -91,50 +91,58 @@ def create_nightly_notification_status():
|
||||
|
||||
yesterday = convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=1)
|
||||
|
||||
# email and sms
|
||||
for i in range(4):
|
||||
process_day = yesterday - timedelta(days=i)
|
||||
for notification_type in [SMS_TYPE, EMAIL_TYPE]:
|
||||
create_nightly_notification_status_for_day.apply_async(
|
||||
kwargs={'process_day': process_day.isoformat(), 'notification_type': notification_type},
|
||||
queue=QueueNames.REPORTING
|
||||
)
|
||||
current_app.logger.info(
|
||||
f"create-nightly-notification-status task: create-nightly-notification-status-for-day task created "
|
||||
f"for type {notification_type} for {process_day}"
|
||||
)
|
||||
for (service_id,) in db.session.query(Service.id):
|
||||
for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]:
|
||||
days = 10 if notification_type == LETTER_TYPE else 4
|
||||
|
||||
# letters
|
||||
for i in range(10):
|
||||
process_day = yesterday - timedelta(days=i)
|
||||
create_nightly_notification_status_for_day.apply_async(
|
||||
kwargs={'process_day': process_day.isoformat(), 'notification_type': LETTER_TYPE},
|
||||
queue=QueueNames.REPORTING
|
||||
)
|
||||
current_app.logger.info(
|
||||
f"create-nightly-notification-status task: create-nightly-notification-status-for-day task created "
|
||||
f"for type letter for {process_day}"
|
||||
)
|
||||
for i in range(days):
|
||||
process_day = yesterday - timedelta(days=i)
|
||||
|
||||
create_nightly_notification_status_for_service_and_day.apply_async(
|
||||
kwargs={
|
||||
'process_day': process_day.isoformat(),
|
||||
'notification_type': notification_type,
|
||||
'service_id': service_id,
|
||||
},
|
||||
queue=QueueNames.REPORTING
|
||||
)
|
||||
current_app.logger.info(
|
||||
f"create-nightly-notification-status-for-day task created "
|
||||
f"for {service_id}, {notification_type} and {process_day}"
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(name="create-nightly-notification-status-for-day")
|
||||
def create_nightly_notification_status_for_day(process_day, notification_type):
|
||||
@notify_celery.task(name="create-nightly-notification-status-for-service-and-day")
|
||||
def create_nightly_notification_status_for_service_and_day(process_day, service_id, notification_type):
|
||||
process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
|
||||
current_app.logger.info(
|
||||
f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: started'
|
||||
f'create-nightly-notification-status-for-day task started '
|
||||
f'for {service_id}, {notification_type} for {process_day}'
|
||||
)
|
||||
|
||||
start = datetime.utcnow()
|
||||
transit_data = fetch_notification_status_for_day(process_day=process_day, notification_type=notification_type)
|
||||
new_status_rows = fetch_status_data_for_service_and_day(
|
||||
process_day=process_day,
|
||||
notification_type=notification_type,
|
||||
service_id=service_id,
|
||||
)
|
||||
|
||||
end = datetime.utcnow()
|
||||
current_app.logger.info(
|
||||
f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: '
|
||||
f'create-nightly-notification-status-for-day task fetch '
|
||||
f'for {service_id}, {notification_type} for {process_day}: '
|
||||
f'data fetched in {(end - start).seconds} seconds'
|
||||
)
|
||||
|
||||
update_fact_notification_status(transit_data, process_day, notification_type)
|
||||
update_fact_notification_status(
|
||||
new_status_rows=new_status_rows,
|
||||
process_day=process_day,
|
||||
notification_type=notification_type,
|
||||
service_id=service_id
|
||||
)
|
||||
|
||||
current_app.logger.info(
|
||||
f'create-nightly-notification-status-for-day task for {process_day} type {notification_type}: '
|
||||
f'task complete - {len(transit_data)} rows updated'
|
||||
f'create-nightly-notification-status-for-day task finished '
|
||||
f'for {service_id}, {notification_type} for {process_day}: '
|
||||
f'{len(new_status_rows)} rows updated'
|
||||
)
|
||||
|
||||
109
app/commands.py
109
app/commands.py
@@ -23,9 +23,6 @@ from app.celery.letters_pdf_tasks import (
|
||||
get_pdf_for_templated_letter,
|
||||
resanitise_pdf,
|
||||
)
|
||||
from app.celery.reporting_tasks import (
|
||||
create_nightly_notification_status_for_day,
|
||||
)
|
||||
from app.celery.tasks import process_row, record_daily_sorted_counts
|
||||
from app.config import QueueNames
|
||||
from app.dao.annual_billing_dao import (
|
||||
@@ -62,9 +59,7 @@ from app.dao.users_dao import (
|
||||
get_user_by_email,
|
||||
)
|
||||
from app.models import (
|
||||
EMAIL_TYPE,
|
||||
KEY_TYPE_TEST,
|
||||
LETTER_TYPE,
|
||||
NOTIFICATION_CREATED,
|
||||
PROVIDERS,
|
||||
SMS_TYPE,
|
||||
@@ -281,87 +276,6 @@ def setup_commands(application):
|
||||
application.cli.add_command(command_group)
|
||||
|
||||
|
||||
@notify_command(name='migrate-data-to-ft-billing')
|
||||
@click.option('-s', '--start_date', required=True, help="start date inclusive", type=click_dt(format='%Y-%m-%d'))
|
||||
@click.option('-e', '--end_date', required=True, help="end date inclusive", type=click_dt(format='%Y-%m-%d'))
|
||||
@statsd(namespace="tasks")
|
||||
def migrate_data_to_ft_billing(start_date, end_date):
|
||||
|
||||
current_app.logger.info('Billing migration from date {} to {}'.format(start_date, end_date))
|
||||
|
||||
process_date = start_date
|
||||
total_updated = 0
|
||||
|
||||
while process_date < end_date:
|
||||
start_time = datetime.utcnow()
|
||||
# migrate data into ft_billing, upserting the data if it the record already exists
|
||||
sql = \
|
||||
"""
|
||||
insert into ft_billing (bst_date, template_id, service_id, notification_type, provider, rate_multiplier,
|
||||
international, billable_units, notifications_sent, rate, postage, created_at)
|
||||
select bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international,
|
||||
sum(billable_units) as billable_units, sum(notifications_sent) as notification_sent,
|
||||
case when notification_type = 'sms' then sms_rate else letter_rate end as rate, postage, created_at
|
||||
from (
|
||||
select
|
||||
n.id,
|
||||
(n.created_at at time zone 'UTC' at time zone 'Europe/London')::timestamp::date as bst_date,
|
||||
coalesce(n.template_id, '00000000-0000-0000-0000-000000000000') as template_id,
|
||||
coalesce(n.service_id, '00000000-0000-0000-0000-000000000000') as service_id,
|
||||
n.notification_type,
|
||||
coalesce(n.sent_by, (
|
||||
case
|
||||
when notification_type = 'sms' then
|
||||
coalesce(sent_by, 'unknown')
|
||||
when notification_type = 'letter' then
|
||||
coalesce(sent_by, 'dvla')
|
||||
else
|
||||
coalesce(sent_by, 'ses')
|
||||
end )) as provider,
|
||||
coalesce(n.rate_multiplier,1) as rate_multiplier,
|
||||
s.crown,
|
||||
coalesce((select rates.rate from rates
|
||||
where n.notification_type = rates.notification_type and n.created_at > rates.valid_from
|
||||
order by rates.valid_from desc limit 1), 0) as sms_rate,
|
||||
coalesce((select l.rate from letter_rates l where n.billable_units = l.sheet_count
|
||||
and s.crown = l.crown and n.postage = l.post_class and n.created_at >= l.start_date
|
||||
and n.created_at < coalesce(l.end_date, now()) and n.notification_type='letter'), 0)
|
||||
as letter_rate,
|
||||
coalesce(n.international, false) as international,
|
||||
n.billable_units,
|
||||
1 as notifications_sent,
|
||||
coalesce(n.postage, 'none') as postage,
|
||||
now() as created_at
|
||||
from public.notification_history n
|
||||
left join services s on s.id = n.service_id
|
||||
where n.key_type!='test'
|
||||
and n.notification_status in
|
||||
('sending', 'sent', 'delivered', 'temporary-failure', 'permanent-failure', 'failed')
|
||||
and n.created_at >= (date :start + time '00:00:00') at time zone 'Europe/London'
|
||||
at time zone 'UTC'
|
||||
and n.created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||
) as individual_record
|
||||
group by bst_date, template_id, service_id, notification_type, provider, rate_multiplier, international,
|
||||
sms_rate, letter_rate, postage, created_at
|
||||
order by bst_date
|
||||
on conflict on constraint ft_billing_pkey do update set
|
||||
billable_units = excluded.billable_units,
|
||||
notifications_sent = excluded.notifications_sent,
|
||||
rate = excluded.rate,
|
||||
updated_at = now()
|
||||
"""
|
||||
|
||||
result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)})
|
||||
db.session.commit()
|
||||
current_app.logger.info('ft_billing: --- Completed took {}ms. Migrated {} rows for {}'.format(
|
||||
datetime.now() - start_time, result.rowcount, process_date))
|
||||
|
||||
process_date += timedelta(days=1)
|
||||
|
||||
total_updated += result.rowcount
|
||||
current_app.logger.info('Total inserted/updated records = {}'.format(total_updated))
|
||||
|
||||
|
||||
@notify_command(name='rebuild-ft-billing-for-day')
|
||||
@click.option('-s', '--service_id', required=False, type=click.UUID)
|
||||
@click.option('-d', '--day', help="The date to recalculate, as YYYY-MM-DD", required=True,
|
||||
@@ -401,29 +315,6 @@ def rebuild_ft_billing_for_day(service_id, day):
|
||||
rebuild_ft_data(day, row.service_id)
|
||||
|
||||
|
||||
@notify_command(name='migrate-data-to-ft-notification-status')
|
||||
@click.option('-s', '--start_date', required=True, help="start date inclusive", type=click_dt(format='%Y-%m-%d'))
|
||||
@click.option('-e', '--end_date', required=True, help="end date inclusive", type=click_dt(format='%Y-%m-%d'))
|
||||
@click.option('-t', '--notification-type', required=False, help="notification type (or leave blank for all types)")
|
||||
@statsd(namespace="tasks")
|
||||
def migrate_data_to_ft_notification_status(start_date, end_date, notification_type=None):
|
||||
notification_types = [SMS_TYPE, LETTER_TYPE, EMAIL_TYPE] if notification_type is None else [notification_type]
|
||||
|
||||
start_date = start_date.date()
|
||||
end_date = end_date.date()
|
||||
for day_diff in range((end_date - start_date).days + 1):
|
||||
process_day = start_date + timedelta(days=day_diff)
|
||||
for notification_type in notification_types:
|
||||
print('create_nightly_notification_status_for_day triggered for {} and {}'.format(
|
||||
process_day,
|
||||
notification_type
|
||||
))
|
||||
create_nightly_notification_status_for_day.apply_async(
|
||||
kwargs={'process_day': process_day.strftime('%Y-%m-%d'), 'notification_type': notification_type},
|
||||
queue=QueueNames.REPORTING
|
||||
)
|
||||
|
||||
|
||||
@notify_command(name='bulk-invite-user-to-service')
|
||||
@click.option('-f', '--file_name', required=True,
|
||||
help="Full path of the file containing a list of email address for people to invite to a service")
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from datetime import datetime, time, timedelta
|
||||
|
||||
from flask import current_app
|
||||
from notifications_utils.timezones import convert_bst_to_utc
|
||||
from sqlalchemy import Date, case, func
|
||||
from sqlalchemy.dialects.postgresql import insert
|
||||
@@ -36,35 +35,16 @@ from app.utils import (
|
||||
)
|
||||
|
||||
|
||||
def fetch_notification_status_for_day(process_day, notification_type):
|
||||
def fetch_status_data_for_service_and_day(process_day, service_id, notification_type):
|
||||
start_date = convert_bst_to_utc(datetime.combine(process_day, time.min))
|
||||
end_date = convert_bst_to_utc(datetime.combine(process_day + timedelta(days=1), time.min))
|
||||
|
||||
current_app.logger.info("Fetch ft_notification_status for {} to {}".format(start_date, end_date))
|
||||
# query notifications or notification_history for the day, depending on their data retention
|
||||
service = Service.query.get(service_id)
|
||||
table = get_notification_table_to_use(service, notification_type, process_day, has_delete_task_run=False)
|
||||
|
||||
all_data_for_process_day = []
|
||||
services = Service.query.all()
|
||||
# for each service query notifications or notification_history for the day, depending on their data retention
|
||||
for service in services:
|
||||
table = get_notification_table_to_use(service, notification_type, process_day, has_delete_task_run=False)
|
||||
|
||||
data_for_service_and_type = query_for_fact_status_data(
|
||||
table=table,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
notification_type=notification_type,
|
||||
service_id=service.id
|
||||
)
|
||||
|
||||
all_data_for_process_day += data_for_service_and_type
|
||||
|
||||
return all_data_for_process_day
|
||||
|
||||
|
||||
def query_for_fact_status_data(table, start_date, end_date, notification_type, service_id):
|
||||
query = db.session.query(
|
||||
return db.session.query(
|
||||
table.template_id,
|
||||
table.service_id,
|
||||
func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'),
|
||||
table.key_type,
|
||||
table.status,
|
||||
@@ -77,34 +57,34 @@ def query_for_fact_status_data(table, start_date, end_date, notification_type, s
|
||||
table.key_type.in_((KEY_TYPE_NORMAL, KEY_TYPE_TEAM)),
|
||||
).group_by(
|
||||
table.template_id,
|
||||
table.service_id,
|
||||
'job_id',
|
||||
table.key_type,
|
||||
table.status
|
||||
)
|
||||
return query.all()
|
||||
).all()
|
||||
|
||||
|
||||
@autocommit
|
||||
def update_fact_notification_status(data, process_day, notification_type):
|
||||
def update_fact_notification_status(new_status_rows, process_day, notification_type, service_id):
|
||||
table = FactNotificationStatus.__table__
|
||||
FactNotificationStatus.query.filter(
|
||||
FactNotificationStatus.bst_date == process_day,
|
||||
FactNotificationStatus.notification_type == notification_type
|
||||
FactNotificationStatus.notification_type == notification_type,
|
||||
FactNotificationStatus.service_id == service_id,
|
||||
).delete()
|
||||
|
||||
for row in data:
|
||||
stmt = insert(table).values(
|
||||
bst_date=process_day,
|
||||
template_id=row.template_id,
|
||||
service_id=row.service_id,
|
||||
job_id=row.job_id,
|
||||
notification_type=notification_type,
|
||||
key_type=row.key_type,
|
||||
notification_status=row.status,
|
||||
notification_count=row.notification_count,
|
||||
for row in new_status_rows:
|
||||
db.session.connection().execute(
|
||||
insert(table).values(
|
||||
bst_date=process_day,
|
||||
template_id=row.template_id,
|
||||
service_id=service_id,
|
||||
job_id=row.job_id,
|
||||
notification_type=notification_type,
|
||||
key_type=row.key_type,
|
||||
notification_status=row.status,
|
||||
notification_count=row.notification_count,
|
||||
)
|
||||
)
|
||||
db.session.connection().execute(stmt)
|
||||
|
||||
|
||||
def fetch_notification_status_for_service_by_month(start_date, end_date, service_id):
|
||||
|
||||
Reference in New Issue
Block a user