Merge pull request #3137 from alphagov/revert-revert-revert

Bring back retry logic
This commit is contained in:
Leo Hemsted
2021-02-15 12:21:13 +00:00
committed by GitHub
9 changed files with 492 additions and 69 deletions

View File

@@ -1,7 +1,9 @@
import uuid
from datetime import datetime
from unittest.mock import call, ANY
from freezegun import freeze_time
from celery.exceptions import Retry
import pytest
from app.models import (
@@ -11,7 +13,14 @@ from app.models import (
BroadcastProviderMessageStatus,
ServiceBroadcastSettings,
)
from app.celery.broadcast_message_tasks import send_broadcast_event, send_broadcast_provider_message, trigger_link_test
from app.clients.cbc_proxy import CBCProxyRetryableException, CBCProxyFatalException
from app.celery.broadcast_message_tasks import (
check_provider_message_should_send,
get_retry_delay,
send_broadcast_event,
send_broadcast_provider_message,
trigger_link_test,
)
from tests.app.db import (
create_template,
@@ -133,7 +142,7 @@ def test_send_broadcast_provider_message_sends_data_correctly(
send_broadcast_provider_message(provider=provider, broadcast_event_id=str(event.id))
broadcast_provider_message = event.get_provider_message(provider)
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.SENDING
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.ACK
mock_create_broadcast.assert_called_once_with(
identifier=str(broadcast_provider_message.id),
@@ -240,6 +249,48 @@ def test_send_broadcast_provider_message_defaults_to_test_channel_if_no_service_
)
def test_send_broadcast_provider_message_works_if_we_retried_previously(mocker, sample_service):
    """
    A retried task reuses the provider message left behind by an earlier attempt.

    A provider message in ``SENDING`` already exists for the event when the task
    runs. The test expects the task to send using that message's id, leave it in
    ``ACK`` with ``updated_at`` set, and not create a second provider message.
    """
    template = create_template(sample_service, BROADCAST_TYPE)
    broadcast_message = create_broadcast_message(
        template,
        areas={'areas': [], 'simple_polygons': []},
        status=BroadcastStatusType.BROADCASTING
    )
    event = create_broadcast_event(broadcast_message)

    # a provider message already exists from a previous attempt that never got past SENDING;
    # only the database row matters here, so the return value is not kept
    create_broadcast_provider_message(
        broadcast_event=event,
        provider='ee',
        status=BroadcastProviderMessageStatus.SENDING
    )

    mock_create_broadcast = mocker.patch(
        'app.clients.cbc_proxy.CBCProxyEE.create_and_send_broadcast',
    )

    send_broadcast_provider_message(provider='ee', broadcast_event_id=str(event.id))

    # the retry must reuse the existing provider message rather than adding a duplicate
    assert len(event.provider_messages) == 1

    broadcast_provider_message = event.get_provider_message('ee')
    assert broadcast_provider_message.status == BroadcastProviderMessageStatus.ACK
    assert broadcast_provider_message.updated_at is not None

    mock_create_broadcast.assert_called_once_with(
        identifier=str(broadcast_provider_message.id),
        message_number=mocker.ANY,
        headline='GOV.UK Notify Broadcast',
        description='this is an emergency broadcast message',
        areas=[],
        sent=event.sent_at_as_cap_datetime_string,
        expires=event.transmitted_finishes_at_as_cap_datetime_string,
        channel='test',
    )
@freeze_time('2020-08-01 12:00')
@pytest.mark.parametrize('provider,provider_capitalised', [
['ee', 'EE'],
@@ -308,7 +359,7 @@ def test_send_broadcast_provider_message_sends_update_with_references(
)
alert_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.ALERT)
create_broadcast_provider_message(alert_event, provider)
create_broadcast_provider_message(alert_event, provider, status=BroadcastProviderMessageStatus.ACK)
update_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.UPDATE)
mock_update_broadcast = mocker.patch(
@@ -318,7 +369,7 @@ def test_send_broadcast_provider_message_sends_update_with_references(
send_broadcast_provider_message(provider=provider, broadcast_event_id=str(update_event.id))
broadcast_provider_message = update_event.get_provider_message(provider)
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.SENDING
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.ACK
mock_update_broadcast.assert_called_once_with(
identifier=str(broadcast_provider_message.id),
@@ -363,8 +414,8 @@ def test_send_broadcast_provider_message_sends_cancel_with_references(
update_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.UPDATE)
cancel_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.CANCEL)
create_broadcast_provider_message(alert_event, provider)
create_broadcast_provider_message(update_event, provider)
create_broadcast_provider_message(alert_event, provider, status=BroadcastProviderMessageStatus.ACK)
create_broadcast_provider_message(update_event, provider, status=BroadcastProviderMessageStatus.ACK)
mock_cancel_broadcast = mocker.patch(
f'app.clients.cbc_proxy.CBCProxy{provider_capitalised}.cancel_broadcast',
@@ -373,7 +424,7 @@ def test_send_broadcast_provider_message_sends_cancel_with_references(
send_broadcast_provider_message(provider=provider, broadcast_event_id=str(cancel_event.id))
broadcast_provider_message = cancel_event.get_provider_message(provider)
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.SENDING
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.ACK
mock_cancel_broadcast.assert_called_once_with(
identifier=str(broadcast_provider_message.id),
@@ -410,14 +461,16 @@ def test_send_broadcast_provider_message_errors(mocker, sample_service, provider
mock_create_broadcast = mocker.patch(
f'app.clients.cbc_proxy.CBCProxy{provider_capitalised}.create_and_send_broadcast',
side_effect=Exception('oh no'),
side_effect=CBCProxyRetryableException('oh no'),
)
mock_retry = mocker.patch(
'app.celery.broadcast_message_tasks.send_broadcast_provider_message.retry',
side_effect=Retry
)
with pytest.raises(Exception) as ex:
with pytest.raises(Retry):
send_broadcast_provider_message(provider=provider, broadcast_event_id=str(event.id))
assert ex.match('oh no')
mock_create_broadcast.assert_called_once_with(
identifier=ANY,
message_number=mocker.ANY,
@@ -434,6 +487,63 @@ def test_send_broadcast_provider_message_errors(mocker, sample_service, provider
expires=event.transmitted_finishes_at_as_cap_datetime_string,
channel="test"
)
mock_retry.assert_called_once_with(
countdown=1,
exc=mock_create_broadcast.side_effect,
queue='broadcast-tasks'
)
broadcast_provider_message = event.get_provider_message(provider)
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.SENDING
@pytest.mark.parametrize('num_retries, expected_countdown', [
    (0, 1),
    (5, 32),
    (20, 240),
])
def test_send_broadcast_provider_message_delays_retry_exponentially(
    mocker,
    sample_service,
    num_retries,
    expected_countdown
):
    """
    The countdown passed to ``retry`` depends on how often the task has already retried.

    The proxy call always fails with a retryable exception. For each
    ``num_retries`` reported by the (mocked) celery request context, the task is
    expected to call ``retry`` once with the matching ``expected_countdown``.
    """
    event = create_broadcast_event(
        create_broadcast_message(
            create_template(sample_service, BROADCAST_TYPE),
            status=BroadcastStatusType.BROADCASTING,
        )
    )

    retryable_error = CBCProxyRetryableException('oh no')
    mock_create_broadcast = mocker.patch(
        'app.clients.cbc_proxy.CBCProxyEE.create_and_send_broadcast',
        side_effect=retryable_error,
    )
    mock_retry = mocker.patch(
        'app.celery.broadcast_message_tasks.send_broadcast_provider_message.retry',
        side_effect=Retry
    )

    # patch celery request context as shown here: https://stackoverflow.com/a/59870468
    request_context = mocker.patch("celery.app.task.Task.request")
    request_context.retries = num_retries

    with pytest.raises(Retry):
        send_broadcast_provider_message(provider='ee', broadcast_event_id=str(event.id))

    expected_send_kwargs = dict(
        identifier=ANY,
        message_number=ANY,
        headline="GOV.UK Notify Broadcast",
        description='this is an emergency broadcast message',
        areas=[],
        sent=event.sent_at_as_cap_datetime_string,
        expires=event.transmitted_finishes_at_as_cap_datetime_string,
        channel='test',
    )
    mock_create_broadcast.assert_called_once_with(**expected_send_kwargs)

    # the exception handed to retry is the very one the proxy raised
    mock_retry.assert_called_once_with(
        countdown=expected_countdown,
        exc=retryable_error,
        queue='broadcast-tasks'
    )
@pytest.mark.parametrize("provider,provider_capitalised", [
@@ -466,3 +576,146 @@ def test_trigger_link_tests_invokes_cbc_proxy_client(
assert len(mock_send_link_test.mock_calls[0][1][1]) == 8
else:
assert not mock_send_link_test.mock_calls[0][1][1]
@pytest.mark.parametrize('retry_count, expected_delay', [
    (0, 1),
    (1, 2),
    (2, 4),
    (7, 128),
    (8, 240),
    (9, 240),
    (1000, 240),
])
def test_get_retry_delay_has_capped_backoff(retry_count, expected_delay):
    """The delay doubles with each retry (1, 2, 4, ... 128) and is then capped at 240."""
    actual_delay = get_retry_delay(retry_count)

    assert actual_delay == expected_delay
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_doesnt_raise_if_event_hasnt_expired_yet(sample_template):
    """An event that finishes one minute after the frozen time passes the check."""
    starts_at = datetime(2021, 1, 1, 0, 0)
    # one minute later than the frozen "now" of 12:00
    finishes_at = datetime(2021, 1, 1, 12, 1)

    current_event = create_broadcast_event(
        create_broadcast_message(sample_template),
        transmitted_starts_at=starts_at,
        transmitted_finishes_at=finishes_at,
    )

    # passing means no exception is raised
    check_provider_message_should_send(current_event, 'ee')
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_raises_if_event_has_expired(sample_template):
    """An event that finished one minute before the frozen time is rejected as fatal."""
    starts_at = datetime(2021, 1, 1, 0, 0)
    # one minute earlier than the frozen "now" of 12:00
    finishes_at = datetime(2021, 1, 1, 11, 59)

    current_event = create_broadcast_event(
        create_broadcast_message(sample_template),
        transmitted_starts_at=starts_at,
        transmitted_finishes_at=finishes_at,
    )

    with pytest.raises(CBCProxyFatalException) as exc:
        check_provider_message_should_send(current_event, 'ee')

    error_text = str(exc.value)
    assert 'The expiry time of 2021-01-01 11:59:00 has already passed' in error_text
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_raises_if_older_event_still_sending(sample_template):
    """
    The check fails when an earlier event's provider message is still in SENDING.

    Three events exist for one broadcast message: an alert (acked), an update
    whose provider message is still SENDING, and a later update that is the one
    being checked.
    """
    broadcast_message = create_broadcast_message(sample_template)

    # event approved at midnight
    past_successful_event = create_broadcast_event(
        broadcast_message,
        message_type='alert',
        sent_at=datetime(2021, 1, 1, 0, 0),
    )
    # event updated at 5am (this event is still sending)
    past_still_sending_event = create_broadcast_event(
        broadcast_message,
        message_type='update',
        sent_at=datetime(2021, 1, 1, 5, 0),
    )
    # event updated again at 7am
    current_event = create_broadcast_event(
        broadcast_message,
        message_type='update',
        sent_at=datetime(2021, 1, 1, 7, 0),
    )

    create_broadcast_provider_message(
        past_successful_event,
        provider='ee',
        status=BroadcastProviderMessageStatus.ACK,
    )
    create_broadcast_provider_message(
        past_still_sending_event,
        provider='ee',
        status=BroadcastProviderMessageStatus.SENDING,
    )

    # we havent sent the previous update yet - it's still in sending - so don't try and send this one.
    with pytest.raises(CBCProxyFatalException) as exc:
        check_provider_message_should_send(current_event, 'ee')

    expected_error = (
        f'Previous event {past_still_sending_event.id} (type update) has not finished sending to provider ee'
    )
    assert expected_error in str(exc.value)
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_raises_if_older_event_hasnt_started_sending_yet(sample_template):
    """
    The check fails when an earlier event has no provider message at all.

    Three events exist for one broadcast message: an alert (acked), an update
    that never got a provider message for 'ee', and a later update that is the
    one being checked.
    """
    broadcast_message = create_broadcast_message(sample_template)

    # event approved at midnight
    past_successful_event = create_broadcast_event(
        broadcast_message,
        message_type='alert',
        sent_at=datetime(2021, 1, 1, 0, 0),
    )
    # event updated at 5am (nothing has been sent to the provider for this one)
    past_unsent_event = create_broadcast_event(
        broadcast_message,
        message_type='update',
        sent_at=datetime(2021, 1, 1, 5, 0),
    )
    # event updated at 7am
    current_event = create_broadcast_event(
        broadcast_message,
        message_type='update',
        sent_at=datetime(2021, 1, 1, 7, 0),
    )

    # deliberately no provider message for past_unsent_event
    create_broadcast_provider_message(
        past_successful_event,
        provider='ee',
        status=BroadcastProviderMessageStatus.ACK,
    )

    # we shouldn't send the update now, because a previous event hasn't even started sending
    with pytest.raises(CBCProxyFatalException) as exc:
        check_provider_message_should_send(current_event, 'ee')

    expected_error = (
        f'Previous event {past_unsent_event.id} (type update) has no provider_message for provider ee'
    )
    assert expected_error in str(exc.value)
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_doesnt_raise_if_newer_event_not_acked_yet(sample_template):
    """
    A later event without a provider message does not block an earlier event.

    The event being checked is the alert sent at midnight. A cancel event sent
    at 10am exists for the same broadcast message, and no provider messages
    have been created for either event.
    """
    broadcast_message = create_broadcast_message(sample_template)

    # event approved at midnight - this is the one being checked
    current_event = create_broadcast_event(
        broadcast_message,
        message_type='alert',
        sent_at=datetime(2021, 1, 1, 0, 0),
    )
    # a newer event for the same broadcast message; only its existence matters,
    # so the return value is not kept
    create_broadcast_event(
        broadcast_message,
        message_type='cancel',
        sent_at=datetime(2021, 1, 1, 10, 0),
    )

    # this doesn't raise: the cancel has no provider message yet, but it was sent after
    # the alert being checked, and the test expects newer events not to block this one
    check_provider_message_should_send(current_event, 'ee')
@pytest.mark.parametrize('existing_message_status', [
    BroadcastProviderMessageStatus.SENDING,
    BroadcastProviderMessageStatus.ACK,
    BroadcastProviderMessageStatus.ERR,
    pytest.param(
        BroadcastProviderMessageStatus.TECHNICAL_FAILURE,
        marks=pytest.mark.xfail(raises=CBCProxyFatalException)
    ),
])
def test_check_provider_message_should_send_doesnt_raise_if_current_event_already_has_provider_message(
    sample_template,
    existing_message_status
):
    """
    An event that already has a provider message can still pass the check.

    SENDING, ACK and ERR are expected to pass. TECHNICAL_FAILURE is marked
    xfail, i.e. it is expected to raise CBCProxyFatalException.
    """
    current_event = create_broadcast_event(
        create_broadcast_message(sample_template),
        message_type='alert',
    )
    create_broadcast_provider_message(
        current_event,
        provider='ee',
        status=existing_message_status,
    )

    # passing means no exception is raised
    check_provider_message_should_send(current_event, 'ee')

View File

@@ -4,10 +4,11 @@ from collections import namedtuple
from datetime import datetime
from unittest.mock import Mock, call
from botocore.exceptions import ClientError as BotoClientError
import pytest
from app.clients.cbc_proxy import (
CBCProxyClient, CBCProxyException, CBCProxyEE, CBCProxyCanary, CBCProxyVodafone, CBCProxyThree, CBCProxyO2
CBCProxyClient, CBCProxyRetryableException, CBCProxyEE, CBCProxyCanary, CBCProxyVodafone, CBCProxyThree, CBCProxyO2
)
from app.utils import DATETIME_FORMAT
@@ -316,6 +317,50 @@ def test_cbc_proxy_vodafone_cancel_invokes_function(mocker, cbc_proxy_vodafone):
assert payload['sent'] == sent
@pytest.mark.parametrize('cbc', ['ee', 'vodafone', 'three', 'o2'])
def test_cbc_proxy_will_failover_to_second_lambda_if_boto_client_error(
    mocker,
    cbc_proxy_client,
    cbc
):
    """
    A boto ClientError on the first lambda makes the proxy try the second one.

    Every ``invoke`` call raises a boto ``ClientError``. The proxy is expected to
    invoke ``<cbc>-1-proxy`` and then ``<cbc>-2-proxy``, and finally raise
    ``CBCProxyRetryableException`` naming both.
    """
    cbc_proxy = cbc_proxy_client.get_proxy(cbc)

    lambda_client = mocker.patch.object(cbc_proxy, '_lambda_client', create=True)
    lambda_client.invoke.side_effect = BotoClientError({}, 'error')

    broadcast_kwargs = dict(
        identifier='my-identifier',
        message_number='0000007b',
        headline='my-headline',
        description='test-description',
        areas=EXAMPLE_AREAS,
        sent='a-passed-through-sent-value',
        expires='a-passed-through-expires-value',
        channel="severe",
    )

    with pytest.raises(CBCProxyRetryableException) as e:
        cbc_proxy.create_and_send_broadcast(**broadcast_kwargs)

    assert e.match(f'Lambda failed for both {cbc}-1-proxy and {cbc}-2-proxy')

    primary_invocation = call(
        FunctionName=f'{cbc}-1-proxy',
        InvocationType='RequestResponse',
        Payload=mocker.ANY,
    )
    failover_invocation = call(
        FunctionName=f'{cbc}-2-proxy',
        InvocationType='RequestResponse',
        Payload=mocker.ANY,
    )
    assert lambda_client.invoke.call_args_list == [primary_invocation, failover_invocation]
@pytest.mark.parametrize('cbc', ['ee', 'vodafone', 'three', 'o2'])
def test_cbc_proxy_will_failover_to_second_lambda_if_function_error(
mocker,
@@ -433,7 +478,7 @@ def test_cbc_proxy_create_and_send_tries_failover_lambda_on_invoke_error_and_rai
'StatusCode': 400,
}
with pytest.raises(CBCProxyException) as e:
with pytest.raises(CBCProxyRetryableException) as e:
cbc_proxy.create_and_send_broadcast(
identifier='my-identifier',
message_number='0000007b',
@@ -482,7 +527,7 @@ def test_cbc_proxy_create_and_send_tries_failover_lambda_on_function_error_and_r
}
}
with pytest.raises(CBCProxyException) as e:
with pytest.raises(CBCProxyRetryableException) as e:
cbc_proxy.create_and_send_broadcast(
identifier='my-identifier',
message_number='0000007b',

View File

@@ -1049,7 +1049,7 @@ def create_broadcast_message(
starts_at=starts_at,
finishes_at=finishes_at,
created_by_id=created_by.id if created_by else service.created_by_id,
areas=areas or {},
areas=areas or {'areas': [], 'simple_polygons': []},
content=content,
stubbed=stubbed
)
@@ -1077,7 +1077,7 @@ def create_broadcast_event(
transmitted_areas=transmitted_areas or broadcast_message.areas,
transmitted_sender=transmitted_sender or 'www.notifications.service.gov.uk',
transmitted_starts_at=transmitted_starts_at,
transmitted_finishes_at=transmitted_finishes_at or datetime.utcnow(),
transmitted_finishes_at=transmitted_finishes_at or datetime.utcnow() + timedelta(hours=24),
)
db.session.add(b_e)
db.session.commit()
@@ -1105,4 +1105,4 @@ def create_broadcast_provider_message(
broadcast_provider_message_id=broadcast_provider_message_id)
db.session.add(provider_message_number)
db.session.commit()
return provider_message, provider_message_number
return provider_message