From 90e82aff3ec78fb080ed03fe6b9c1a6b73b9d7ff Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Thu, 18 Feb 2021 11:51:38 +0000 Subject: [PATCH 1/2] properly log the lambda response boto returns a `StreamingBody`[1] response rather than a json struct. We're currently just logging things like "Error calling lambda o2-1-proxy with function error " which is obviously less than ideal. Also make the tests properly reflect this - annoyingly it appears that we can't use moto to reliably test this interface as the moto `mock_lambda` decorator needs you to be running inside a docker container?? [1] https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html#botocore.response.StreamingBody --- app/clients/cbc_proxy.py | 2 +- tests/app/clients/test_cbc_proxy.py | 11 +++-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/app/clients/cbc_proxy.py b/app/clients/cbc_proxy.py index 707d6df35..3177f0a37 100644 --- a/app/clients/cbc_proxy.py +++ b/app/clients/cbc_proxy.py @@ -147,7 +147,7 @@ class CBCProxyClientBase(ABC): elif 'FunctionError' in result: current_app.logger.info( - f"Error calling lambda {lambda_name} with function error { result['Payload'] }" + f"Error calling lambda {lambda_name} with function error { result['Payload'].read() }" ) success = False diff --git a/tests/app/clients/test_cbc_proxy.py b/tests/app/clients/test_cbc_proxy.py index 62a6f4f06..320ff044d 100644 --- a/tests/app/clients/test_cbc_proxy.py +++ b/tests/app/clients/test_cbc_proxy.py @@ -1,3 +1,4 @@ +from io import BytesIO import json import uuid from collections import namedtuple @@ -380,10 +381,7 @@ def test_cbc_proxy_will_failover_to_second_lambda_if_function_error( { 'StatusCode': 200, 'FunctionError': 'Handled', - 'Payload': { - "errorMessage": "", - "errorType": "CBCNewConnectionError" - } + 'Payload': BytesIO(json.dumps({"errorMessage": "", "errorType": "CBCNewConnectionError"}).encode('utf-8')), }, { 'StatusCode': 200 @@ -522,10 +520,7 @@ def 
test_cbc_proxy_create_and_send_tries_failover_lambda_on_function_error_and_r ld_client_mock.invoke.return_value = { 'StatusCode': 200, 'FunctionError': 'something', - 'Payload': { - 'errorMessage': 'some message', - 'errorType': 'SomeErrorType' - } + 'Payload': BytesIO(json.dumps({"errorMessage": "some message", "errorType": "SomeErrorType"}).encode('utf-8')), } with pytest.raises(CBCProxyRetryableException) as e: From 0088bcd98b68be97e00c65512a3ed174bc93fed5 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Thu, 18 Feb 2021 12:02:34 +0000 Subject: [PATCH 2/2] only retry if the broadcast message task is in sending previously we would retry if the task was queued up for retry but the status is in "received-ack" or "received-err". We don't expect that a task will be retried after getting this status, but if there are duplicate tasks that could happen. Let's plan for the worst by saying "only process a retry if the task is currently in sending". This way, if a duplicate task is on retry and the first task goes through successfully, the duplicate task will give up. --- app/__init__.py | 2 +- app/celery/broadcast_message_tasks.py | 4 ++-- tests/app/celery/test_broadcast_message_tasks.py | 13 +++++++++---- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index dd4d89b2c..9cc2e64fb 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -381,7 +381,7 @@ def setup_sqlalchemy_events(app): 'host': current_app.config['NOTIFY_APP_NAME'], # worker name 'url_rule': current_task.name, # task name } - # anything else. migrations possibly. + # anything else. migrations possibly, or flask cli commands. 
else: current_app.logger.warning('Checked out sqlalchemy connection from outside of request/task') connection_record.info['request_data'] = { diff --git a/app/celery/broadcast_message_tasks.py b/app/celery/broadcast_message_tasks.py index 1458852bc..0713ec381 100644 --- a/app/celery/broadcast_message_tasks.py +++ b/app/celery/broadcast_message_tasks.py @@ -52,11 +52,11 @@ def check_provider_message_should_send(broadcast_event, provider): """ current_provider_message = broadcast_event.get_provider_message(provider) # if this is the first time a task is being executed, it won't have a provider message yet - if current_provider_message and current_provider_message.status == BroadcastProviderMessageStatus.TECHNICAL_FAILURE: + if current_provider_message and current_provider_message.status != BroadcastProviderMessageStatus.SENDING: raise CBCProxyFatalException( f'Cannot send broadcast_event {broadcast_event.id} ' + f'to provider {provider}: ' + - 'It is already in status technical-failure' + f'It is in status {current_provider_message.status}' ) if broadcast_event.transmitted_finishes_at < datetime.utcnow(): diff --git a/tests/app/celery/test_broadcast_message_tasks.py b/tests/app/celery/test_broadcast_message_tasks.py index 5e4f167dd..8b859c10c 100644 --- a/tests/app/celery/test_broadcast_message_tasks.py +++ b/tests/app/celery/test_broadcast_message_tasks.py @@ -701,15 +701,20 @@ def test_check_provider_message_should_send_doesnt_raise_if_newer_event_not_acke @pytest.mark.parametrize('existing_message_status', [ BroadcastProviderMessageStatus.SENDING, - BroadcastProviderMessageStatus.ACK, - BroadcastProviderMessageStatus.ERR, - + pytest.param( + BroadcastProviderMessageStatus.ACK, + marks=pytest.mark.xfail(raises=CBCProxyFatalException) + ), + pytest.param( + BroadcastProviderMessageStatus.ERR, + marks=pytest.mark.xfail(raises=CBCProxyFatalException) + ), pytest.param( BroadcastProviderMessageStatus.TECHNICAL_FAILURE, 
marks=pytest.mark.xfail(raises=CBCProxyFatalException) ), ]) -def test_check_provider_message_should_send_doesnt_raise_if_current_event_already_has_provider_message( +def test_check_provider_message_should_send_raises_if_current_event_already_has_provider_message_not_in_sending( sample_template, existing_message_status ):