mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-23 00:41:35 -05:00
Revert "Revert running status aggregation in parallel"
This reverts commit 0f6dea0deb.
This commit is contained in:
@@ -3,7 +3,7 @@ from datetime import datetime, timedelta
|
|||||||
from flask import current_app
|
from flask import current_app
|
||||||
from notifications_utils.timezones import convert_utc_to_bst
|
from notifications_utils.timezones import convert_utc_to_bst
|
||||||
|
|
||||||
from app import notify_celery
|
from app import db, notify_celery
|
||||||
from app.config import QueueNames
|
from app.config import QueueNames
|
||||||
from app.cronitor import cronitor
|
from app.cronitor import cronitor
|
||||||
from app.dao.fact_billing_dao import (
|
from app.dao.fact_billing_dao import (
|
||||||
@@ -91,47 +91,46 @@ def create_nightly_notification_status():
|
|||||||
|
|
||||||
yesterday = convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=1)
|
yesterday = convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=1)
|
||||||
|
|
||||||
|
for (service_id,) in db.session.query(Service.id):
|
||||||
for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]:
|
for notification_type in [SMS_TYPE, EMAIL_TYPE, LETTER_TYPE]:
|
||||||
days = 10 if notification_type == LETTER_TYPE else 4
|
days = 10 if notification_type == LETTER_TYPE else 4
|
||||||
|
|
||||||
for i in range(days):
|
for i in range(days):
|
||||||
process_day = yesterday - timedelta(days=i)
|
process_day = yesterday - timedelta(days=i)
|
||||||
|
|
||||||
create_nightly_notification_status_for_day.apply_async(
|
create_nightly_notification_status_for_service_and_day.apply_async(
|
||||||
kwargs={
|
kwargs={
|
||||||
'process_day': process_day.isoformat(),
|
'process_day': process_day.isoformat(),
|
||||||
'notification_type': notification_type,
|
'notification_type': notification_type,
|
||||||
|
'service_id': service_id,
|
||||||
},
|
},
|
||||||
queue=QueueNames.REPORTING
|
queue=QueueNames.REPORTING
|
||||||
)
|
)
|
||||||
current_app.logger.info(
|
current_app.logger.info(
|
||||||
f"create-nightly-notification-status-for-day task created "
|
f"create-nightly-notification-status-for-day task created "
|
||||||
f"for {notification_type} and {process_day}"
|
f"for {service_id}, {notification_type} and {process_day}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@notify_celery.task(name="create-nightly-notification-status-for-day")
|
@notify_celery.task(name="create-nightly-notification-status-for-service-and-day")
|
||||||
def create_nightly_notification_status_for_day(process_day, notification_type):
|
def create_nightly_notification_status_for_service_and_day(process_day, service_id, notification_type):
|
||||||
process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
|
process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
|
||||||
current_app.logger.info(
|
current_app.logger.info(
|
||||||
f'create-nightly-notification-status-for-day task started '
|
f'create-nightly-notification-status-for-day task started '
|
||||||
f'for {notification_type} for {process_day}'
|
f'for {service_id}, {notification_type} for {process_day}'
|
||||||
)
|
)
|
||||||
|
|
||||||
start = datetime.utcnow()
|
start = datetime.utcnow()
|
||||||
new_status_rows = []
|
new_status_rows = fetch_status_data_for_service_and_day(
|
||||||
|
|
||||||
for service in Service.query.all():
|
|
||||||
new_status_rows += fetch_status_data_for_service_and_day(
|
|
||||||
process_day=process_day,
|
process_day=process_day,
|
||||||
notification_type=notification_type,
|
notification_type=notification_type,
|
||||||
service_id=service.id,
|
service_id=service_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
end = datetime.utcnow()
|
end = datetime.utcnow()
|
||||||
current_app.logger.info(
|
current_app.logger.info(
|
||||||
f'create-nightly-notification-status-for-day task fetch '
|
f'create-nightly-notification-status-for-day task fetch '
|
||||||
f'for {notification_type} for {process_day}: '
|
f'for {service_id}, {notification_type} for {process_day}: '
|
||||||
f'data fetched in {(end - start).seconds} seconds'
|
f'data fetched in {(end - start).seconds} seconds'
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -139,10 +138,11 @@ def create_nightly_notification_status_for_day(process_day, notification_type):
|
|||||||
new_status_rows=new_status_rows,
|
new_status_rows=new_status_rows,
|
||||||
process_day=process_day,
|
process_day=process_day,
|
||||||
notification_type=notification_type,
|
notification_type=notification_type,
|
||||||
|
service_id=service_id
|
||||||
)
|
)
|
||||||
|
|
||||||
current_app.logger.info(
|
current_app.logger.info(
|
||||||
f'create-nightly-notification-status-for-day task finished '
|
f'create-nightly-notification-status-for-day task finished '
|
||||||
f'for {notification_type} for {process_day}: '
|
f'for {service_id}, {notification_type} for {process_day}: '
|
||||||
f'{len(new_status_rows)} rows updated'
|
f'{len(new_status_rows)} rows updated'
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -45,7 +45,6 @@ def fetch_status_data_for_service_and_day(process_day, service_id, notification_
|
|||||||
|
|
||||||
return db.session.query(
|
return db.session.query(
|
||||||
table.template_id,
|
table.template_id,
|
||||||
table.service_id,
|
|
||||||
func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'),
|
func.coalesce(table.job_id, '00000000-0000-0000-0000-000000000000').label('job_id'),
|
||||||
table.key_type,
|
table.key_type,
|
||||||
table.status,
|
table.status,
|
||||||
@@ -58,7 +57,6 @@ def fetch_status_data_for_service_and_day(process_day, service_id, notification_
|
|||||||
table.key_type.in_((KEY_TYPE_NORMAL, KEY_TYPE_TEAM)),
|
table.key_type.in_((KEY_TYPE_NORMAL, KEY_TYPE_TEAM)),
|
||||||
).group_by(
|
).group_by(
|
||||||
table.template_id,
|
table.template_id,
|
||||||
table.service_id,
|
|
||||||
'job_id',
|
'job_id',
|
||||||
table.key_type,
|
table.key_type,
|
||||||
table.status
|
table.status
|
||||||
@@ -66,11 +64,12 @@ def fetch_status_data_for_service_and_day(process_day, service_id, notification_
|
|||||||
|
|
||||||
|
|
||||||
@autocommit
|
@autocommit
|
||||||
def update_fact_notification_status(new_status_rows, process_day, notification_type):
|
def update_fact_notification_status(new_status_rows, process_day, notification_type, service_id):
|
||||||
table = FactNotificationStatus.__table__
|
table = FactNotificationStatus.__table__
|
||||||
FactNotificationStatus.query.filter(
|
FactNotificationStatus.query.filter(
|
||||||
FactNotificationStatus.bst_date == process_day,
|
FactNotificationStatus.bst_date == process_day,
|
||||||
FactNotificationStatus.notification_type == notification_type,
|
FactNotificationStatus.notification_type == notification_type,
|
||||||
|
FactNotificationStatus.service_id == service_id,
|
||||||
).delete()
|
).delete()
|
||||||
|
|
||||||
for row in new_status_rows:
|
for row in new_status_rows:
|
||||||
@@ -78,7 +77,7 @@ def update_fact_notification_status(new_status_rows, process_day, notification_t
|
|||||||
insert(table).values(
|
insert(table).values(
|
||||||
bst_date=process_day,
|
bst_date=process_day,
|
||||||
template_id=row.template_id,
|
template_id=row.template_id,
|
||||||
service_id=row.service_id,
|
service_id=service_id,
|
||||||
job_id=row.job_id,
|
job_id=row.job_id,
|
||||||
notification_type=notification_type,
|
notification_type=notification_type,
|
||||||
key_type=row.key_type,
|
key_type=row.key_type,
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ from app.celery.reporting_tasks import (
|
|||||||
create_nightly_billing,
|
create_nightly_billing,
|
||||||
create_nightly_billing_for_day,
|
create_nightly_billing_for_day,
|
||||||
create_nightly_notification_status,
|
create_nightly_notification_status,
|
||||||
create_nightly_notification_status_for_day,
|
create_nightly_notification_status_for_service_and_day,
|
||||||
)
|
)
|
||||||
from app.config import QueueNames
|
from app.config import QueueNames
|
||||||
from app.dao.fact_billing_dao import get_rate
|
from app.dao.fact_billing_dao import get_rate
|
||||||
@@ -63,7 +63,7 @@ def test_create_nightly_billing_triggers_tasks_for_days(notify_api, mocker, day_
|
|||||||
|
|
||||||
@freeze_time('2019-08-01')
|
@freeze_time('2019-08-01')
|
||||||
def test_create_nightly_notification_status_triggers_tasks(notify_api, sample_service, mocker):
|
def test_create_nightly_notification_status_triggers_tasks(notify_api, sample_service, mocker):
|
||||||
mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_notification_status_for_day')
|
mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_notification_status_for_service_and_day')
|
||||||
create_nightly_notification_status()
|
create_nightly_notification_status()
|
||||||
|
|
||||||
assert mock_celery.apply_async.call_count == (
|
assert mock_celery.apply_async.call_count == (
|
||||||
@@ -80,6 +80,7 @@ def test_create_nightly_notification_status_triggers_tasks(notify_api, sample_se
|
|||||||
kwargs={
|
kwargs={
|
||||||
'process_day': process_date,
|
'process_day': process_date,
|
||||||
'notification_type': notification_type,
|
'notification_type': notification_type,
|
||||||
|
'service_id': sample_service.id,
|
||||||
},
|
},
|
||||||
queue=QueueNames.REPORTING
|
queue=QueueNames.REPORTING
|
||||||
)
|
)
|
||||||
@@ -89,6 +90,7 @@ def test_create_nightly_notification_status_triggers_tasks(notify_api, sample_se
|
|||||||
kwargs={
|
kwargs={
|
||||||
'process_day': process_date,
|
'process_day': process_date,
|
||||||
'notification_type': LETTER_TYPE,
|
'notification_type': LETTER_TYPE,
|
||||||
|
'service_id': sample_service.id,
|
||||||
},
|
},
|
||||||
queue=QueueNames.REPORTING
|
queue=QueueNames.REPORTING
|
||||||
)
|
)
|
||||||
@@ -507,7 +509,7 @@ def test_create_nightly_billing_for_day_update_when_record_exists(
|
|||||||
assert records[0].updated_at
|
assert records[0].updated_at
|
||||||
|
|
||||||
|
|
||||||
def test_create_nightly_notification_status_for_day(notify_db_session):
|
def test_create_nightly_notification_status_for_service_and_day(notify_db_session):
|
||||||
first_service = create_service(service_name='First Service')
|
first_service = create_service(service_name='First Service')
|
||||||
first_template = create_template(service=first_service)
|
first_template = create_template(service=first_service)
|
||||||
second_service = create_service(service_name='second Service')
|
second_service = create_service(service_name='second Service')
|
||||||
@@ -538,9 +540,9 @@ def test_create_nightly_notification_status_for_day(notify_db_session):
|
|||||||
|
|
||||||
assert len(FactNotificationStatus.query.all()) == 0
|
assert len(FactNotificationStatus.query.all()) == 0
|
||||||
|
|
||||||
create_nightly_notification_status_for_day(str(process_day), 'sms')
|
create_nightly_notification_status_for_service_and_day(str(process_day), first_service.id, 'sms')
|
||||||
create_nightly_notification_status_for_day(str(process_day), 'email')
|
create_nightly_notification_status_for_service_and_day(str(process_day), second_service.id, 'email')
|
||||||
create_nightly_notification_status_for_day(str(process_day), 'letter')
|
create_nightly_notification_status_for_service_and_day(str(process_day), third_service.id, 'letter')
|
||||||
|
|
||||||
new_fact_data = FactNotificationStatus.query.order_by(
|
new_fact_data = FactNotificationStatus.query.order_by(
|
||||||
FactNotificationStatus.notification_type
|
FactNotificationStatus.notification_type
|
||||||
@@ -575,13 +577,13 @@ def test_create_nightly_notification_status_for_day(notify_db_session):
|
|||||||
assert new_fact_data[2].key_type == KEY_TYPE_NORMAL
|
assert new_fact_data[2].key_type == KEY_TYPE_NORMAL
|
||||||
|
|
||||||
|
|
||||||
def test_create_nightly_notification_status_for_day_overwrites_old_data(notify_db_session):
|
def test_create_nightly_notification_status_for_service_and_day_overwrites_old_data(notify_db_session):
|
||||||
first_service = create_service(service_name='First Service')
|
first_service = create_service(service_name='First Service')
|
||||||
first_template = create_template(service=first_service)
|
first_template = create_template(service=first_service)
|
||||||
create_notification(template=first_template, status='delivered')
|
create_notification(template=first_template, status='delivered')
|
||||||
|
|
||||||
process_day = date.today()
|
process_day = date.today()
|
||||||
create_nightly_notification_status_for_day(str(process_day), 'sms')
|
create_nightly_notification_status_for_service_and_day(str(process_day), first_service.id, 'sms')
|
||||||
|
|
||||||
new_fact_data = FactNotificationStatus.query.order_by(
|
new_fact_data = FactNotificationStatus.query.order_by(
|
||||||
FactNotificationStatus.bst_date,
|
FactNotificationStatus.bst_date,
|
||||||
@@ -592,7 +594,7 @@ def test_create_nightly_notification_status_for_day_overwrites_old_data(notify_d
|
|||||||
assert new_fact_data[0].notification_count == 1
|
assert new_fact_data[0].notification_count == 1
|
||||||
|
|
||||||
create_notification(template=first_template, status='delivered')
|
create_notification(template=first_template, status='delivered')
|
||||||
create_nightly_notification_status_for_day(str(process_day), 'sms')
|
create_nightly_notification_status_for_service_and_day(str(process_day), first_service.id, 'sms')
|
||||||
|
|
||||||
updated_fact_data = FactNotificationStatus.query.order_by(
|
updated_fact_data = FactNotificationStatus.query.order_by(
|
||||||
FactNotificationStatus.bst_date,
|
FactNotificationStatus.bst_date,
|
||||||
@@ -605,7 +607,7 @@ def test_create_nightly_notification_status_for_day_overwrites_old_data(notify_d
|
|||||||
|
|
||||||
# the job runs at 12:30am London time. 04/01 is in BST.
|
# the job runs at 12:30am London time. 04/01 is in BST.
|
||||||
@freeze_time('2019-04-01T23:30')
|
@freeze_time('2019-04-01T23:30')
|
||||||
def test_create_nightly_notification_status_for_day_respects_bst(sample_template):
|
def test_create_nightly_notification_status_for_service_and_day_respects_bst(sample_template):
|
||||||
create_notification(sample_template, status='delivered', created_at=datetime(2019, 4, 1, 23, 0)) # too new
|
create_notification(sample_template, status='delivered', created_at=datetime(2019, 4, 1, 23, 0)) # too new
|
||||||
|
|
||||||
create_notification(sample_template, status='created', created_at=datetime(2019, 4, 1, 22, 59))
|
create_notification(sample_template, status='created', created_at=datetime(2019, 4, 1, 22, 59))
|
||||||
@@ -613,7 +615,7 @@ def test_create_nightly_notification_status_for_day_respects_bst(sample_template
|
|||||||
|
|
||||||
create_notification(sample_template, status='delivered', created_at=datetime(2019, 3, 31, 22, 59)) # too old
|
create_notification(sample_template, status='delivered', created_at=datetime(2019, 3, 31, 22, 59)) # too old
|
||||||
|
|
||||||
create_nightly_notification_status_for_day('2019-04-01', 'sms')
|
create_nightly_notification_status_for_service_and_day('2019-04-01', sample_template.service_id, 'sms')
|
||||||
|
|
||||||
noti_status = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date).all()
|
noti_status = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date).all()
|
||||||
assert len(noti_status) == 1
|
assert len(noti_status) == 1
|
||||||
|
|||||||
Reference in New Issue
Block a user