mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-22 16:31:15 -05:00
Retry tasks if they fail to send a broadcast event. Note that each task tries the regular proxy and the failover proxy for that provider. This runs a bit differently than our other retries: Retry with exponential backoff. Our other tasks retry with a fixed delay of 5 minutes between tries. If we can't send a broadcast, we want to try immediately. So instead, implement an exponential backoff (1, 2, 4, 8, ... seconds delay). We can't delay for longer than 310 seconds due to visibility timeout settings in SQS, so cap the delay at that amount. Normally we give up retrying after a set amount of retries (often 4 hours). As broadcast content is much more important than normal notifications, we don't ever want to give up on sending them to phones... ...UNLESS WE DO! Sometimes we do want to give up sending a broadcast though! Broadcasts have an expiry time, when they stop showing up on people's devices, so if that has passed then we don't need to send the broadcast out. Broadcast events can also be superseded by updates or cancels. Check that the event is the most recent event for that broadcast message, if not, give up, as we don't want to accidentally send out two conflicting events for the same message.
425 lines
12 KiB
Python
425 lines
12 KiB
Python
import json
|
|
from abc import ABC, abstractmethod
|
|
|
|
import boto3
|
|
from flask import current_app
|
|
from notifications_utils.template import non_gsm_characters
|
|
|
|
from app.config import BroadcastProvider
|
|
from app.utils import DATETIME_FORMAT, format_sequential_number
|
|
|
|
# The variable names in this file have specific meaning in a CAP message
|
|
#
|
|
# identifier is a unique field for each CAP message
|
|
#
|
|
# headline is a field which we are not sure if we will use
|
|
#
|
|
# description is the body of the message
|
|
|
|
# areas is a list of dicts, with the following items
|
|
# * description is a string which populates the areaDesc field
|
|
# * polygon is a list of lat/long pairs
|
|
#
|
|
# previous_provider_messages is a list of previous events (models.py::BroadcastProviderMessage)
|
|
# ie a Cancel message would have a unique event but have the event of
|
|
# the preceding Alert message in the previous_provider_messages field
|
|
|
|
|
|
class CBCProxyFatalException(Exception):
    """Exception type for fatal CBC proxy errors; nothing in the code visible here raises it."""
|
|
|
|
|
|
class CBCProxyRetryableException(Exception):
    """Raised when both the primary and the failover lambda invocation fail for a payload."""
|
|
|
|
|
|
class CBCProxyClient:
    """Holds the shared AWS Lambda client and builds a proxy object per provider."""

    # Remains None unless init_app() finds CBC_PROXY_ENABLED set in the app config.
    _lambda_client = None

    def init_app(self, app):
        """Create the boto3 Lambda client from ``app.config`` when CBC_PROXY_ENABLED is truthy."""
        if not app.config.get('CBC_PROXY_ENABLED'):
            return

        credentials = {
            'aws_access_key_id': app.config['CBC_PROXY_AWS_ACCESS_KEY_ID'],
            'aws_secret_access_key': app.config['CBC_PROXY_AWS_SECRET_ACCESS_KEY'],
        }
        self._lambda_client = boto3.client('lambda', region_name='eu-west-2', **credentials)

    def get_proxy(self, provider):
        """Return a new proxy instance for ``provider``, bound to the shared Lambda client.

        Raises KeyError when ``provider`` has no entry in the lookup table below.
        """
        proxy_by_provider = {
            'canary': CBCProxyCanary,
            BroadcastProvider.EE: CBCProxyEE,
            BroadcastProvider.THREE: CBCProxyThree,
            BroadcastProvider.O2: CBCProxyO2,
            BroadcastProvider.VODAFONE: CBCProxyVodafone,
        }
        proxy_cls = proxy_by_provider[provider]
        return proxy_cls(self._lambda_client)
|
|
|
|
|
|
class CBCProxyClientBase(ABC):
    """Shared behaviour for CBC proxies: lambda invocation, failover and language selection.

    Concrete proxies provide the four abstract attributes below and override
    whichever send/cancel operations they support; the versions defined on
    this base class do nothing.
    """

    @property
    @abstractmethod
    def lambda_name(self):
        """Name of the lambda function invoked first."""

    @property
    @abstractmethod
    def failover_lambda_name(self):
        """Name of the lambda function invoked when the first one fails."""

    @property
    @abstractmethod
    def LANGUAGE_ENGLISH(self):
        """Value placed in the payload's language field for English content."""

    @property
    @abstractmethod
    def LANGUAGE_WELSH(self):
        """Value placed in the payload's language field for Welsh content."""

    def __init__(self, lambda_client):
        """Store the Lambda client used for every invocation made by this proxy."""
        self._lambda_client = lambda_client

    def send_canary(
        self,
        identifier,
    ):
        """No-op by default; overridden by proxies that support the canary."""

    def send_link_test(
        self,
        identifier,
        sequential_number
    ):
        """No-op by default; overridden by proxies that support link tests."""

    def create_and_send_broadcast(
        self, identifier, headline, description, areas, sent, expires, channel, message_number=None
    ):
        """No-op by default; overridden by proxies that send alerts."""

    # We have not implemented updating a broadcast
    def update_and_send_broadcast(
        self,
        identifier, previous_provider_messages, headline, description, areas,
        sent, expires, channel, message_number=None
    ):
        """No-op: no proxy in this module overrides it."""

    def cancel_broadcast(
        self,
        identifier, previous_provider_messages, headline, description, areas,
        sent, expires, message_number=None
    ):
        """No-op by default; overridden by proxies that send cancels."""

    def _invoke_lambda_with_failover(self, payload):
        """Invoke the primary lambda and fall back to the failover lambda if it fails.

        Returns True as soon as either lambda accepts the payload.
        Raises CBCProxyRetryableException when both invocations fail.
        """
        if self._invoke_lambda(self.lambda_name, payload):
            return True

        if self._invoke_lambda(self.failover_lambda_name, payload):
            # The payload went out via the failover, so report success rather
            # than the primary's failed result.
            return True

        raise CBCProxyRetryableException(
            f'Lambda failed for both {self.lambda_name} and {self.failover_lambda_name}'
        )

    def _invoke_lambda(self, lambda_name, payload):
        """Synchronously invoke ``lambda_name`` with ``payload`` serialised as UTF-8 JSON.

        Returns True when the response status code is below 300 and carries no
        'FunctionError' key; otherwise logs the problem and returns False.
        """
        payload_bytes = json.dumps(payload).encode('utf8')
        current_app.logger.info(f"Calling lambda {lambda_name}")
        result = self._lambda_client.invoke(
            FunctionName=lambda_name,
            InvocationType='RequestResponse',
            Payload=payload_bytes,
        )
        current_app.logger.info(f"Finished calling lambda {lambda_name}")

        if result['StatusCode'] > 299:
            current_app.logger.info(
                f"Error calling lambda {lambda_name} with status code {result['StatusCode']}, {result.get('Payload')}"
            )
            return False

        if 'FunctionError' in result:
            # The invocation itself returned a non-error status code, but the
            # response is flagged with a function error.
            current_app.logger.info(
                f"Error calling lambda {lambda_name} with function error {result['Payload']}"
            )
            return False

        return True

    def infer_language_from(self, content):
        """Return LANGUAGE_WELSH when ``non_gsm_characters(content)`` is truthy, else LANGUAGE_ENGLISH."""
        if non_gsm_characters(content):
            return self.LANGUAGE_WELSH
        return self.LANGUAGE_ENGLISH
|
|
|
|
|
|
class CBCProxyCanary(CBCProxyClientBase):
    """
    Proxy for the canary lambda, which tests notify's connectivity to the Cell Broadcast AWS infrastructure.
    The canary does not open a vpn or connect to a provider; it just responds from within AWS.
    """
    LANGUAGE_ENGLISH = None
    LANGUAGE_WELSH = None

    lambda_name = 'canary'
    # The canary never calls out to a CBC, so it has no dedicated failover lambda;
    # the same lambda name is reused for the failover slot.
    failover_lambda_name = 'canary'

    def send_canary(self, identifier):
        """Invoke the canary lambda once with ``identifier``; the result is not checked and no failover is tried."""
        canary_payload = {'identifier': identifier}
        self._invoke_lambda(self.lambda_name, payload=canary_payload)
|
|
|
|
|
|
class CBCProxyEE(CBCProxyClientBase):
    """Proxy for the EE lambdas; every payload uses the 'cap' message format."""

    lambda_name = 'ee-1-proxy'
    failover_lambda_name = 'ee-2-proxy'

    LANGUAGE_ENGLISH = 'en-GB'
    LANGUAGE_WELSH = 'cy-GB'

    def send_link_test(self, identifier, sequential_number=None):
        """
        link test - open up a connection to the provider and send an xml payload with a <msgType> of test.

        ``sequential_number`` is accepted but not used by this proxy.
        """
        link_test = {'message_type': 'test', 'identifier': identifier, 'message_format': 'cap'}
        self._invoke_lambda_with_failover(payload=link_test)

    def create_and_send_broadcast(
        self, identifier, headline, description, areas, sent, expires, channel, message_number=None
    ):
        """Send an 'alert' payload through the lambdas; ``message_number`` is not used by this proxy."""
        language = self.infer_language_from(description)
        alert = {
            'message_type': 'alert',
            'identifier': identifier,
            'message_format': 'cap',
            'headline': headline,
            'description': description,
            'areas': areas,
            'sent': sent,
            'expires': expires,
            'language': language,
            'channel': channel,
        }
        self._invoke_lambda_with_failover(payload=alert)

    def cancel_broadcast(self, identifier, previous_provider_messages, sent, message_number=None):
        """Send a 'cancel' payload referencing each previous provider message by id and creation time."""
        references = [
            {
                'message_id': str(previous.id),
                'sent': previous.created_at.strftime(DATETIME_FORMAT),
            }
            for previous in previous_provider_messages
        ]
        cancel = {
            'message_type': 'cancel',
            'identifier': identifier,
            'message_format': 'cap',
            'references': references,
            'sent': sent,
        }
        self._invoke_lambda_with_failover(payload=cancel)
|
|
|
|
|
|
class CBCProxyThree(CBCProxyClientBase):
    """Proxy for the Three lambdas; every payload uses the 'cap' message format."""

    lambda_name = 'three-1-proxy'
    failover_lambda_name = 'three-2-proxy'

    LANGUAGE_ENGLISH = 'en-GB'
    LANGUAGE_WELSH = 'cy-GB'

    def send_link_test(self, identifier, sequential_number=None):
        """
        link test - open up a connection to the provider and send an xml payload with a <msgType> of test.

        ``sequential_number`` is accepted but not used by this proxy.
        """
        test_message = {
            'message_type': 'test',
            'identifier': identifier,
            'message_format': 'cap',
        }
        self._invoke_lambda_with_failover(payload=test_message)

    def create_and_send_broadcast(
        self, identifier, headline, description, areas, sent, expires, channel, message_number=None
    ):
        """Send an 'alert' payload through the lambdas; ``message_number`` is not used by this proxy."""
        inferred_language = self.infer_language_from(description)
        alert_message = {
            'message_type': 'alert',
            'identifier': identifier,
            'message_format': 'cap',
            'headline': headline,
            'description': description,
            'areas': areas,
            'sent': sent,
            'expires': expires,
            'language': inferred_language,
            'channel': channel,
        }
        self._invoke_lambda_with_failover(payload=alert_message)

    def cancel_broadcast(self, identifier, previous_provider_messages, sent, message_number=None):
        """Send a 'cancel' payload referencing each previous provider message by id and creation time."""
        earlier_messages = [
            {
                'message_id': str(earlier.id),
                'sent': earlier.created_at.strftime(DATETIME_FORMAT),
            }
            for earlier in previous_provider_messages
        ]
        cancel_message = {
            'message_type': 'cancel',
            'identifier': identifier,
            'message_format': 'cap',
            'references': earlier_messages,
            'sent': sent,
        }
        self._invoke_lambda_with_failover(payload=cancel_message)
|
|
|
|
class CBCProxyO2(CBCProxyClientBase):
    """Proxy for the O2 lambdas; every payload uses the 'cap' message format."""

    lambda_name = 'o2-1-proxy'
    failover_lambda_name = 'o2-2-proxy'

    LANGUAGE_ENGLISH = 'en-GB'
    LANGUAGE_WELSH = 'cy-GB'

    def send_link_test(self, identifier, sequential_number=None):
        """
        link test - open up a connection to the provider and send an xml payload with a <msgType> of test.

        ``sequential_number`` is accepted but not used by this proxy.
        """
        request = {
            'message_type': 'test',
            'identifier': identifier,
            'message_format': 'cap',
        }
        self._invoke_lambda_with_failover(payload=request)

    def create_and_send_broadcast(
        self, identifier, headline, description, areas, sent, expires, channel, message_number=None
    ):
        """Send an 'alert' payload through the lambdas; ``message_number`` is not used by this proxy."""
        request = {
            'message_type': 'alert',
            'identifier': identifier,
            'message_format': 'cap',
            'headline': headline,
            'description': description,
            'areas': areas,
            'sent': sent,
            'expires': expires,
        }
        # Added after the literal so the key order of the payload is unchanged.
        request['language'] = self.infer_language_from(description)
        request['channel'] = channel
        self._invoke_lambda_with_failover(payload=request)

    def cancel_broadcast(self, identifier, previous_provider_messages, sent, message_number=None):
        """Send a 'cancel' payload referencing each previous provider message by id and creation time."""
        request = {
            'message_type': 'cancel',
            'identifier': identifier,
            'message_format': 'cap',
            'references': [
                {
                    'message_id': str(prior.id),
                    'sent': prior.created_at.strftime(DATETIME_FORMAT),
                }
                for prior in previous_provider_messages
            ],
            'sent': sent,
        }
        self._invoke_lambda_with_failover(payload=request)
|
|
|
|
|
|
class CBCProxyVodafone(CBCProxyClientBase):
    """Proxy for the Vodafone lambdas; payloads use the 'ibag' format and carry a message number."""

    lambda_name = 'vodafone-1-proxy'
    failover_lambda_name = 'vodafone-2-proxy'

    LANGUAGE_ENGLISH = 'English'
    LANGUAGE_WELSH = 'Welsh'

    def send_link_test(self, identifier, sequential_number):
        """
        link test - open up a connection to the provider and send an xml payload with a <msgType> of test.

        ``sequential_number`` is sent as the payload's message_number.
        """
        link_test = {
            'message_type': 'test',
            'identifier': identifier,
            'message_number': sequential_number,
            'message_format': 'ibag',
        }
        self._invoke_lambda_with_failover(payload=link_test)

    def create_and_send_broadcast(
        self, identifier, message_number, headline, description, areas, sent, expires, channel
    ):
        """Send an 'alert' payload, including ``message_number``, through the lambdas."""
        language = self.infer_language_from(description)
        alert = {
            'message_type': 'alert',
            'identifier': identifier,
            'message_number': message_number,
            'message_format': 'ibag',
            'headline': headline,
            'description': description,
            'areas': areas,
            'sent': sent,
            'expires': expires,
            'language': language,
            'channel': channel,
        }
        self._invoke_lambda_with_failover(payload=alert)

    def cancel_broadcast(self, identifier, previous_provider_messages, sent, message_number):
        """Send a 'cancel' payload referencing each previous message by id, formatted number and creation time."""
        references = [
            {
                'message_id': str(previous.id),
                'message_number': format_sequential_number(previous.message_number),
                'sent': previous.created_at.strftime(DATETIME_FORMAT),
            }
            for previous in previous_provider_messages
        ]
        cancel = {
            'message_type': 'cancel',
            'identifier': identifier,
            'message_number': message_number,
            'message_format': 'ibag',
            'references': references,
            'sent': sent,
        }
        self._invoke_lambda_with_failover(payload=cancel)
|