Revert "Merge pull request #3101 from alphagov/retry-broadcasts"

This reverts commit 1bd99c779d, reversing
changes made to d390eb2cac.
This commit is contained in:
Leo Hemsted
2021-02-08 11:02:34 +00:00
parent 49e6ec1ead
commit bee0059e53
8 changed files with 60 additions and 421 deletions

View File

@@ -56,7 +56,7 @@ def _update_broadcast_message(broadcast_message, new_status, updating_user):
f'User {updating_user.id} cannot approve their own broadcast_message {broadcast_message.id}',
status_code=400
)
elif len(broadcast_message.areas['simple_polygons']) == 0:
elif not broadcast_message.areas:
raise InvalidRequest(
f'broadcast_message {broadcast_message.id} has no selected areas and so cannot be broadcasted.',
status_code=400

View File

@@ -1,94 +1,17 @@
import uuid
from datetime import datetime
from flask import current_app
from notifications_utils.statsd_decorators import statsd
from sqlalchemy.schema import Sequence
from celery.exceptions import MaxRetriesExceededError
from app import cbc_proxy_client, db, notify_celery
from app.clients.cbc_proxy import CBCProxyFatalException, CBCProxyRetryableException
from app.config import QueueNames
from app.models import BroadcastEventMessageType, BroadcastProvider, BroadcastProviderMessageStatus
from app.dao.broadcast_message_dao import (
dao_get_broadcast_event_by_id,
create_broadcast_provider_message,
update_broadcast_provider_message_status
)
from app.models import BroadcastEventMessageType, BroadcastProvider
from app.dao.broadcast_message_dao import dao_get_broadcast_event_by_id, create_broadcast_provider_message
from app.utils import format_sequential_number
def get_retry_delay(retry_count):
"""
Given a count of retries so far, return a delay for the next one.
`retry_count` should be 0 the first time a task fails.
"""
# TODO: replace with celery's built in exponential backoff
# 2 to the power of x. 1, 2, 4, 8, 16, 32, ...
delay = 2**retry_count
# never wait longer than 5 minutes
return min(delay, 300)
def check_provider_message_should_send(broadcast_event, provider):
"""
If any previous event hasn't sent yet for that provider, then we shouldn't send the current event. Instead, fail and
raise a P1 - so that a notify team member can assess the state of the previous messages, and if necessary, can
replay the `send_broadcast_provider_message` task if the previous message has now been sent.
Note: This is called before the new broadcast_provider_message is created.
# Help, I've come across this code following a pagerduty alert, what should I do?
1. Find the failing broadcast_provider_message associated with the previous event that caused this to trip.
2. If that provider message is still failing to send, fix the issue causing that. The task to send that previous
message might still be retrying in the background - look for logs related to that task.
3. If that provider message has sent succesfully, you might need to send this task off depending on context. This
might not always be true though, for example, it may not be necessary to send a cancel if the original alert has
already expired.
4. If you need to re-send this task off again, you'll need to run the following command on paas:
`send_broadcast_provider_message.apply_async(args=(broadcast_event_id, provider), queue=QueueNames.BROADCASTS)`
"""
if broadcast_event.transmitted_finishes_at < datetime.utcnow():
# TODO: This should be a different kind of exception to distinguish "We should know something went wrong, but
# no immediate action" from "We need to fix this immediately"
raise CBCProxyFatalException(
f'Cannot send broadcast_event {broadcast_event.id} ' +
f'to provider {provider}: ' +
f'The expiry time of {broadcast_event.transmitted_finishes_at} has already passed'
)
# get events sorted from earliest to latest
events = sorted(broadcast_event.broadcast_message.events, key=lambda x: x.sent_at)
for prev_event in events:
if prev_event.id != broadcast_event.id and prev_event.sent_at < broadcast_event.sent_at:
# get the record from when that event was sent to the same provider
prev_provider_message = prev_event.get_provider_message(provider)
# the previous message hasn't even got round to running `send_broadcast_provider_message` yet.
if not prev_provider_message:
raise CBCProxyFatalException(
f'Cannot send {broadcast_event.id}. Previous event {prev_event.id} ' +
f'(type {prev_event.message_type}) has no provider_message for provider {provider} yet.\n' +
f'You must ensure that the other event sends succesfully, then manually kick off this event ' +
f'again by re-running send_broadcast_provider_message for this event and provider.'
)
# if there's a previous message that has started but not finished sending (whether it fatally errored or is
# currently retrying)
if prev_provider_message.status != BroadcastProviderMessageStatus.ACK:
raise CBCProxyFatalException(
f'Cannot send {broadcast_event.id}. Previous event {prev_event.id} ' +
f'(type {prev_event.message_type}) has not finished sending to provider {provider} yet.\n' +
f'It is currently in status "{prev_provider_message.status}".\n' +
f'You must ensure that the other event sends succesfully, then manually kick off this event ' +
f'again by re-running send_broadcast_provider_message for this event and provider.'
)
@notify_celery.task(name="send-broadcast-event")
@statsd(namespace="tasks")
def send_broadcast_event(broadcast_event_id):
@@ -104,19 +27,12 @@ def send_broadcast_event(broadcast_event_id):
)
# max_retries=None: retry forever
@notify_celery.task(bind=True, name="send-broadcast-provider-message", max_retries=None)
@notify_celery.task(name="send-broadcast-provider-message")
@statsd(namespace="tasks")
def send_broadcast_provider_message(self, broadcast_event_id, provider):
def send_broadcast_provider_message(broadcast_event_id, provider):
broadcast_event = dao_get_broadcast_event_by_id(broadcast_event_id)
check_provider_message_should_send(broadcast_event, provider)
# the broadcast_provider_message may already exist if we retried previously
broadcast_provider_message = broadcast_event.get_provider_message(provider)
if broadcast_provider_message is None:
broadcast_provider_message = create_broadcast_provider_message(broadcast_event, provider)
formatted_message_number = None
if provider == BroadcastProvider.VODAFONE:
formatted_message_number = format_sequential_number(broadcast_provider_message.message_number)
@@ -138,7 +54,6 @@ def send_broadcast_provider_message(self, broadcast_event_id, provider):
cbc_proxy_provider_client = cbc_proxy_client.get_proxy(provider)
try:
if broadcast_event.message_type == BroadcastEventMessageType.ALERT:
cbc_proxy_provider_client.create_and_send_broadcast(
identifier=str(broadcast_provider_message.id),
@@ -175,20 +90,6 @@ def send_broadcast_provider_message(self, broadcast_event_id, provider):
previous_provider_messages=broadcast_event.get_earlier_provider_messages(provider),
sent=broadcast_event.sent_at_as_cap_datetime_string,
)
except CBCProxyRetryableException as exc:
delay = get_retry_delay(self.request.retries)
current_app.logger.exception(
f'Retrying send_broadcast_provider_message for broadcast_event {broadcast_event_id} and ' +
f'provider {provider} in {delay} seconds'
)
self.retry(
exc=exc,
countdown=delay,
queue=QueueNames.BROADCASTS,
)
update_broadcast_provider_message_status(broadcast_provider_message, status=BroadcastProviderMessageStatus.ACK)
@notify_celery.task(name='trigger-link-test')

View File

@@ -25,11 +25,7 @@ from app.utils import DATETIME_FORMAT, format_sequential_number
# the preceeding Alert message in the previous_provider_messages field
class CBCProxyFatalException(Exception):
pass
class CBCProxyRetryableException(Exception):
class CBCProxyException(Exception):
pass
@@ -119,9 +115,7 @@ class CBCProxyClientBase(ABC):
if not result:
failover_result = self._invoke_lambda(self.failover_lambda_name, payload)
if not failover_result:
raise CBCProxyRetryableException(
f'Lambda failed for both {self.lambda_name} and {self.failover_lambda_name}'
)
raise CBCProxyException(f'Lambda failed for both {self.lambda_name} and {self.failover_lambda_name}')
return result

View File

@@ -61,8 +61,3 @@ def create_broadcast_provider_message(broadcast_event, provider):
db.session.add(provider_message_number)
db.session.commit()
return provider_message
@transactional
def update_broadcast_provider_message_status(broadcast_provider_message, *, status):
broadcast_provider_message.status = status

View File

@@ -1,9 +1,7 @@
import uuid
from datetime import datetime
from unittest.mock import call, ANY
from freezegun import freeze_time
from celery.exceptions import Retry
import pytest
from app.models import (
@@ -14,14 +12,7 @@ from app.models import (
ServiceBroadcastProviderRestriction,
ServiceBroadcastSettings,
)
from app.clients.cbc_proxy import CBCProxyRetryableException, CBCProxyFatalException
from app.celery.broadcast_message_tasks import (
check_provider_message_should_send,
get_retry_delay,
send_broadcast_event,
send_broadcast_provider_message,
trigger_link_test,
)
from app.celery.broadcast_message_tasks import send_broadcast_event, send_broadcast_provider_message, trigger_link_test
from tests.app.db import (
create_template,
@@ -147,7 +138,7 @@ def test_send_broadcast_provider_message_sends_data_correctly(
send_broadcast_provider_message(provider=provider, broadcast_event_id=str(event.id))
broadcast_provider_message = event.get_provider_message(provider)
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.ACK
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.SENDING
mock_create_broadcast.assert_called_once_with(
identifier=str(broadcast_provider_message.id),
@@ -254,48 +245,6 @@ def test_send_broadcast_provider_message_defaults_to_test_channel_if_no_service_
)
def test_send_broadcast_provider_message_works_if_we_retried_previously(mocker, sample_service):
template = create_template(sample_service, BROADCAST_TYPE)
broadcast_message = create_broadcast_message(
template,
areas={'areas': [], 'simple_polygons': [],},
status=BroadcastStatusType.BROADCASTING
)
event = create_broadcast_event(broadcast_message)
# an existing provider message already exists, and previously failed
existing_provider_message = create_broadcast_provider_message(
broadcast_event=event,
provider='ee',
status=BroadcastProviderMessageStatus.SENDING
)
mock_create_broadcast = mocker.patch(
f'app.clients.cbc_proxy.CBCProxyEE.create_and_send_broadcast',
)
send_broadcast_provider_message(provider='ee', broadcast_event_id=str(event.id))
# make sure we haven't completed a duplicate event - we shouldn't record the failure
assert len(event.provider_messages) == 1
broadcast_provider_message = event.get_provider_message('ee')
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.ACK
assert broadcast_provider_message.updated_at is not None
mock_create_broadcast.assert_called_once_with(
identifier=str(broadcast_provider_message.id),
message_number=mocker.ANY,
headline='GOV.UK Notify Broadcast',
description='this is an emergency broadcast message',
areas=[],
sent=event.sent_at_as_cap_datetime_string,
expires=event.transmitted_finishes_at_as_cap_datetime_string,
channel='test',
)
@freeze_time('2020-08-01 12:00')
@pytest.mark.parametrize('provider,provider_capitalised', [
['ee', 'EE'],
@@ -364,7 +313,7 @@ def test_send_broadcast_provider_message_sends_update_with_references(
)
alert_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.ALERT)
create_broadcast_provider_message(alert_event, provider, status=BroadcastProviderMessageStatus.ACK)
create_broadcast_provider_message(alert_event, provider)
update_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.UPDATE)
mock_update_broadcast = mocker.patch(
@@ -374,7 +323,7 @@ def test_send_broadcast_provider_message_sends_update_with_references(
send_broadcast_provider_message(provider=provider, broadcast_event_id=str(update_event.id))
broadcast_provider_message = update_event.get_provider_message(provider)
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.ACK
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.SENDING
mock_update_broadcast.assert_called_once_with(
identifier=str(broadcast_provider_message.id),
@@ -419,8 +368,8 @@ def test_send_broadcast_provider_message_sends_cancel_with_references(
update_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.UPDATE)
cancel_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.CANCEL)
create_broadcast_provider_message(alert_event, provider, status=BroadcastProviderMessageStatus.ACK)
create_broadcast_provider_message(update_event, provider, status=BroadcastProviderMessageStatus.ACK)
create_broadcast_provider_message(alert_event, provider)
create_broadcast_provider_message(update_event, provider)
mock_cancel_broadcast = mocker.patch(
f'app.clients.cbc_proxy.CBCProxy{provider_capitalised}.cancel_broadcast',
@@ -429,7 +378,7 @@ def test_send_broadcast_provider_message_sends_cancel_with_references(
send_broadcast_provider_message(provider=provider, broadcast_event_id=str(cancel_event.id))
broadcast_provider_message = cancel_event.get_provider_message(provider)
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.ACK
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.SENDING
mock_cancel_broadcast.assert_called_once_with(
identifier=str(broadcast_provider_message.id),
@@ -466,16 +415,14 @@ def test_send_broadcast_provider_message_errors(mocker, sample_service, provider
mock_create_broadcast = mocker.patch(
f'app.clients.cbc_proxy.CBCProxy{provider_capitalised}.create_and_send_broadcast',
side_effect=CBCProxyRetryableException('oh no'),
)
mock_retry = mocker.patch(
'app.celery.broadcast_message_tasks.send_broadcast_provider_message.retry',
side_effect=Retry
side_effect=Exception('oh no'),
)
with pytest.raises(Retry):
with pytest.raises(Exception) as ex:
send_broadcast_provider_message(provider=provider, broadcast_event_id=str(event.id))
assert ex.match('oh no')
mock_create_broadcast.assert_called_once_with(
identifier=ANY,
message_number=mocker.ANY,
@@ -492,63 +439,6 @@ def test_send_broadcast_provider_message_errors(mocker, sample_service, provider
expires=event.transmitted_finishes_at_as_cap_datetime_string,
channel="test"
)
mock_retry.assert_called_once_with(
countdown=1,
exc=mock_create_broadcast.side_effect,
queue='broadcast-tasks'
)
broadcast_provider_message = event.get_provider_message(provider)
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.SENDING
@pytest.mark.parametrize('num_retries, expected_countdown', [
(0, 1),
(5, 32),
(20, 300),
])
def test_send_broadcast_provider_message_delays_retry_exponentially(
mocker,
sample_service,
num_retries,
expected_countdown
):
template = create_template(sample_service, BROADCAST_TYPE)
broadcast_message = create_broadcast_message(template, status=BroadcastStatusType.BROADCASTING)
event = create_broadcast_event(broadcast_message)
mock_create_broadcast = mocker.patch(
'app.clients.cbc_proxy.CBCProxyEE.create_and_send_broadcast',
side_effect=CBCProxyRetryableException('oh no'),
)
mock_retry = mocker.patch(
'app.celery.broadcast_message_tasks.send_broadcast_provider_message.retry',
side_effect=Retry
)
# patch celery request context as shown here: https://stackoverflow.com/a/59870468
mock_celery_task_request_context = mocker.patch("celery.app.task.Task.request")
mock_celery_task_request_context.retries = num_retries
with pytest.raises(Retry):
send_broadcast_provider_message(provider='ee', broadcast_event_id=str(event.id))
mock_create_broadcast.assert_called_once_with(
identifier=ANY,
message_number=mocker.ANY,
headline="GOV.UK Notify Broadcast",
description='this is an emergency broadcast message',
areas=[],
sent=event.sent_at_as_cap_datetime_string,
expires=event.transmitted_finishes_at_as_cap_datetime_string,
channel='test',
)
mock_retry.assert_called_once_with(
countdown=expected_countdown,
exc=mock_create_broadcast.side_effect,
queue='broadcast-tasks'
)
@pytest.mark.parametrize("provider,provider_capitalised", [
@@ -581,144 +471,3 @@ def test_trigger_link_tests_invokes_cbc_proxy_client(
assert len(mock_send_link_test.mock_calls[0][1][1]) == 8
else:
assert not mock_send_link_test.mock_calls[0][1][1]
@pytest.mark.parametrize('retry_count, expected_delay', [
(0, 1),
(1, 2),
(2, 4),
(8, 256),
(9, 300),
(10, 300),
(1000, 300),
])
def test_get_retry_delay_has_capped_backoff(retry_count, expected_delay):
assert get_retry_delay(retry_count) == expected_delay
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_doesnt_raise_if_event_hasnt_expired_yet(sample_template):
broadcast_message = create_broadcast_message(sample_template)
current_event = create_broadcast_event(
broadcast_message,
transmitted_starts_at=datetime(2021, 1, 1, 0, 0),
transmitted_finishes_at=datetime(2021, 1, 1, 12, 1),
)
check_provider_message_should_send(current_event, 'ee')
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_raises_if_event_has_expired(sample_template):
broadcast_message = create_broadcast_message(sample_template)
current_event = create_broadcast_event(
broadcast_message,
transmitted_starts_at=datetime(2021, 1, 1, 0, 0),
transmitted_finishes_at=datetime(2021, 1, 1, 11, 59),
)
with pytest.raises(CBCProxyFatalException) as exc:
check_provider_message_should_send(current_event, 'ee')
assert 'The expiry time of 2021-01-01 11:59:00 has already passed' in str(exc.value)
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_raises_if_older_event_still_sending(sample_template):
broadcast_message = create_broadcast_message(sample_template)
# event approved at midnight
past_succesful_event = create_broadcast_event(
broadcast_message,
message_type='alert',
sent_at=datetime(2021, 1, 1, 0, 0),
)
# event updated at 5am (this event is still sending)
past_still_sending_event = create_broadcast_event(
broadcast_message,
message_type='update',
sent_at=datetime(2021, 1, 1, 5, 0),
)
# event updated again at 7am
current_event = create_broadcast_event(
broadcast_message,
message_type='update',
sent_at=datetime(2021, 1, 1, 7, 0),
)
create_broadcast_provider_message(past_succesful_event, provider='ee', status=BroadcastProviderMessageStatus.ACK)
create_broadcast_provider_message(past_still_sending_event, provider='ee', status=BroadcastProviderMessageStatus.SENDING) # noqa
# we havent sent the previous update yet - it's still in sending - so don't try and send this one.
with pytest.raises(CBCProxyFatalException) as exc:
check_provider_message_should_send(current_event, 'ee')
assert f'Previous event {past_still_sending_event.id} (type update) has not finished sending to provider ee' in str(exc.value) # noqa
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_raises_if_older_event_hasnt_started_sending_yet(sample_template):
broadcast_message = create_broadcast_message(sample_template)
# event approved at midnight
past_succesful_event = create_broadcast_event(
broadcast_message,
message_type='alert',
sent_at=datetime(2021, 1, 1, 0, 0),
)
# event updated at 5am
past_still_sending_event = create_broadcast_event(
broadcast_message,
message_type='update',
sent_at=datetime(2021, 1, 1, 5, 0),
)
# event updated at 7am
current_event = create_broadcast_event(
broadcast_message,
message_type='update',
sent_at=datetime(2021, 1, 1, 7, 0),
)
# no provider message for past_still_sending_event
create_broadcast_provider_message(past_succesful_event, provider='ee', status=BroadcastProviderMessageStatus.ACK)
# we shouldn't send the update now, because a previous event is still stuck in sending
with pytest.raises(CBCProxyFatalException) as exc:
check_provider_message_should_send(current_event, 'ee')
assert f'Previous event {past_still_sending_event.id} (type update) has no provider_message for provider ee' in str(exc.value) # noqa
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_doesnt_raise_if_newer_event_not_acked_yet(sample_template):
broadcast_message = create_broadcast_message(sample_template)
# event approved at midnight
current_event = create_broadcast_event(
broadcast_message,
message_type='alert',
sent_at=datetime(2021, 1, 1, 0, 0),
)
future_event = create_broadcast_event(
broadcast_message,
message_type='cancel',
sent_at=datetime(2021, 1, 1, 10, 0),
)
# this doesn't raise, because the alert event got an ack. The cancel doesn't have an event yet
# but this task is only interested in the current task (the update) so doesn't worry about that
check_provider_message_should_send(current_event, 'ee')
@pytest.mark.parametrize('existing_message_status', [
BroadcastProviderMessageStatus.SENDING,
BroadcastProviderMessageStatus.ACK,
BroadcastProviderMessageStatus.ERR,
# TODO: Make this case fail - so we have a way of aborting a send if it's stuck in retry loop
BroadcastProviderMessageStatus.TECHNICAL_FAILURE,
])
def test_check_provider_message_should_send_doesnt_raise_if_current_event_already_has_provider_message(
sample_template,
existing_message_status
):
broadcast_message = create_broadcast_message(sample_template)
current_event = create_broadcast_event(broadcast_message, message_type='alert')
create_broadcast_provider_message(current_event, provider='ee', status=existing_message_status)
check_provider_message_should_send(current_event, 'ee')

View File

@@ -7,7 +7,7 @@ from unittest.mock import Mock, call
import pytest
from app.clients.cbc_proxy import (
CBCProxyClient, CBCProxyRetryableException, CBCProxyEE, CBCProxyCanary, CBCProxyVodafone, CBCProxyThree, CBCProxyO2
CBCProxyClient, CBCProxyException, CBCProxyEE, CBCProxyCanary, CBCProxyVodafone, CBCProxyThree, CBCProxyO2
)
from app.utils import DATETIME_FORMAT
@@ -433,7 +433,7 @@ def test_cbc_proxy_create_and_send_tries_failover_lambda_on_invoke_error_and_rai
'StatusCode': 400,
}
with pytest.raises(CBCProxyRetryableException) as e:
with pytest.raises(CBCProxyException) as e:
cbc_proxy.create_and_send_broadcast(
identifier='my-identifier',
message_number='0000007b',
@@ -482,7 +482,7 @@ def test_cbc_proxy_create_and_send_tries_failover_lambda_on_function_error_and_r
}
}
with pytest.raises(CBCProxyRetryableException) as e:
with pytest.raises(CBCProxyException) as e:
cbc_proxy.create_and_send_broadcast(
identifier='my-identifier',
message_number='0000007b',

View File

@@ -1049,7 +1049,7 @@ def create_broadcast_message(
starts_at=starts_at,
finishes_at=finishes_at,
created_by_id=created_by.id if created_by else service.created_by_id,
areas=areas or {'areas': [], 'simple_polygons': []},
areas=areas or {},
content=content,
stubbed=stubbed
)
@@ -1077,7 +1077,7 @@ def create_broadcast_event(
transmitted_areas=transmitted_areas or broadcast_message.areas,
transmitted_sender=transmitted_sender or 'www.notifications.service.gov.uk',
transmitted_starts_at=transmitted_starts_at,
transmitted_finishes_at=transmitted_finishes_at or datetime.utcnow() + timedelta(hours=24),
transmitted_finishes_at=transmitted_finishes_at or datetime.utcnow(),
)
db.session.add(b_e)
db.session.commit()
@@ -1105,4 +1105,4 @@ def create_broadcast_provider_message(
broadcast_provider_message_id=broadcast_provider_message_id)
db.session.add(provider_message_number)
db.session.commit()
return provider_message
return provider_message, provider_message_number