From b1636b7a1a3472208d60068aa3339b43a9bc5435 Mon Sep 17 00:00:00 2001
From: Leo Hemsted
Date: Tue, 1 Mar 2022 15:07:57 +0000
Subject: [PATCH] split returned letters tasks into a max count of returned
 letters

if we have too many returned letters, we'll exceed SQS's max task size of
256kb. Cap it to 5000 - this is probably a bit conservative but follows the
initial values we used when implementing this for the collate-letters-task[^1].

Also follow the pattern of compressing the sqs payload just to reduce it a
little more.

[^1]: https://github.com/alphagov/notifications-api/pull/1536
---
 app/letters/rest.py                        | 14 +++++++++++---
 tests/app/letters/test_returned_letters.py | 21 ++++++++++++++++++++-
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/app/letters/rest.py b/app/letters/rest.py
index e8e724b06..55bf3d2fe 100644
--- a/app/letters/rest.py
+++ b/app/letters/rest.py
@@ -9,11 +9,19 @@ from app.v2.errors import register_errors
 letter_job = Blueprint("letter-job", __name__)
 register_errors(letter_job)
 
+# too many references will make SQS error (as the task can only be 256kb)
+MAX_REFERENCES_PER_TASK = 5000
+
 
 @letter_job.route('/letters/returned', methods=['POST'])
 def create_process_returned_letters_job():
-    references = validate(request.get_json(), letter_references)
+    references = validate(request.get_json(), letter_references)['references']
 
-    process_returned_letters_list.apply_async([references['references']], queue=QueueNames.DATABASE)
+    for start_index in range(0, len(references), MAX_REFERENCES_PER_TASK):
+        process_returned_letters_list.apply_async(
+            args=(references[start_index:start_index + MAX_REFERENCES_PER_TASK], ),
+            queue=QueueNames.DATABASE,
+            compression='zlib'
+        )
 
-    return jsonify(references=references['references']), 200
+    return jsonify(references=references), 200
diff --git a/tests/app/letters/test_returned_letters.py b/tests/app/letters/test_returned_letters.py
index a5e6dc6f9..cdda37c33 100644
--- a/tests/app/letters/test_returned_letters.py
+++ b/tests/app/letters/test_returned_letters.py
@@ -20,4 +20,23 @@ def test_process_returned_letters(status, references, admin_request, mocker):
     if status != 200:
         assert '{} does not match'.format(references[0]) in response['errors'][0]['message']
     else:
-        mock_celery.assert_called_once_with([references], queue='database-tasks')
+        mock_celery.assert_called_once_with(args=(references,), queue='database-tasks', compression='zlib')
+
+
+def test_process_returned_letters_splits_tasks_up(admin_request, mocker):
+    mock_celery = mocker.patch("app.letters.rest.process_returned_letters_list.apply_async")
+    mocker.patch("app.letters.rest.MAX_REFERENCES_PER_TASK", 3)
+
+    references = [f'{x:016}' for x in range(10)]
+
+    admin_request.post(
+        'letter-job.create_process_returned_letters_job',
+        _data={"references": references},
+    )
+
+    assert mock_celery.call_count == 4
+
+    assert mock_celery.call_args_list[0][1]['args'][0] == ['0000000000000000', '0000000000000001', '0000000000000002']
+    assert mock_celery.call_args_list[1][1]['args'][0] == ['0000000000000003', '0000000000000004', '0000000000000005']
+    assert mock_celery.call_args_list[2][1]['args'][0] == ['0000000000000006', '0000000000000007', '0000000000000008']
+    assert mock_celery.call_args_list[3][1]['args'][0] == ['0000000000000009']
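
A minimal standalone sketch of the batch-and-compress dispatch pattern this patch applies, assuming a hypothetical process_chunk task, a hypothetical dispatch_in_batches helper and an in-memory broker; the real code sends process_returned_letters_list to the database queue exactly as shown in the diff above.

    from celery import Celery

    app = Celery("returned_letters_sketch", broker="memory://")


    # hypothetical stand-in for process_returned_letters_list
    @app.task
    def process_chunk(references):
        print(f"processing {len(references)} references")


    MAX_REFERENCES_PER_TASK = 5000  # same cap as the patch: keeps each SQS message well under 256kb


    def dispatch_in_batches(references):
        # walk the list in fixed-size slices so each Celery task receives at most
        # MAX_REFERENCES_PER_TASK references
        for start in range(0, len(references), MAX_REFERENCES_PER_TASK):
            process_chunk.apply_async(
                args=(references[start:start + MAX_REFERENCES_PER_TASK],),
                compression='zlib',  # compress the serialised payload, as the patch does
            )


    if __name__ == '__main__':
        # 12000 references would be enqueued as 3 tasks: 5000 + 5000 + 2000
        dispatch_in_batches([f'{x:016}' for x in range(12000)])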