From ed182c2a221c403bafeb15217b6fb4a9d3041427 Mon Sep 17 00:00:00 2001
From: Leo Hemsted
Date: Fri, 23 Oct 2020 20:01:16 +0100
Subject: [PATCH 1/2] return just the columns we need for collating letters

Previously we returned the entire ORM object. Returning just the
columns we need has a few benefits:

* We can join on to services there and then, avoiding a second query
  later in the collate flow to fetch the crown status of each service.
* It massively reduces the amount of data we return - particularly free
  text fields like personalisation, which can be quite big. We now
  return 5 columns rather than 26.
* Minor, but it also saves some CPU cycles: sqlalchemy no longer
  constructs an ORM object per row and tracks it for changes. This
  function never modifies the rows it reads or persists them back, so
  that work was wasted. (A sketch of the difference follows below.)

Disadvantages are:

* The dao_get_letters_to_be_printed return interface is now much more
  tightly coupled to the get_key_and_size_of_letters_to_be_sent_to_print
  function that calls it.
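As an illustration only (this sketch is not part of the patch - it just
assumes the app's db session and the Notification and Service models),
a column query hands back plain named tuples rather than
session-tracked ORM instances:

    # a minimal sketch: selecting columns returns lightweight named
    # tuples, so sqlalchemy skips the identity-map and change-tracking
    # bookkeeping it does for full ORM objects
    letter = db.session.query(
        Notification.id,
        Notification.reference,
        Notification.service_id,
        Service.crown,
    ).join(
        Notification.service
    ).first()

    # attribute access still works, but nothing is tracked for flushing
    print(letter.reference, letter.crown)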
---
 app/celery/letters_pdf_tasks.py                        |  6 +++---
 app/dao/notifications_dao.py                           | 10 +++++++++-
 .../app/dao/notification_dao/test_notification_dao.py |  2 +-
 3 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/app/celery/letters_pdf_tasks.py b/app/celery/letters_pdf_tasks.py
index 5f715c97d..75c2e5c62 100644
--- a/app/celery/letters_pdf_tasks.py
+++ b/app/celery/letters_pdf_tasks.py
@@ -182,15 +182,15 @@ def get_key_and_size_of_letters_to_be_sent_to_print(print_run_deadline, postage)
         try:
             letter_file_name = get_letter_pdf_filename(
                 reference=letter.reference,
-                crown=letter.service.crown,
+                crown=letter.crown,
                 created_at=letter.created_at,
-                postage=letter.postage
+                postage=postage
             )
             letter_head = s3.head_s3_object(current_app.config['LETTERS_PDF_BUCKET_NAME'], letter_file_name)
             yield {
                 "Key": letter_file_name,
                 "Size": letter_head['ContentLength'],
-                "ServiceId": str(letter.service.id)
+                "ServiceId": str(letter.service_id)
             }
         except BotoClientError as e:
             current_app.logger.exception(
diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py
index 7c8564489..5346ed63a 100644
--- a/app/dao/notifications_dao.py
+++ b/app/dao/notifications_dao.py
@@ -721,7 +721,15 @@ def dao_get_letters_to_be_printed(print_run_deadline, postage):
     """
     Return all letters created before the print run deadline that have not yet been sent
     """
-    notifications = Notification.query.filter(
+    notifications = db.session.query(
+        Notification.id,
+        Notification.created_at,
+        Notification.reference,
+        Notification.service_id,
+        Service.crown,
+    ).join(
+        Notification.service
+    ).filter(
         Notification.created_at < convert_bst_to_utc(print_run_deadline),
         Notification.notification_type == LETTER_TYPE,
         Notification.status == NOTIFICATION_CREATED,
diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py
index 8b8717740..3e74ca0e9 100644
--- a/tests/app/dao/notification_dao/test_notification_dao.py
+++ b/tests/app/dao/notification_dao/test_notification_dao.py
@@ -1694,4 +1694,4 @@ def test_letters_to_be_printed_sort_by_service(notify_db_session):
 
     results = list(dao_get_letters_to_be_printed(print_run_deadline=datetime(2020, 12, 1, 17, 30), postage='second'))
     assert len(results) == 3
-    assert results == [notification_1, notification_2, notification_3]
+    assert [x.id for x in results] == [notification_1.id, notification_2.id, notification_3.id]

From 3bc3ed88b300d5648379b8380ff2f719d1abda30 Mon Sep 17 00:00:00 2001
From: Leo Hemsted
Date: Fri, 23 Oct 2020 20:06:24 +0100
Subject: [PATCH 2/2] use yield_per instead of limit

limit means we only ever return 50k letters; if there are more than
that in a print run we'll skip the rest, and they won't be picked up
until the next day.

If you simply remove the limit, sqlalchemy prefetches the query results
so it can build up ORM objects, for example collapsing joined rows into
single objects with children. It streams the data into a buffer, but
will normally still prefetch the entire result set so it can ensure the
integrity of the session (so that, for example, if you modify one
result that is duplicated further down the result set, both rows are
updated in the session). We don't need that guarantee here, but we do
need to stop the result set taking up too much memory.

We can use `yield_per` to yield from sqlalchemy to the iterator (in
this case the `for letter in letters_awaiting_sending` loop in
letters_pdf_tasks.py) - every time we exhaust a batch of 10000 rows we
go back to the database for the next 10k. This way, we only ever need
10k rows in memory at a time. The sketch at the end of this message
shows the mechanics.

This has some caveats, mostly around how we handle the data the query
returns. The docs are a bit hard to parse, but the notable limitations
are:

* It's dangerous to modify ORM objects returned by yield_per queries.
* It's dangerous to join in a yield_per query if there can be more than
  one row per item (for example, if you join from notification to
  service, there'll be multiple result rows containing the same
  service, and if these are split over different yield chunks we may
  experience undefined behaviour).

Both limitations come down to there being no guarantee of one unique
row per item.

For more reading:
https://docs.sqlalchemy.org/en/13/orm/query.html?highlight=yield_per#sqlalchemy.orm.query.Query.yield_per
https://www.mail-archive.com/sqlalchemy@googlegroups.com/msg12443.html
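To make the batching concrete, a minimal sketch (not part of this
patch; `process` is a hypothetical stand-in for the collate code):

    # yield_per drains the DBAPI cursor in fixed-size chunks, so at
    # most one chunk of rows is buffered in memory at a time
    letters = db.session.query(
        Notification.id,
        Notification.reference,
    ).filter(
        Notification.notification_type == LETTER_TYPE,
    ).yield_per(10000)

    for letter in letters:
        # iterating fetches the next 10k rows from the database
        # whenever the current chunk is exhausted
        process(letter)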
---
 app/dao/notifications_dao.py                   | 17 +++++++++++---
 .../notification_dao/test_notification_dao.py | 22 ++++++++++++++-----
 2 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py
index 5346ed63a..aa57779f7 100644
--- a/app/dao/notifications_dao.py
+++ b/app/dao/notifications_dao.py
@@ -717,9 +717,20 @@ def notifications_not_yet_sent(should_be_sending_after_seconds, notification_typ
     return notifications


-def dao_get_letters_to_be_printed(print_run_deadline, postage):
+def dao_get_letters_to_be_printed(print_run_deadline, postage, query_limit=10000):
     """
-    Return all letters created before the print run deadline that have not yet been sent
+    Return all letters created before the print run deadline that have not yet been sent. This yields in batches of 10k
+    to stop the query taking too long and eating up too much memory. As each 10k batch is yielded, the
+    get_key_and_size_of_letters_to_be_sent_to_print function will fetch the s3 data and then start sending off
+    tasks to the notify-ftp app to send them.
+
+    CAUTION! Modify this query with care. Changing filters etc is fine, but joining on to another table may
+    introduce undefined behaviour. Essentially we need each row returned to be unique, and we should avoid
+    modifying the state of any returned objects.
+
+    For more reading:
+    https://docs.sqlalchemy.org/en/13/orm/query.html?highlight=yield_per#sqlalchemy.orm.query.Query.yield_per
+    https://www.mail-archive.com/sqlalchemy@googlegroups.com/msg12443.html
     """
     notifications = db.session.query(
         Notification.id,
@@ -738,7 +749,7 @@ def dao_get_letters_to_be_printed(print_run_deadline, postage):
     ).order_by(
         Notification.service_id,
         Notification.created_at
-    ).limit(50000)
+    ).yield_per(query_limit)

     return notifications

diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py
index 3e74ca0e9..b0e4938b2 100644
--- a/tests/app/dao/notification_dao/test_notification_dao.py
+++ b/tests/app/dao/notification_dao/test_notification_dao.py
@@ -1688,10 +1688,20 @@ def test_letters_to_be_printed_sort_by_service(notify_db_session):
     second_service = create_service(service_name='second service', service_id='642bf33b-54b5-45f2-8c13-942a46616704')
     first_template = create_template(service=first_service, template_type='letter', postage='second')
     second_template = create_template(service=second_service, template_type='letter', postage='second')
-    notification_1 = create_notification(template=first_template, created_at=datetime(2020, 12, 1, 9, 30))
-    notification_2 = create_notification(template=first_template, created_at=datetime(2020, 12, 1, 12, 30))
-    notification_3 = create_notification(template=second_template, created_at=datetime(2020, 12, 1, 8, 30))
+    letters_ordered_by_service_then_time = [
+        create_notification(template=first_template, created_at=datetime(2020, 12, 1, 9, 30)),
+        create_notification(template=first_template, created_at=datetime(2020, 12, 1, 12, 30)),
+        create_notification(template=first_template, created_at=datetime(2020, 12, 1, 13, 30)),
+        create_notification(template=first_template, created_at=datetime(2020, 12, 1, 14, 30)),
+        create_notification(template=first_template, created_at=datetime(2020, 12, 1, 15, 30)),
+        create_notification(template=second_template, created_at=datetime(2020, 12, 1, 8, 30)),
+        create_notification(template=second_template, created_at=datetime(2020, 12, 1, 8, 31)),
+        create_notification(template=second_template, created_at=datetime(2020, 12, 1, 8, 32)),
+        create_notification(template=second_template, created_at=datetime(2020, 12, 1, 8, 33)),
+        create_notification(template=second_template, created_at=datetime(2020, 12, 1, 8, 34))
+    ]

-    results = list(dao_get_letters_to_be_printed(print_run_deadline=datetime(2020, 12, 1, 17, 30), postage='second'))
-    assert len(results) == 3
-    assert [x.id for x in results] == [notification_1.id, notification_2.id, notification_3.id]
+    results = list(
+        dao_get_letters_to_be_printed(print_run_deadline=datetime(2020, 12, 1, 17, 30), postage='second', query_limit=4)
+    )
+    assert [x.id for x in results] == [x.id for x in letters_ordered_by_service_then_time]
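A note to make the second caveat in PATCH 2/2 concrete. The sketch
below is illustrative only: the Service.notifications relationship name
is an assumption, not something these patches define. Eagerly loading a
collection alongside yield_per is the unsafe shape, because one
parent's joined rows can straddle a chunk boundary (sqlalchemy 1.3
refuses this combination at runtime); the column-tuple query these
patches use is safe because every row is an independent value:

    from sqlalchemy.orm import joinedload

    # UNSAFE: a collection joinedload emits one row per child, so a
    # chunk boundary can split a parent's rows across batches
    # (Service.notifications is a hypothetical relationship name)
    unsafe = db.session.query(Service).options(
        joinedload(Service.notifications)
    ).yield_per(1000)

    # SAFE: plain column tuples - each row stands alone, so chunking
    # can never hand back a partially-loaded object
    safe = db.session.query(
        Notification.id,
        Service.crown,
    ).join(
        Notification.service
    ).yield_per(1000)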