Changed the update/insert to a postgres upsert to avoid concurrency issues.

This commit is contained in:
Rebecca Law
2018-05-15 11:21:10 +01:00
parent 3615f3d00f
commit 271ce6d76e
2 changed files with 56 additions and 25 deletions

View File

@@ -1,5 +1,6 @@
from datetime import datetime, timedelta, time from datetime import datetime, timedelta, time
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import func, case, desc, Date from sqlalchemy import func, case, desc, Date
from app import db from app import db
@@ -149,33 +150,48 @@ def get_rate(non_letter_rates, letter_rates, notification_type, date, crown=None
def update_fact_billing(data, process_day): def update_fact_billing(data, process_day):
inserted_records = 0
updated_records = 0
non_letter_rates, letter_rates = get_rates_for_billing() non_letter_rates, letter_rates = get_rates_for_billing()
update_count = FactBilling.query.filter( rate = get_rate(non_letter_rates,
FactBilling.bst_date == datetime.date(process_day), letter_rates,
FactBilling.template_id == data.template_id, data.notification_type,
FactBilling.service_id == data.service_id, process_day,
FactBilling.provider == data.sent_by, # This could be zero - this is a bug that needs to be fixed. data.crown,
FactBilling.rate_multiplier == data.rate_multiplier, data.rate_multiplier)
FactBilling.notification_type == data.notification_type, billing_record = create_billing_record(data, rate, process_day)
FactBilling.international == data.international
).update(
{"notifications_sent": data.notifications_sent,
"billable_units": data.billable_units},
synchronize_session=False)
if update_count == 0: table = FactBilling.__table__
rate = get_rate(non_letter_rates, '''
letter_rates, This uses the Postgres upsert to avoid race conditions when two threads try to insert
data.notification_type, at the same row. The excluded object refers to values that we tried to insert but were
process_day, rejected.
data.crown, http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#insert-on-conflict-upsert
data.rate_multiplier) '''
billing_record = create_billing_record(data, rate, process_day) stmt = insert(table).values(
db.session.add(billing_record) bst_date=billing_record.bst_date,
inserted_records += 1 template_id=billing_record.template_id,
updated_records += update_count service_id=billing_record.service_id,
provider=billing_record.provider,
rate_multiplier=billing_record.rate_multiplier,
notification_type=billing_record.notification_type,
international=billing_record.international,
billable_units=billing_record.billable_units,
notifications_sent=billing_record.notifications_sent,
rate=billing_record.rate
)
stmt = stmt.on_conflict_do_update(
index_elements=[table.c.bst_date,
table.c.template_id,
table.c.service_id,
table.c.provider,
table.c.rate_multiplier,
table.c.notification_type,
table.c.international],
set_={"notifications_sent": stmt.excluded.notifications_sent,
"billable_units": stmt.excluded.billable_units
}
)
db.session.connection().execute(stmt)
db.session.commit() db.session.commit()

View File

@@ -438,7 +438,22 @@ def test_create_nightly_billing_update_when_record_exists(
assert len(records) == 1 assert len(records) == 1
assert records[0].bst_date == date(2018, 1, 14) assert records[0].bst_date == date(2018, 1, 14)
assert records[0].billable_units == 1
sample_notification(
notify_db,
notify_db_session,
created_at=datetime.now() - timedelta(days=1),
service=sample_service,
template=sample_template,
status='delivered',
sent_by=None,
international=False,
rate_multiplier=1.0,
billable_units=1,
)
# run again, make sure create_nightly_billing() updates with no error # run again, make sure create_nightly_billing() updates with no error
create_nightly_billing() create_nightly_billing()
assert len(records) == 1 assert len(records) == 1
assert records[0].billable_units == 2