split up reporting tasks in to separate tasks per day

to try and speed up overall time by parallelising
This commit is contained in:
Leo Hemsted
2019-08-15 16:57:31 +01:00
parent ad389e7252
commit 36dd750637
3 changed files with 141 additions and 124 deletions

View File

@@ -33,13 +33,35 @@ def create_nightly_billing(day_start=None):
for i in range(0, 4):
process_day = day_start - timedelta(days=i)
transit_data = fetch_billing_data_for_day(process_day=process_day)
create_nightly_billing_for_day.apply_async(
kwargs={'process_day': process_day.isoformat()},
queue=QueueNames.REPORTING
)
for data in transit_data:
update_fact_billing(data, process_day)
current_app.logger.info(
"create-nightly-billing task complete. {} rows updated for day: {}".format(len(transit_data), process_day))
@notify_celery.task(name="create-nightly-billing-for-day")
@statsd(namespace="tasks")
def create_nightly_billing_for_day(process_day):
process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
start = datetime.utcnow()
transit_data = fetch_billing_data_for_day(process_day=process_day)
end = datetime.utcnow()
current_app.logger.info('create-nightly-billing-for-day {} fetched in {} seconds'.format(
process_day,
(end - start).seconds)
)
for data in transit_data:
update_fact_billing(data, process_day)
current_app.logger.info(
"create-nightly-billing-for-day task complete. {} rows updated for day: {}".format(
len(transit_data),
process_day
)
)
@notify_celery.task(name="create-nightly-notification-status")
@@ -56,17 +78,29 @@ def create_nightly_notification_status(day_start=None):
for i in range(0, 4):
process_day = day_start - timedelta(days=i)
transit_data = fetch_notification_status_for_day(process_day=process_day)
update_fact_notification_status(transit_data, process_day)
current_app.logger.info(
"create-nightly-notification-status task: {} rows updated for day: {}".format(
len(transit_data), process_day
)
create_nightly_notification_status_for_day.apply_async(
kwargs={'process_day': process_day.isoformat()},
queue=QueueNames.REPORTING
)
# delete jobs need to happen after nightly notification status is recorded to avoid conflict between the two tasks
delete_email_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC)
delete_sms_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC)
delete_letter_notifications_older_than_retention.apply_async(queue=QueueNames.PERIODIC)
@notify_celery.task(name="create-nightly-notification-status-for-day")
@statsd(namespace="tasks")
def create_nightly_notification_status_for_day(process_day):
process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
start = datetime.utcnow()
transit_data = fetch_notification_status_for_day(process_day=process_day)
end = datetime.utcnow()
current_app.logger.info('create-nightly-notification-status-for-day {} fetched in {} seconds'.format(
process_day,
(end - start).seconds)
)
update_fact_notification_status(transit_data, process_day)
current_app.logger.info(
"create-nightly-notification-status-for-day task complete: {} rows updated for day: {}".format(
len(transit_data), process_day
)
)

View File

@@ -80,7 +80,8 @@ def query_for_fact_status_data(table, start_date, end_date, notification_type, s
table.created_at >= start_date,
table.created_at < end_date,
table.notification_type == notification_type,
table.service_id == service_id
table.service_id == service_id,
table.key_type != KEY_TYPE_TEST
).group_by(
table.template_id,
table.service_id,

View File

@@ -4,7 +4,12 @@ from decimal import Decimal
import pytest
from freezegun import freeze_time
from app.celery.reporting_tasks import create_nightly_billing, create_nightly_notification_status
from app.celery.reporting_tasks import (
create_nightly_billing,
create_nightly_notification_status,
create_nightly_billing_for_day,
create_nightly_notification_status_for_day,
)
from app.dao.fact_billing_dao import get_rate
from app.models import (
FactBilling,
@@ -28,10 +33,38 @@ def mocker_get_rate(
return Decimal(0)
@freeze_time('2019-08-01')
@pytest.mark.parametrize('day_start, expected_kwargs', [
(None, ['2019-07-31', '2019-07-30', '2019-07-29', '2019-07-28']),
('2019-07-21', ['2019-07-21', '2019-07-20', '2019-07-19', '2019-07-18']),
])
def test_create_nightly_billing_triggers_tasks_for_days(notify_api, mocker, day_start, expected_kwargs):
mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_billing_for_day')
create_nightly_billing(day_start)
assert mock_celery.apply_async.call_count == 4
for i in range(4):
assert mock_celery.apply_async.call_args_list[i][1]['kwargs'] == {'process_day': expected_kwargs[i]}
@freeze_time('2019-08-01')
@pytest.mark.parametrize('day_start, expected_kwargs', [
(None, ['2019-07-31', '2019-07-30', '2019-07-29', '2019-07-28']),
('2019-07-21', ['2019-07-21', '2019-07-20', '2019-07-19', '2019-07-18']),
])
def test_create_nightly_notification_status_triggers_tasks_for_days(notify_api, mocker, day_start, expected_kwargs):
mock_celery = mocker.patch('app.celery.reporting_tasks.create_nightly_notification_status_for_day')
create_nightly_notification_status(day_start)
assert mock_celery.apply_async.call_count == 4
for i in range(4):
assert mock_celery.apply_async.call_args_list[i][1]['kwargs'] == {'process_day': expected_kwargs[i]}
@pytest.mark.parametrize('second_rate, records_num, billable_units, multiplier',
[(1.0, 1, 2, [1]),
(2.0, 2, 1, [1, 2])])
def test_create_nightly_billing_sms_rate_multiplier(
def test_create_nightly_billing_for_day_sms_rate_multiplier(
sample_service,
sample_template,
mocker,
@@ -69,7 +102,7 @@ def test_create_nightly_billing_sms_rate_multiplier(
# Celery expects the arguments to be a string or primitive type.
yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d")
create_nightly_billing(yesterday_str)
create_nightly_billing_for_day(yesterday_str)
records = FactBilling.query.order_by('rate_multiplier').all()
assert len(records) == records_num
for i, record in enumerate(records):
@@ -79,7 +112,7 @@ def test_create_nightly_billing_sms_rate_multiplier(
assert record.rate_multiplier == multiplier[i]
def test_create_nightly_billing_different_templates(
def test_create_nightly_billing_for_day_different_templates(
sample_service,
sample_template,
sample_email_template,
@@ -111,7 +144,7 @@ def test_create_nightly_billing_different_templates(
assert len(records) == 0
# Celery expects the arguments to be a string or primitive type.
yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d")
create_nightly_billing(yesterday_str)
create_nightly_billing_for_day(yesterday_str)
records = FactBilling.query.order_by('rate_multiplier').all()
assert len(records) == 2
@@ -125,7 +158,7 @@ def test_create_nightly_billing_different_templates(
assert record.rate_multiplier == multiplier[i]
def test_create_nightly_billing_different_sent_by(
def test_create_nightly_billing_for_day_different_sent_by(
sample_service,
sample_template,
sample_email_template,
@@ -159,7 +192,7 @@ def test_create_nightly_billing_different_sent_by(
# Celery expects the arguments to be a string or primitive type.
yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d")
create_nightly_billing(yesterday_str)
create_nightly_billing_for_day(yesterday_str)
records = FactBilling.query.order_by('rate_multiplier').all()
assert len(records) == 2
@@ -170,7 +203,7 @@ def test_create_nightly_billing_different_sent_by(
assert record.rate_multiplier == 1.0
def test_create_nightly_billing_different_letter_postage(
def test_create_nightly_billing_for_day_different_letter_postage(
notify_db_session,
sample_letter_template,
mocker):
@@ -199,7 +232,7 @@ def test_create_nightly_billing_different_letter_postage(
assert len(records) == 0
# Celery expects the arguments to be a string or primitive type.
yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d")
create_nightly_billing(yesterday_str)
create_nightly_billing_for_day(yesterday_str)
records = FactBilling.query.order_by('postage').all()
assert len(records) == 2
@@ -216,7 +249,7 @@ def test_create_nightly_billing_different_letter_postage(
assert records[1].billable_units == 2
def test_create_nightly_billing_letter(
def test_create_nightly_billing_for_day_letter(
sample_service,
sample_letter_template,
mocker):
@@ -238,7 +271,7 @@ def test_create_nightly_billing_letter(
assert len(records) == 0
# Celery expects the arguments to be a string or primitive type.
yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d")
create_nightly_billing(yesterday_str)
create_nightly_billing_for_day(yesterday_str)
records = FactBilling.query.order_by('rate_multiplier').all()
assert len(records) == 1
record = records[0]
@@ -249,7 +282,7 @@ def test_create_nightly_billing_letter(
assert record.rate_multiplier == 2.0
def test_create_nightly_billing_null_sent_by_sms(
def test_create_nightly_billing_for_day_null_sent_by_sms(
sample_service,
sample_template,
mocker):
@@ -272,7 +305,7 @@ def test_create_nightly_billing_null_sent_by_sms(
# Celery expects the arguments to be a string or primitive type.
yesterday_str = datetime.strftime(yesterday, "%Y-%m-%d")
create_nightly_billing(yesterday_str)
create_nightly_billing_for_day(yesterday_str)
records = FactBilling.query.all()
assert len(records) == 1
@@ -284,39 +317,6 @@ def test_create_nightly_billing_null_sent_by_sms(
assert record.provider == 'unknown'
@freeze_time('2018-01-15T03:30:00')
def test_create_nightly_billing_consolidate_from_3_days_delta(
sample_template,
mocker):
mocker.patch('app.dao.fact_billing_dao.get_rate', side_effect=mocker_get_rate)
# create records from 11th to 15th
for i in range(0, 5):
create_notification(
created_at=datetime.now() - timedelta(days=i),
template=sample_template,
status='delivered',
sent_by=None,
international=False,
rate_multiplier=1.0,
billable_units=1,
)
notification = Notification.query.order_by(Notification.created_at).all()
assert datetime.date(notification[0].created_at) == date(2018, 1, 11)
records = FactBilling.query.all()
assert len(records) == 0
create_nightly_billing()
records = FactBilling.query.order_by(FactBilling.bst_date).all()
assert len(records) == 4
assert records[0].bst_date == date(2018, 1, 11)
assert records[-1].bst_date == date(2018, 1, 14)
def test_get_rate_for_letter_latest(notify_db_session):
# letter rates should be passed into the get_rate function as a tuple of start_date, crown, sheet_count,
# rate and post_class
@@ -341,50 +341,54 @@ def test_get_rate_for_sms_and_email(notify_db_session):
assert rate == Decimal(0)
@freeze_time('2018-03-26T23:30:00')
@freeze_time('2018-03-30T01:00:00')
# summer time starts on 2018-03-25
def test_create_nightly_billing_use_BST(
def test_create_nightly_billing_for_day_use_BST(
sample_service,
sample_template,
mocker):
mocker.patch('app.dao.fact_billing_dao.get_rate', side_effect=mocker_get_rate)
# too late
create_notification(
created_at=datetime(2018, 3, 25, 12, 0),
created_at=datetime(2018, 3, 25, 23, 1),
template=sample_template,
status='delivered',
sent_by=None,
international=False,
rate_multiplier=1.0,
billable_units=1,
)
create_notification(
created_at=datetime(2018, 3, 25, 23, 5),
created_at=datetime(2018, 3, 25, 22, 59),
template=sample_template,
status='delivered',
sent_by=None,
international=False,
rate_multiplier=1.0,
billable_units=1,
billable_units=2,
)
notifications = Notification.query.order_by(Notification.created_at).all()
assert len(notifications) == 2
records = FactBilling.query.all()
assert len(records) == 0
# too early
create_notification(
created_at=datetime(2018, 3, 24, 23, 59),
template=sample_template,
status='delivered',
rate_multiplier=1.0,
billable_units=4,
)
create_nightly_billing()
assert Notification.query.count() == 3
assert FactBilling.query.count() == 0
create_nightly_billing_for_day('2018-03-25')
records = FactBilling.query.order_by(FactBilling.bst_date).all()
assert len(records) == 2
assert len(records) == 1
assert records[0].bst_date == date(2018, 3, 25)
assert records[-1].bst_date == date(2018, 3, 26)
assert records[0].billable_units == 2
@freeze_time('2018-01-15T03:30:00')
def test_create_nightly_billing_update_when_record_exists(
def test_create_nightly_billing_for_day_update_when_record_exists(
sample_service,
sample_template,
mocker):
@@ -404,7 +408,7 @@ def test_create_nightly_billing_update_when_record_exists(
records = FactBilling.query.all()
assert len(records) == 0
create_nightly_billing()
create_nightly_billing_for_day('2018-01-14')
records = FactBilling.query.order_by(FactBilling.bst_date).all()
assert len(records) == 1
@@ -423,13 +427,14 @@ def test_create_nightly_billing_update_when_record_exists(
)
# run again, make sure create_nightly_billing() updates with no error
create_nightly_billing()
create_nightly_billing_for_day('2018-01-14')
assert len(records) == 1
assert records[0].billable_units == 2
assert records[0].updated_at
def test_create_nightly_notification_status(notify_db_session, mocker):
@freeze_time('2019-01-05')
def test_create_nightly_notification_status_for_day(notify_db_session, mocker):
mocks = [
mocker.patch('app.celery.reporting_tasks.delete_email_notifications_older_than_retention'),
mocker.patch('app.celery.reporting_tasks.delete_sms_notifications_older_than_retention'),
@@ -444,36 +449,24 @@ def test_create_nightly_notification_status(notify_db_session, mocker):
third_template = create_template(service=third_service, template_type='letter')
create_notification(template=first_template, status='delivered')
create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=1))
create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=2))
create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=10))
create_notification(template=first_template, status='delivered', created_at=datetime.utcnow() - timedelta(days=10))
create_notification(template=first_template, status='delivered', created_at=datetime(2019, 1, 1, 12, 0))
create_notification(template=second_template, status='temporary-failure')
create_notification(template=second_template, status='temporary-failure', created_at=datetime.utcnow() - timedelta(days=1)) # noqa
create_notification(template=second_template, status='temporary-failure', created_at=datetime.utcnow() - timedelta(days=2)) # noqa
create_notification(template=second_template, status='temporary-failure', created_at=datetime.utcnow() - timedelta(days=10)) # noqa
create_notification(template=second_template, status='temporary-failure', created_at=datetime.utcnow() - timedelta(days=10)) # noqa
create_notification(template=second_template, status='temporary-failure', created_at=datetime(2019, 1, 1, 12, 0))
create_notification(template=third_template, status='created')
create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=1))
create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=2))
create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=10))
create_notification(template=third_template, status='created', created_at=datetime.utcnow() - timedelta(days=10))
create_notification(template=third_template, status='created', created_at=datetime(2019, 1, 1, 12, 0))
assert len(FactNotificationStatus.query.all()) == 0
create_nightly_notification_status()
new_data = FactNotificationStatus.query.order_by(
FactNotificationStatus.bst_date,
).all()
assert len(new_data) == 6 #
assert str(new_data[0].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=2), "%Y-%m-%d")
assert str(new_data[1].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=2), "%Y-%m-%d")
assert str(new_data[2].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=2), "%Y-%m-%d")
assert str(new_data[3].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=1), "%Y-%m-%d")
assert str(new_data[4].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=1), "%Y-%m-%d")
assert str(new_data[5].bst_date) == datetime.strftime(datetime.utcnow() - timedelta(days=1), "%Y-%m-%d")
create_nightly_notification_status_for_day('2019-01-01')
new_data = FactNotificationStatus.query.all()
assert len(new_data) == 3
assert new_data[0].bst_date == date(2019, 1, 1)
assert new_data[1].bst_date == date(2019, 1, 1)
assert new_data[2].bst_date == date(2019, 1, 1)
for mock in mocks:
mock.apply_async.assert_called_once_with(queue='periodic-tasks')
@@ -481,7 +474,7 @@ def test_create_nightly_notification_status(notify_db_session, mocker):
# the job runs at 12:30am London time. 04/01 is in BST.
@freeze_time('2019-04-01T23:30')
def test_create_nightly_notification_status_respects_bst(sample_template, mocker):
def test_create_nightly_notification_status_for_day_respects_bst(sample_template, mocker):
mocker.patch('app.celery.reporting_tasks.delete_email_notifications_older_than_retention')
mocker.patch('app.celery.reporting_tasks.delete_sms_notifications_older_than_retention')
mocker.patch('app.celery.reporting_tasks.delete_letter_notifications_older_than_retention')
@@ -491,23 +484,12 @@ def test_create_nightly_notification_status_respects_bst(sample_template, mocker
create_notification(sample_template, status='created', created_at=datetime(2019, 4, 1, 22, 59))
create_notification(sample_template, status='created', created_at=datetime(2019, 3, 31, 23, 0))
create_notification(sample_template, status='temporary-failure', created_at=datetime(2019, 3, 31, 22, 59))
create_notification(sample_template, status='delivered', created_at=datetime(2019, 3, 31, 22, 59)) # too old
# we create records for last ten days
create_notification(sample_template, status='sending', created_at=datetime(2019, 3, 29, 0, 0))
create_notification(sample_template, status='delivered', created_at=datetime(2019, 3, 22, 23, 59)) # too old
create_nightly_notification_status()
create_nightly_notification_status_for_day('2019-04-01')
noti_status = FactNotificationStatus.query.order_by(FactNotificationStatus.bst_date).all()
assert len(noti_status) == 3
assert len(noti_status) == 1
assert noti_status[0].bst_date == date(2019, 3, 29)
assert noti_status[0].notification_status == 'sending'
assert noti_status[1].bst_date == date(2019, 3, 31)
assert noti_status[1].notification_status == 'temporary-failure'
assert noti_status[2].bst_date == date(2019, 4, 1)
assert noti_status[2].notification_status == 'created'
assert noti_status[0].bst_date == date(2019, 4, 1)
assert noti_status[0].notification_status == 'created'