check provider message status etc when sending rather than when retrying

previously if we were deciding whether to retry or not, it meant that
future events wouldn't have context of what the task is doing. We'd
run into issues with not knowing what references to include when
updating/cancelling in future events.

Instead of deciding whether to retry or not, always retry. Instead, when
any event sends, regardless of whether it is a first time or a retry,
check the status of previous events for that broadcast message. There
are a few things that will mean we don't send.

* If the finishes_at time has already elapsed (ie: we have been trying
  to resend this message and haven't had any luck and now the data is
  obselete)
* A previous event has no provider message (this means that we never
  picked the previous event off the queue for some reason)
* A previous event has a provider message that has anything other than
  an ack response. This includes sending (the old message is currently
  being sent), and technical-failure/returned-error (the old message is
  currently in the retry loop, having experienced issues).
This commit is contained in:
Leo Hemsted
2021-02-01 18:54:01 +00:00
parent 96a0935d1c
commit bbae209200
3 changed files with 145 additions and 64 deletions

View File

@@ -32,24 +32,60 @@ def get_retry_delay(retry_count):
return min(delay, 300)
def check_provider_message_should_retry(broadcast_provider_message):
this_event = broadcast_provider_message.broadcast_event
def check_provider_message_should_send(broadcast_event, provider):
"""
If any previous event hasn't sent yet for that provider, then we shouldn't send the current event. Instead, fail and
raise a P1 - so that a notify team member can assess the state of the previous messages, and if necessary, can
replay the `send_broadcast_provider_message` task if the previous message has now been sent.
if this_event.transmitted_finishes_at < datetime.utcnow():
print(this_event.transmitted_finishes_at, datetime.utcnow(),)
raise MaxRetriesExceededError(
f'Given up sending broadcast_event {this_event.id} ' +
f'to provider {broadcast_provider_message.provider}: ' +
f'The expiry time of {this_event.transmitted_finishes_at} has already passed'
Note: This is called before the new broadcast_provider_message is created.
# Help, I've come across this code following a pagerduty alert, what should I do?
1. Find the failing broadcast_provider_message associated with the previous event that caused this to trip.
2. If that provider message is still failing to send, fix the issue causing that. The task to send that previous
message might still be retrying in the background - look for logs related to that task.
3. If that provider message has sent succesfully, you might need to send this task off depending on context. This
might not always be true though, for example, it may not be necessary to send a cancel if the original alert has
already expired.
4. If you need to re-send this task off again, you'll need to run the following command on paas:
`send_broadcast_provider_message.apply_async(args=(broadcast_event_id, provider), queue=QueueNames.BROADCASTS)`
"""
if broadcast_event.transmitted_finishes_at < datetime.utcnow():
# TODO: This should be a different kind of exception to distinguish "We should know something went wrong, but
# no immediate action" from "We need to fix this immediately"
raise CBCProxyFatalException(
f'Cannot send broadcast_event {broadcast_event.id} ' +
f'to provider {provider}: ' +
f'The expiry time of {broadcast_event.transmitted_finishes_at} has already passed'
)
newest_event = max(this_event.broadcast_message.events, key=lambda x: x.sent_at)
# get events sorted from earliest to latest
events = sorted(broadcast_event.broadcast_message.events, key=lambda x: x.sent_at)
if this_event != newest_event:
raise MaxRetriesExceededError(
f'Given up sending broadcast_event {this_event.id} ' +
f'to provider {broadcast_provider_message.provider}: ' +
f'This event has been superceeded by {newest_event.message_type} broadcast_event {newest_event.id}'
for prev_event in events:
if prev_event.id != broadcast_event.id and prev_event.sent_at < broadcast_event.sent_at:
# get the record from when that event was sent to the same provider
prev_provider_message = prev_event.get_provider_message(provider)
# the previous message hasn't even got round to running `send_broadcast_provider_message` yet.
if not prev_provider_message:
raise CBCProxyFatalException(
f'Cannot send {broadcast_event.id}. Previous event {prev_event.id} ' +
f'(type {prev_event.message_type}) has no provider_message for provider {provider} yet.\n' +
f'You must ensure that the other event sends succesfully, then manually kick off this event ' +
f'again by re-running send_broadcast_provider_message for this event and provider.'
)
# if there's a previous message that has started but not finished sending (whether it fatally errored or is
# currently retrying)
if prev_provider_message.status != BroadcastProviderMessageStatus.ACK:
raise CBCProxyFatalException(
f'Cannot send {broadcast_event.id}. Previous event {prev_event.id} ' +
f'(type {prev_event.message_type}) has not finished sending to provider {provider} yet.\n' +
f'It is currently in status "{prev_provider_message.status}".\n' +
f'You must ensure that the other event sends succesfully, then manually kick off this event ' +
f'again by re-running send_broadcast_provider_message for this event and provider.'
)
@@ -74,7 +110,9 @@ def send_broadcast_event(broadcast_event_id):
def send_broadcast_provider_message(self, broadcast_event_id, provider):
broadcast_event = dao_get_broadcast_event_by_id(broadcast_event_id)
# the broadcast_provider_message will already exist if we retried previously
check_provider_message_should_send(broadcast_event, provider)
# the broadcast_provider_message may already exist if we retried previously
broadcast_provider_message = broadcast_event.get_provider_message(provider)
if broadcast_provider_message is None:
broadcast_provider_message = create_broadcast_provider_message(broadcast_event, provider)
@@ -138,19 +176,15 @@ def send_broadcast_provider_message(self, broadcast_event_id, provider):
sent=broadcast_event.sent_at_as_cap_datetime_string,
)
except CBCProxyRetryableException as exc:
# this will raise MaxRetriesExceededError if we no longer want to retry
# (because the message has expired)
check_provider_message_should_retry(broadcast_provider_message)
# TODO: Decide whether to set to TECHNICAL_FAILURE or ERROR based on response codes from cbc proxy
update_broadcast_provider_message_status(
broadcast_provider_message,
status=BroadcastProviderMessageStatus.TECHNICAL_FAILURE
delay = get_retry_delay(self.request.retries)
current_app.logger.exception(
f'Retrying send_broadcast_provider_message for broadcast_event {broadcast_event_id} and ' +
f'provider {provider} in {delay} seconds'
)
self.retry(
exc=exc,
countdown=get_retry_delay(self.request.retries),
countdown=delay,
queue=QueueNames.BROADCASTS,
)

View File

@@ -64,5 +64,5 @@ def create_broadcast_provider_message(broadcast_event, provider):
@transactional
def update_broadcast_provider_message_status(broadcast_provider_message, status):
def update_broadcast_provider_message_status(broadcast_provider_message, *, status):
broadcast_provider_message.status = status

View File

@@ -3,7 +3,7 @@ from datetime import datetime
from unittest.mock import call, ANY
from freezegun import freeze_time
from celery.exceptions import MaxRetriesExceededError, Retry
from celery.exceptions import Retry
import pytest
from app.models import (
@@ -14,9 +14,9 @@ from app.models import (
ServiceBroadcastProviderRestriction,
ServiceBroadcastSettings,
)
from app.clients.cbc_proxy import CBCProxyRetryableException
from app.clients.cbc_proxy import CBCProxyRetryableException, CBCProxyFatalException
from app.celery.broadcast_message_tasks import (
check_provider_message_should_retry,
check_provider_message_should_send,
get_retry_delay,
send_broadcast_event,
send_broadcast_provider_message,
@@ -364,7 +364,7 @@ def test_send_broadcast_provider_message_sends_update_with_references(
)
alert_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.ALERT)
create_broadcast_provider_message(alert_event, provider)
create_broadcast_provider_message(alert_event, provider, status=BroadcastProviderMessageStatus.ACK)
update_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.UPDATE)
mock_update_broadcast = mocker.patch(
@@ -419,8 +419,8 @@ def test_send_broadcast_provider_message_sends_cancel_with_references(
update_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.UPDATE)
cancel_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.CANCEL)
create_broadcast_provider_message(alert_event, provider)
create_broadcast_provider_message(update_event, provider)
create_broadcast_provider_message(alert_event, provider, status=BroadcastProviderMessageStatus.ACK)
create_broadcast_provider_message(update_event, provider, status=BroadcastProviderMessageStatus.ACK)
mock_cancel_broadcast = mocker.patch(
f'app.clients.cbc_proxy.CBCProxy{provider_capitalised}.cancel_broadcast',
@@ -498,7 +498,7 @@ def test_send_broadcast_provider_message_errors(mocker, sample_service, provider
queue='broadcast-tasks'
)
broadcast_provider_message = event.get_provider_message(provider)
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.TECHNICAL_FAILURE
assert broadcast_provider_message.status == BroadcastProviderMessageStatus.SENDING
@@ -597,74 +597,121 @@ def test_get_retry_delay_has_capped_backoff(retry_count, expected_delay):
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_retry_doesnt_raise_if_event_hasnt_expired_yet(sample_template):
def test_check_provider_message_should_send_doesnt_raise_if_event_hasnt_expired_yet(sample_template):
broadcast_message = create_broadcast_message(sample_template)
current_event = create_broadcast_event(
broadcast_message,
transmitted_starts_at=datetime(2021, 1, 1, 0, 0),
transmitted_finishes_at=datetime(2021, 1, 1, 12, 1),
)
provider_message = create_broadcast_provider_message(current_event, 'ee')
check_provider_message_should_retry(provider_message)
check_provider_message_should_send(current_event, 'ee')
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_retry_raises_if_event_has_expired(sample_template):
def test_check_provider_message_should_send_raises_if_event_has_expired(sample_template):
broadcast_message = create_broadcast_message(sample_template)
current_event = create_broadcast_event(
broadcast_message,
transmitted_starts_at=datetime(2021, 1, 1, 0, 0),
transmitted_finishes_at=datetime(2021, 1, 1, 11, 59),
)
provider_message = create_broadcast_provider_message(current_event, 'ee')
with pytest.raises(MaxRetriesExceededError) as exc:
check_provider_message_should_retry(provider_message)
with pytest.raises(CBCProxyFatalException) as exc:
check_provider_message_should_send(current_event, 'ee')
assert 'The expiry time of 2021-01-01 11:59:00 has already passed' in str(exc.value)
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_retry_raises_if_a_newer_event_exists(sample_template):
def test_check_provider_message_should_send_raises_if_older_event_still_sending(sample_template):
broadcast_message = create_broadcast_message(sample_template)
# event approved at midnight
past_event = create_broadcast_event(
past_succesful_event = create_broadcast_event(
broadcast_message,
message_type='alert',
sent_at=datetime(2021, 1, 1, 0, 0),
transmitted_starts_at=datetime(2021, 1, 1, 0, 0),
transmitted_finishes_at=datetime(2021, 1, 2, 0, 0),
)
# event updated at 5am (this is the event we're currently trying to send)
current_event = create_broadcast_event(
past_still_sending_event = create_broadcast_event(
broadcast_message,
message_type='update',
sent_at=datetime(2021, 1, 1, 5, 0),
transmitted_starts_at=datetime(2021, 1, 1, 0, 0),
transmitted_finishes_at=datetime(2021, 1, 2, 0, 0),
)
# event updated at 7am
future_event = create_broadcast_event(
current_event = create_broadcast_event(
broadcast_message,
message_type='update',
sent_at=datetime(2021, 1, 1, 7, 0),
transmitted_starts_at=datetime(2021, 1, 1, 0, 0),
transmitted_finishes_at=datetime(2021, 1, 2, 0, 0),
)
# event cancelled at 10am
futurest_event = create_broadcast_event(
broadcast_message,
message_type='cancel',
sent_at=datetime(2021, 1, 1, 10, 0),
transmitted_starts_at=datetime(2021, 1, 1, 0, 0),
transmitted_finishes_at=datetime(2021, 1, 2, 0, 0),
)
provider_message = create_broadcast_provider_message(current_event, 'ee')
create_broadcast_provider_message(past_succesful_event, provider='ee', status=BroadcastProviderMessageStatus.ACK)
create_broadcast_provider_message(past_still_sending_event, provider='ee', status=BroadcastProviderMessageStatus.SENDING) # noqa
# we havent sent the previous update yet - it's still in sending - so don't try and send this one.
with pytest.raises(CBCProxyFatalException) as exc:
check_provider_message_should_send(current_event, 'ee')
assert f'Previous event {past_still_sending_event.id} (type update) has not finished sending to provider ee' in str(exc.value) # noqa
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_raises_if_older_event_hasnt_started_sending_yet(sample_template):
broadcast_message = create_broadcast_message(sample_template)
# event approved at midnight
past_succesful_event = create_broadcast_event(
broadcast_message,
message_type='alert',
sent_at=datetime(2021, 1, 1, 0, 0),
)
# event updated at 5am (this is the event we're currently trying to send)
past_still_sending_event = create_broadcast_event(
broadcast_message,
message_type='update',
sent_at=datetime(2021, 1, 1, 5, 0),
)
# event updated at 7am
current_event = create_broadcast_event(
broadcast_message,
message_type='update',
sent_at=datetime(2021, 1, 1, 7, 0),
)
# no provider message for past_still_sending_event
create_broadcast_provider_message(past_succesful_event, provider='ee', status=BroadcastProviderMessageStatus.ACK)
# even though the task is going on until midnight tomorrow, we shouldn't send the update now, because the cancel
# message will be in the pipeline somewhere.
with pytest.raises(MaxRetriesExceededError) as exc:
check_provider_message_should_retry(provider_message)
with pytest.raises(CBCProxyFatalException) as exc:
check_provider_message_should_send(current_event, 'ee')
assert f'This event has been superceeded by cancel broadcast_event {futurest_event.id}' in str(exc.value)
assert f'Previous event {past_still_sending_event.id} (type update) has no provider_message for provider ee' in str(exc.value) # noqa
@freeze_time('2021-01-01 12:00')
def test_check_provider_message_should_send_doesnt_raise_if_newer_event_not_acked_yet(sample_template):
broadcast_message = create_broadcast_message(sample_template)
# event approved at midnight
current_event = create_broadcast_event(
broadcast_message,
message_type='alert',
sent_at=datetime(2021, 1, 1, 0, 0),
)
future_event = create_broadcast_event(
broadcast_message,
message_type='cancel',
sent_at=datetime(2021, 1, 1, 10, 0),
)
# this doesn't raise, because the alert event got an ack. The cancel doesn't have an event yet
# but this task is only interested in the current task (the update) so doesn't worry about that
check_provider_message_should_send(current_event, 'ee')
def test_check_provider_message_should_send_doesnt_raise_if_current_event_already_has_provider_message(sample_template):
broadcast_message = create_broadcast_message(sample_template)
current_event = create_broadcast_event(broadcast_message, message_type='alert')
# this might be set to technical-failure if we're currently retrying this event
create_broadcast_provider_message(current_event, provider='ee', status='technical-failure')
# this doesn't raise, because the alert got an ack and it doesnt matter if the current event
# has a provider message yet or not
check_provider_message_should_send(current_event, 'ee')