mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-04 10:21:14 -05:00
Merge pull request #2669 from alphagov/ft-status-letter-fix
split create_nightly_notification_status by notification type
This commit is contained in:
@@ -12,6 +12,11 @@ from app.dao.fact_billing_dao import (
|
||||
update_fact_billing
|
||||
)
|
||||
from app.dao.fact_notification_status_dao import fetch_notification_status_for_day, update_fact_notification_status
|
||||
from app.models import (
|
||||
SMS_TYPE,
|
||||
EMAIL_TYPE,
|
||||
LETTER_TYPE,
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(name="create-nightly-billing")
|
||||
@@ -72,30 +77,31 @@ def create_nightly_notification_status(day_start=None):
|
||||
day_start = datetime.strptime(day_start, "%Y-%m-%d").date()
|
||||
for i in range(0, 4):
|
||||
process_day = day_start - timedelta(days=i)
|
||||
|
||||
create_nightly_notification_status_for_day.apply_async(
|
||||
kwargs={'process_day': process_day.isoformat()},
|
||||
queue=QueueNames.REPORTING
|
||||
)
|
||||
for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]:
|
||||
create_nightly_notification_status_for_day.apply_async(
|
||||
kwargs={'process_day': process_day.isoformat(), 'notification_type': notification_type},
|
||||
queue=QueueNames.REPORTING
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(name="create-nightly-notification-status-for-day")
|
||||
@statsd(namespace="tasks")
|
||||
def create_nightly_notification_status_for_day(process_day):
|
||||
def create_nightly_notification_status_for_day(process_day, notification_type):
|
||||
process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
|
||||
|
||||
start = datetime.utcnow()
|
||||
transit_data = fetch_notification_status_for_day(process_day=process_day)
|
||||
transit_data = fetch_notification_status_for_day(process_day=process_day, notification_type=notification_type)
|
||||
end = datetime.utcnow()
|
||||
current_app.logger.info('create-nightly-notification-status-for-day {} fetched in {} seconds'.format(
|
||||
current_app.logger.info('create-nightly-notification-status-for-day {} type {} fetched in {} seconds'.format(
|
||||
process_day,
|
||||
notification_type,
|
||||
(end - start).seconds)
|
||||
)
|
||||
|
||||
update_fact_notification_status(transit_data, process_day)
|
||||
update_fact_notification_status(transit_data, process_day, notification_type)
|
||||
|
||||
current_app.logger.info(
|
||||
"create-nightly-notification-status-for-day task complete: {} rows updated for day: {}".format(
|
||||
len(transit_data), process_day
|
||||
"create-nightly-notification-status-for-day task complete: {} rows updated for type {} for day: {}".format(
|
||||
len(transit_data), process_day, notification_type
|
||||
)
|
||||
)
|
||||
|
||||
@@ -21,6 +21,7 @@ from app.celery.tasks import record_daily_sorted_counts, get_template_class, pro
|
||||
from app.celery.nightly_tasks import send_total_sent_notifications_to_performance_platform
|
||||
from app.celery.service_callback_tasks import send_delivery_status_to_service
|
||||
from app.celery.letters_pdf_tasks import create_letters_pdf
|
||||
from app.celery.reporting_tasks import create_nightly_notification_status_for_day
|
||||
from app.config import QueueNames
|
||||
from app.dao.annual_billing_dao import dao_create_or_update_annual_billing_for_year
|
||||
from app.dao.fact_billing_dao import (
|
||||
@@ -43,11 +44,19 @@ from app.dao.services_dao import (
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.dao.users_dao import delete_model_user, delete_user_verify_codes, get_user_by_email
|
||||
from app.models import (
|
||||
PROVIDERS, User, Notification, Organisation, Domain, Service, SMS_TYPE,
|
||||
PROVIDERS,
|
||||
NOTIFICATION_CREATED,
|
||||
KEY_TYPE_TEST,
|
||||
SMS_TYPE,
|
||||
EMAIL_TYPE,
|
||||
LETTER_TYPE,
|
||||
User,
|
||||
Notification,
|
||||
Organisation,
|
||||
Domain,
|
||||
Service,
|
||||
EmailBranding,
|
||||
LetterBranding
|
||||
LetterBranding,
|
||||
)
|
||||
from app.performance_platform.processing_time import send_processing_time_for_start_and_end
|
||||
from app.utils import get_london_midnight_in_utc, get_midnight_for_day_before
|
||||
@@ -489,54 +498,23 @@ def rebuild_ft_billing_for_day(service_id, day):
|
||||
@notify_command(name='migrate-data-to-ft-notification-status')
|
||||
@click.option('-s', '--start_date', required=True, help="start date inclusive", type=click_dt(format='%Y-%m-%d'))
|
||||
@click.option('-e', '--end_date', required=True, help="end date inclusive", type=click_dt(format='%Y-%m-%d'))
|
||||
@click.option('-t', '--notification-type', required=False, help="notification type (or leave blank for all types)")
|
||||
@statsd(namespace="tasks")
|
||||
def migrate_data_to_ft_notification_status(start_date, end_date):
|
||||
def migrate_data_to_ft_notification_status(start_date, end_date, notification_type=None):
|
||||
notification_types = [SMS_TYPE, LETTER_TYPE, EMAIL_TYPE] if notification_type is None else [notification_type]
|
||||
|
||||
print('Notification statuses migration from date {} to {}'.format(start_date, end_date))
|
||||
|
||||
process_date = start_date
|
||||
total_updated = 0
|
||||
|
||||
while process_date < end_date:
|
||||
start_time = datetime.now()
|
||||
# migrate data into ft_notification_status and update if record already exists
|
||||
|
||||
db.session.execute(
|
||||
'delete from ft_notification_status where bst_date = :process_date',
|
||||
{"process_date": process_date}
|
||||
)
|
||||
|
||||
sql = \
|
||||
"""
|
||||
insert into ft_notification_status (bst_date, template_id, service_id, job_id, notification_type, key_type,
|
||||
notification_status, created_at, notification_count)
|
||||
select
|
||||
(n.created_at at time zone 'UTC' at time zone 'Europe/London')::timestamp::date as bst_date,
|
||||
coalesce(n.template_id, '00000000-0000-0000-0000-000000000000') as template_id,
|
||||
n.service_id,
|
||||
coalesce(n.job_id, '00000000-0000-0000-0000-000000000000') as job_id,
|
||||
n.notification_type,
|
||||
n.key_type,
|
||||
n.notification_status,
|
||||
now() as created_at,
|
||||
count(*) as notification_count
|
||||
from notification_history n
|
||||
where n.created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||
and n.created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
|
||||
group by bst_date, template_id, service_id, job_id, notification_type, key_type, notification_status
|
||||
order by bst_date
|
||||
"""
|
||||
result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)})
|
||||
db.session.commit()
|
||||
print('ft_notification_status: --- Completed took {}ms. Migrated {} rows for {}.'.format(
|
||||
datetime.now() - start_time,
|
||||
result.rowcount,
|
||||
process_date
|
||||
))
|
||||
process_date += timedelta(days=1)
|
||||
|
||||
total_updated += result.rowcount
|
||||
print('Total inserted/updated records = {}'.format(total_updated))
|
||||
start_date = start_date.date()
|
||||
for day_diff in range((end_date - start_date).days):
|
||||
process_day = start_date + timedelta(days=day_diff)
|
||||
for notification_type in notification_types:
|
||||
print('create_nightly_notification_status_for_day triggered for {} and {}'.format(
|
||||
process_day,
|
||||
notification_type
|
||||
))
|
||||
create_nightly_notification_status_for_day.apply_async(
|
||||
kwargs={'process_day': process_day.strftime('%Y-%m-%d'), 'notification_type': notification_type},
|
||||
queue=QueueNames.REPORTING
|
||||
)
|
||||
|
||||
|
||||
@notify_command(name='bulk-invite-user-to-service')
|
||||
|
||||
@@ -9,10 +9,8 @@ from sqlalchemy.types import DateTime, Integer
|
||||
|
||||
from app import db
|
||||
from app.models import (
|
||||
EMAIL_TYPE,
|
||||
FactNotificationStatus,
|
||||
KEY_TYPE_TEST,
|
||||
LETTER_TYPE,
|
||||
Notification,
|
||||
NOTIFICATION_CANCELLED,
|
||||
NOTIFICATION_CREATED,
|
||||
@@ -24,7 +22,6 @@ from app.models import (
|
||||
NOTIFICATION_TEMPORARY_FAILURE,
|
||||
NOTIFICATION_PERMANENT_FAILURE,
|
||||
Service,
|
||||
SMS_TYPE,
|
||||
Template,
|
||||
)
|
||||
from app.dao.dao_utils import transactional
|
||||
@@ -36,7 +33,7 @@ from app.utils import (
|
||||
)
|
||||
|
||||
|
||||
def fetch_notification_status_for_day(process_day):
|
||||
def fetch_notification_status_for_day(process_day, notification_type):
|
||||
start_date = convert_bst_to_utc(datetime.combine(process_day, time.min))
|
||||
end_date = convert_bst_to_utc(datetime.combine(process_day + timedelta(days=1), time.min))
|
||||
|
||||
@@ -44,23 +41,19 @@ def fetch_notification_status_for_day(process_day):
|
||||
|
||||
all_data_for_process_day = []
|
||||
services = Service.query.all()
|
||||
# for each service
|
||||
# for each notification type
|
||||
# query notifications for day
|
||||
# if no rows try notificationHistory
|
||||
# for each service query notifications or notification_history for the day, depending on their data retention
|
||||
for service in services:
|
||||
for notification_type in [EMAIL_TYPE, SMS_TYPE, LETTER_TYPE]:
|
||||
table = get_notification_table_to_use(service, notification_type, process_day, has_delete_task_run=False)
|
||||
table = get_notification_table_to_use(service, notification_type, process_day, has_delete_task_run=False)
|
||||
|
||||
data_for_service_and_type = query_for_fact_status_data(
|
||||
table=table,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
notification_type=notification_type,
|
||||
service_id=service.id
|
||||
)
|
||||
data_for_service_and_type = query_for_fact_status_data(
|
||||
table=table,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
notification_type=notification_type,
|
||||
service_id=service.id
|
||||
)
|
||||
|
||||
all_data_for_process_day += data_for_service_and_type
|
||||
all_data_for_process_day += data_for_service_and_type
|
||||
|
||||
return all_data_for_process_day
|
||||
|
||||
@@ -70,7 +63,6 @@ def query_for_fact_status_data(table, start_date, end_date, notification_type, s
|
||||
table.template_id,
|
||||
table.service_id,
|
||||
func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'),
|
||||
table.notification_type,
|
||||
table.key_type,
|
||||
table.status,
|
||||
func.count().label('notification_count')
|
||||
@@ -84,7 +76,6 @@ def query_for_fact_status_data(table, start_date, end_date, notification_type, s
|
||||
table.template_id,
|
||||
table.service_id,
|
||||
'job_id',
|
||||
table.notification_type,
|
||||
table.key_type,
|
||||
table.status
|
||||
)
|
||||
@@ -92,10 +83,11 @@ def query_for_fact_status_data(table, start_date, end_date, notification_type, s
|
||||
|
||||
|
||||
@transactional
|
||||
def update_fact_notification_status(data, process_day):
|
||||
def update_fact_notification_status(data, process_day, notification_type):
|
||||
table = FactNotificationStatus.__table__
|
||||
FactNotificationStatus.query.filter(
|
||||
FactNotificationStatus.bst_date == process_day
|
||||
FactNotificationStatus.bst_date == process_day,
|
||||
FactNotificationStatus.notification_type == notification_type
|
||||
).delete()
|
||||
|
||||
for row in data:
|
||||
@@ -104,7 +96,7 @@ def update_fact_notification_status(data, process_day):
|
||||
template_id=row.template_id,
|
||||
service_id=row.service_id,
|
||||
job_id=row.job_id,
|
||||
notification_type=row.notification_type,
|
||||
notification_type=notification_type,
|
||||
key_type=row.key_type,
|
||||
notification_status=row.status,
|
||||
notification_count=row.notification_count,
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import itertools
|
||||
from datetime import datetime, timedelta, date
|
||||
from decimal import Decimal
|
||||
|
||||
import pytest
|
||||
from freezegun import freeze_time
|
||||
|
||||
from app.config import QueueNames
|
||||
from app.celery.reporting_tasks import (
|
||||
create_nightly_billing,
|
||||
create_nightly_notification_status,
|
||||
@@ -56,9 +58,12 @@ def test_create_nightly_notification_status_triggers_tasks_for_days(notify_api,
|
||||
mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_notification_status_for_day')
|
||||
create_nightly_notification_status(day_start)
|
||||
|
||||
assert mock_celery.apply_async.call_count == 4
|
||||
for i in range(4):
|
||||
assert mock_celery.apply_async.call_args_list[i][1]['kwargs'] == {'process_day': expected_kwargs[i]}
|
||||
assert mock_celery.apply_async.call_count == 4 * 3 # four days, three notification types
|
||||
for process_date, notification_type in itertools.product(expected_kwargs, ['sms', 'email', 'letter']):
|
||||
mock_celery.apply_async.assert_any_call(
|
||||
kwargs={'process_day': process_date, 'notification_type': notification_type},
|
||||
queue=QueueNames.REPORTING
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('second_rate, records_num, billable_units, multiplier',
|
||||
@@ -453,15 +458,25 @@ def test_create_nightly_notification_status_for_day(notify_db_session):
|
||||
|
||||
assert len(FactNotificationStatus.query.all()) == 0
|
||||
|
||||
create_nightly_notification_status_for_day('2019-01-01')
|
||||
create_nightly_notification_status_for_day('2019-01-01', 'sms')
|
||||
create_nightly_notification_status_for_day('2019-01-01', 'email')
|
||||
create_nightly_notification_status_for_day('2019-01-01', 'letter')
|
||||
|
||||
new_data = FactNotificationStatus.query.all()
|
||||
new_data = FactNotificationStatus.query.order_by(FactNotificationStatus.created_at).all()
|
||||
|
||||
assert len(new_data) == 3
|
||||
assert new_data[0].bst_date == date(2019, 1, 1)
|
||||
assert new_data[1].bst_date == date(2019, 1, 1)
|
||||
assert new_data[2].bst_date == date(2019, 1, 1)
|
||||
|
||||
assert new_data[0].notification_type == 'sms'
|
||||
assert new_data[1].notification_type == 'email'
|
||||
assert new_data[2].notification_type == 'letter'
|
||||
|
||||
assert new_data[0].notification_status == 'delivered'
|
||||
assert new_data[1].notification_status == 'temporary-failure'
|
||||
assert new_data[2].notification_status == 'created'
|
||||
|
||||
|
||||
# the job runs at 12:30am London time. 04/01 is in BST.
|
||||
@freeze_time('2019-04-01T23:30')
|
||||
@@ -473,7 +488,7 @@ def test_create_nightly_notification_status_for_day_respects_bst(sample_template
|
||||
|
||||
create_notification(sample_template, status='delivered', created_at=datetime(2019, 3, 31, 22, 59)) # too old
|
||||
|
||||
create_nightly_notification_status_for_day('2019-04-01')
|
||||
create_nightly_notification_status_for_day('2019-04-01', 'sms')
|
||||
|
||||
noti_status = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date).all()
|
||||
assert len(noti_status) == 1
|
||||
|
||||
@@ -66,8 +66,9 @@ def test_update_fact_notification_status(notify_db_session):
|
||||
create_notification_history(template=second_template)
|
||||
create_notification(template=third_template)
|
||||
|
||||
data = fetch_notification_status_for_day(process_day=process_day)
|
||||
update_fact_notification_status(data=data, process_day=process_day)
|
||||
for notification_type in ('letter', 'sms', 'email'):
|
||||
data = fetch_notification_status_for_day(process_day=process_day, notification_type=notification_type)
|
||||
update_fact_notification_status(data=data, process_day=process_day, notification_type=notification_type)
|
||||
|
||||
new_fact_data = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date,
|
||||
FactNotificationStatus.notification_type
|
||||
@@ -105,8 +106,8 @@ def test__update_fact_notification_status_updates_row(notify_db_session):
|
||||
create_notification(template=first_template, status='delivered')
|
||||
|
||||
process_day = date.today()
|
||||
data = fetch_notification_status_for_day(process_day=process_day)
|
||||
update_fact_notification_status(data=data, process_day=process_day)
|
||||
data = fetch_notification_status_for_day(process_day=process_day, notification_type='sms')
|
||||
update_fact_notification_status(data=data, process_day=process_day, notification_type='sms')
|
||||
|
||||
new_fact_data = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date,
|
||||
FactNotificationStatus.notification_type
|
||||
@@ -116,8 +117,8 @@ def test__update_fact_notification_status_updates_row(notify_db_session):
|
||||
|
||||
create_notification(template=first_template, status='delivered')
|
||||
|
||||
data = fetch_notification_status_for_day(process_day=process_day)
|
||||
update_fact_notification_status(data=data, process_day=process_day)
|
||||
data = fetch_notification_status_for_day(process_day=process_day, notification_type='sms')
|
||||
update_fact_notification_status(data=data, process_day=process_day, notification_type='sms')
|
||||
|
||||
updated_fact_data = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date,
|
||||
FactNotificationStatus.notification_type
|
||||
|
||||
@@ -664,8 +664,10 @@ def test_update_service_permission_creates_a_history_record_with_current_data(no
|
||||
EMAIL_TYPE, INTERNATIONAL_SMS_TYPE, LETTER_TYPE,
|
||||
))
|
||||
|
||||
assert len(Service.get_history_model().query.filter_by(name='service_name').all()) == 3
|
||||
assert Service.get_history_model().query.filter_by(name='service_name').all()[2].version == 3
|
||||
history = Service.get_history_model().query.filter_by(name='service_name').order_by('version').all()
|
||||
|
||||
assert len(history) == 3
|
||||
assert history[2].version == 3
|
||||
|
||||
|
||||
def test_create_service_and_history_is_transactional(notify_db_session):
|
||||
|
||||
Reference in New Issue
Block a user