diff --git a/app/celery/broadcast_message_tasks.py b/app/celery/broadcast_message_tasks.py index e60607f4c..18ca0225a 100644 --- a/app/celery/broadcast_message_tasks.py +++ b/app/celery/broadcast_message_tasks.py @@ -32,25 +32,61 @@ def get_retry_delay(retry_count): return min(delay, 300) -def check_provider_message_should_retry(broadcast_provider_message): - this_event = broadcast_provider_message.broadcast_event +def check_provider_message_should_send(broadcast_event, provider): + """ + If any previous event hasn't sent yet for that provider, then we shouldn't send the current event. Instead, fail and + raise a P1 - so that a notify team member can assess the state of the previous messages, and if necessary, can + replay the `send_broadcast_provider_message` task if the previous message has now been sent. - if this_event.transmitted_finishes_at < datetime.utcnow(): - print(this_event.transmitted_finishes_at, datetime.utcnow(),) - raise MaxRetriesExceededError( - f'Given up sending broadcast_event {this_event.id} ' + - f'to provider {broadcast_provider_message.provider}: ' + - f'The expiry time of {this_event.transmitted_finishes_at} has already passed' + Note: This is called before the new broadcast_provider_message is created. + + # Help, I've come across this code following a pagerduty alert, what should I do? + + 1. Find the failing broadcast_provider_message associated with the previous event that caused this to trip. + 2. If that provider message is still failing to send, fix the issue causing that. The task to send that previous + message might still be retrying in the background - look for logs related to that task. + 3. If that provider message has sent succesfully, you might need to send this task off depending on context. This + might not always be true though, for example, it may not be necessary to send a cancel if the original alert has + already expired. + 4. If you need to re-send this task off again, you'll need to run the following command on paas: + `send_broadcast_provider_message.apply_async(args=(broadcast_event_id, provider), queue=QueueNames.BROADCASTS)` + """ + if broadcast_event.transmitted_finishes_at < datetime.utcnow(): + # TODO: This should be a different kind of exception to distinguish "We should know something went wrong, but + # no immediate action" from "We need to fix this immediately" + raise CBCProxyFatalException( + f'Cannot send broadcast_event {broadcast_event.id} ' + + f'to provider {provider}: ' + + f'The expiry time of {broadcast_event.transmitted_finishes_at} has already passed' ) - newest_event = max(this_event.broadcast_message.events, key=lambda x: x.sent_at) + # get events sorted from earliest to latest + events = sorted(broadcast_event.broadcast_message.events, key=lambda x: x.sent_at) - if this_event != newest_event: - raise MaxRetriesExceededError( - f'Given up sending broadcast_event {this_event.id} ' + - f'to provider {broadcast_provider_message.provider}: ' + - f'This event has been superceeded by {newest_event.message_type} broadcast_event {newest_event.id}' - ) + for prev_event in events: + if prev_event.id != broadcast_event.id and prev_event.sent_at < broadcast_event.sent_at: + # get the record from when that event was sent to the same provider + prev_provider_message = prev_event.get_provider_message(provider) + + # the previous message hasn't even got round to running `send_broadcast_provider_message` yet. + if not prev_provider_message: + raise CBCProxyFatalException( + f'Cannot send {broadcast_event.id}. Previous event {prev_event.id} ' + + f'(type {prev_event.message_type}) has no provider_message for provider {provider} yet.\n' + + f'You must ensure that the other event sends succesfully, then manually kick off this event ' + + f'again by re-running send_broadcast_provider_message for this event and provider.' + ) + + # if there's a previous message that has started but not finished sending (whether it fatally errored or is + # currently retrying) + if prev_provider_message.status != BroadcastProviderMessageStatus.ACK: + raise CBCProxyFatalException( + f'Cannot send {broadcast_event.id}. Previous event {prev_event.id} ' + + f'(type {prev_event.message_type}) has not finished sending to provider {provider} yet.\n' + + f'It is currently in status "{prev_provider_message.status}".\n' + + f'You must ensure that the other event sends succesfully, then manually kick off this event ' + + f'again by re-running send_broadcast_provider_message for this event and provider.' + ) @notify_celery.task(name="send-broadcast-event") @@ -74,7 +110,9 @@ def send_broadcast_event(broadcast_event_id): def send_broadcast_provider_message(self, broadcast_event_id, provider): broadcast_event = dao_get_broadcast_event_by_id(broadcast_event_id) - # the broadcast_provider_message will already exist if we retried previously + check_provider_message_should_send(broadcast_event, provider) + + # the broadcast_provider_message may already exist if we retried previously broadcast_provider_message = broadcast_event.get_provider_message(provider) if broadcast_provider_message is None: broadcast_provider_message = create_broadcast_provider_message(broadcast_event, provider) @@ -138,19 +176,15 @@ def send_broadcast_provider_message(self, broadcast_event_id, provider): sent=broadcast_event.sent_at_as_cap_datetime_string, ) except CBCProxyRetryableException as exc: - # this will raise MaxRetriesExceededError if we no longer want to retry - # (because the message has expired) - check_provider_message_should_retry(broadcast_provider_message) - - # TODO: Decide whether to set to TECHNICAL_FAILURE or ERROR based on response codes from cbc proxy - update_broadcast_provider_message_status( - broadcast_provider_message, - status=BroadcastProviderMessageStatus.TECHNICAL_FAILURE + delay = get_retry_delay(self.request.retries) + current_app.logger.exception( + f'Retrying send_broadcast_provider_message for broadcast_event {broadcast_event_id} and ' + + f'provider {provider} in {delay} seconds' ) self.retry( exc=exc, - countdown=get_retry_delay(self.request.retries), + countdown=delay, queue=QueueNames.BROADCASTS, ) diff --git a/app/dao/broadcast_message_dao.py b/app/dao/broadcast_message_dao.py index 746bb175c..12af8e246 100644 --- a/app/dao/broadcast_message_dao.py +++ b/app/dao/broadcast_message_dao.py @@ -64,5 +64,5 @@ def create_broadcast_provider_message(broadcast_event, provider): @transactional -def update_broadcast_provider_message_status(broadcast_provider_message, status): +def update_broadcast_provider_message_status(broadcast_provider_message, *, status): broadcast_provider_message.status = status diff --git a/tests/app/celery/test_broadcast_message_tasks.py b/tests/app/celery/test_broadcast_message_tasks.py index d09027ffc..5864e4805 100644 --- a/tests/app/celery/test_broadcast_message_tasks.py +++ b/tests/app/celery/test_broadcast_message_tasks.py @@ -3,7 +3,7 @@ from datetime import datetime from unittest.mock import call, ANY from freezegun import freeze_time -from celery.exceptions import MaxRetriesExceededError, Retry +from celery.exceptions import Retry import pytest from app.models import ( @@ -14,9 +14,9 @@ from app.models import ( ServiceBroadcastProviderRestriction, ServiceBroadcastSettings, ) -from app.clients.cbc_proxy import CBCProxyRetryableException +from app.clients.cbc_proxy import CBCProxyRetryableException, CBCProxyFatalException from app.celery.broadcast_message_tasks import ( - check_provider_message_should_retry, + check_provider_message_should_send, get_retry_delay, send_broadcast_event, send_broadcast_provider_message, @@ -364,7 +364,7 @@ def test_send_broadcast_provider_message_sends_update_with_references( ) alert_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.ALERT) - create_broadcast_provider_message(alert_event, provider) + create_broadcast_provider_message(alert_event, provider, status=BroadcastProviderMessageStatus.ACK) update_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.UPDATE) mock_update_broadcast = mocker.patch( @@ -419,8 +419,8 @@ def test_send_broadcast_provider_message_sends_cancel_with_references( update_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.UPDATE) cancel_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.CANCEL) - create_broadcast_provider_message(alert_event, provider) - create_broadcast_provider_message(update_event, provider) + create_broadcast_provider_message(alert_event, provider, status=BroadcastProviderMessageStatus.ACK) + create_broadcast_provider_message(update_event, provider, status=BroadcastProviderMessageStatus.ACK) mock_cancel_broadcast = mocker.patch( f'app.clients.cbc_proxy.CBCProxy{provider_capitalised}.cancel_broadcast', @@ -498,7 +498,7 @@ def test_send_broadcast_provider_message_errors(mocker, sample_service, provider queue='broadcast-tasks' ) broadcast_provider_message = event.get_provider_message(provider) - assert broadcast_provider_message.status == BroadcastProviderMessageStatus.TECHNICAL_FAILURE + assert broadcast_provider_message.status == BroadcastProviderMessageStatus.SENDING @@ -597,74 +597,121 @@ def test_get_retry_delay_has_capped_backoff(retry_count, expected_delay): @freeze_time('2021-01-01 12:00') -def test_check_provider_message_should_retry_doesnt_raise_if_event_hasnt_expired_yet(sample_template): +def test_check_provider_message_should_send_doesnt_raise_if_event_hasnt_expired_yet(sample_template): broadcast_message = create_broadcast_message(sample_template) current_event = create_broadcast_event( broadcast_message, transmitted_starts_at=datetime(2021, 1, 1, 0, 0), transmitted_finishes_at=datetime(2021, 1, 1, 12, 1), ) - provider_message = create_broadcast_provider_message(current_event, 'ee') - - check_provider_message_should_retry(provider_message) + check_provider_message_should_send(current_event, 'ee') @freeze_time('2021-01-01 12:00') -def test_check_provider_message_should_retry_raises_if_event_has_expired(sample_template): +def test_check_provider_message_should_send_raises_if_event_has_expired(sample_template): broadcast_message = create_broadcast_message(sample_template) current_event = create_broadcast_event( broadcast_message, transmitted_starts_at=datetime(2021, 1, 1, 0, 0), transmitted_finishes_at=datetime(2021, 1, 1, 11, 59), ) - provider_message = create_broadcast_provider_message(current_event, 'ee') - - with pytest.raises(MaxRetriesExceededError) as exc: - check_provider_message_should_retry(provider_message) + with pytest.raises(CBCProxyFatalException) as exc: + check_provider_message_should_send(current_event, 'ee') assert 'The expiry time of 2021-01-01 11:59:00 has already passed' in str(exc.value) @freeze_time('2021-01-01 12:00') -def test_check_provider_message_should_retry_raises_if_a_newer_event_exists(sample_template): +def test_check_provider_message_should_send_raises_if_older_event_still_sending(sample_template): broadcast_message = create_broadcast_message(sample_template) # event approved at midnight - past_event = create_broadcast_event( + past_succesful_event = create_broadcast_event( broadcast_message, message_type='alert', sent_at=datetime(2021, 1, 1, 0, 0), - transmitted_starts_at=datetime(2021, 1, 1, 0, 0), - transmitted_finishes_at=datetime(2021, 1, 2, 0, 0), ) # event updated at 5am (this is the event we're currently trying to send) - current_event = create_broadcast_event( + past_still_sending_event = create_broadcast_event( broadcast_message, message_type='update', sent_at=datetime(2021, 1, 1, 5, 0), - transmitted_starts_at=datetime(2021, 1, 1, 0, 0), - transmitted_finishes_at=datetime(2021, 1, 2, 0, 0), ) # event updated at 7am - future_event = create_broadcast_event( + current_event = create_broadcast_event( broadcast_message, message_type='update', sent_at=datetime(2021, 1, 1, 7, 0), - transmitted_starts_at=datetime(2021, 1, 1, 0, 0), - transmitted_finishes_at=datetime(2021, 1, 2, 0, 0), - ) - # event cancelled at 10am - futurest_event = create_broadcast_event( - broadcast_message, - message_type='cancel', - sent_at=datetime(2021, 1, 1, 10, 0), - transmitted_starts_at=datetime(2021, 1, 1, 0, 0), - transmitted_finishes_at=datetime(2021, 1, 2, 0, 0), ) - provider_message = create_broadcast_provider_message(current_event, 'ee') + create_broadcast_provider_message(past_succesful_event, provider='ee', status=BroadcastProviderMessageStatus.ACK) + create_broadcast_provider_message(past_still_sending_event, provider='ee', status=BroadcastProviderMessageStatus.SENDING) # noqa + + # we havent sent the previous update yet - it's still in sending - so don't try and send this one. + with pytest.raises(CBCProxyFatalException) as exc: + check_provider_message_should_send(current_event, 'ee') + + assert f'Previous event {past_still_sending_event.id} (type update) has not finished sending to provider ee' in str(exc.value) # noqa + + +@freeze_time('2021-01-01 12:00') +def test_check_provider_message_should_send_raises_if_older_event_hasnt_started_sending_yet(sample_template): + broadcast_message = create_broadcast_message(sample_template) + # event approved at midnight + past_succesful_event = create_broadcast_event( + broadcast_message, + message_type='alert', + sent_at=datetime(2021, 1, 1, 0, 0), + ) + # event updated at 5am (this is the event we're currently trying to send) + past_still_sending_event = create_broadcast_event( + broadcast_message, + message_type='update', + sent_at=datetime(2021, 1, 1, 5, 0), + ) + # event updated at 7am + current_event = create_broadcast_event( + broadcast_message, + message_type='update', + sent_at=datetime(2021, 1, 1, 7, 0), + ) + + # no provider message for past_still_sending_event + create_broadcast_provider_message(past_succesful_event, provider='ee', status=BroadcastProviderMessageStatus.ACK) # even though the task is going on until midnight tomorrow, we shouldn't send the update now, because the cancel # message will be in the pipeline somewhere. - with pytest.raises(MaxRetriesExceededError) as exc: - check_provider_message_should_retry(provider_message) + with pytest.raises(CBCProxyFatalException) as exc: + check_provider_message_should_send(current_event, 'ee') - assert f'This event has been superceeded by cancel broadcast_event {futurest_event.id}' in str(exc.value) + assert f'Previous event {past_still_sending_event.id} (type update) has no provider_message for provider ee' in str(exc.value) # noqa + + +@freeze_time('2021-01-01 12:00') +def test_check_provider_message_should_send_doesnt_raise_if_newer_event_not_acked_yet(sample_template): + broadcast_message = create_broadcast_message(sample_template) + # event approved at midnight + current_event = create_broadcast_event( + broadcast_message, + message_type='alert', + sent_at=datetime(2021, 1, 1, 0, 0), + ) + future_event = create_broadcast_event( + broadcast_message, + message_type='cancel', + sent_at=datetime(2021, 1, 1, 10, 0), + ) + + # this doesn't raise, because the alert event got an ack. The cancel doesn't have an event yet + # but this task is only interested in the current task (the update) so doesn't worry about that + check_provider_message_should_send(current_event, 'ee') + + +def test_check_provider_message_should_send_doesnt_raise_if_current_event_already_has_provider_message(sample_template): + broadcast_message = create_broadcast_message(sample_template) + current_event = create_broadcast_event(broadcast_message, message_type='alert') + + # this might be set to technical-failure if we're currently retrying this event + create_broadcast_provider_message(current_event, provider='ee', status='technical-failure') + + # this doesn't raise, because the alert got an ack and it doesnt matter if the current event + # has a provider message yet or not + check_provider_message_should_send(current_event, 'ee')