send messages to multiple providers

at the moment only EE is enabled (this is set in app.config, but also,
only EE have a function defined for them so even if another provider was
enabled without changing the dict in cbc_proxy.py we won't trigger
anything). this commit just adds wrapper tasks that check what providers
are enabled, and invokes the send function for each provider.

The send function doesn't currently distinguish between providers for
now - as we only have EE set up. in the future we'll want to separate
the cbc_proxy_client into separate clients for separate providers.
Different providers have different lambda functions, and have different
requirements. For example, we know that the two different CBC software
solutions handle references to previous messages differently.
This commit is contained in:
Leo Hemsted
2020-10-29 11:12:28 +00:00
parent 2e665de46d
commit bc3512467b
4 changed files with 57 additions and 21 deletions

View File

@@ -1,15 +1,29 @@
import uuid
from flask import current_app
from notifications_utils.statsd_decorators import statsd
from app import cbc_proxy_client, notify_celery
from app.models import BroadcastEventMessageType
from app.config import QueueNames
from app.models import BroadcastEventMessageType, BroadcastProvider
from app.dao.broadcast_message_dao import dao_get_broadcast_event_by_id
@notify_celery.task(name="send-broadcast-event")
@statsd(namespace="tasks")
def send_broadcast_event(broadcast_event_id):
for provider in BroadcastProvider.PROVIDERS:
# TODO: Decide whether to send to each provider based on platform admin, service level settings, broadcast
# level settings, etc.
send_broadcast_provider_message.apply_async(
kwargs={'broadcast_event_id': broadcast_event_id, 'provider': provider},
queue=QueueNames.NOTIFY
)
@notify_celery.task(name="send-broadcast-provider-message")
@statsd(namespace="tasks")
def send_broadcast_provider_message(broadcast_event_id, provider):
broadcast_event = dao_get_broadcast_event_by_id(broadcast_event_id)
current_app.logger.info(
@@ -52,3 +66,24 @@ def send_broadcast_event(broadcast_event_id):
sent=broadcast_event.sent_at_as_cap_datetime_string,
expires=broadcast_event.transmitted_finishes_at_as_cap_datetime_string,
)
@notify_celery.task(name='trigger-link-test')
def trigger_link_test(provider):
"""
Currently we only have one hardcoded CBC Proxy, which corresponds to one
CBC, and so currently we do not specify the CBC Proxy name
In future we will have multiple CBC proxies, each proxy corresponding to
one MNO's CBC
This task should invoke other tasks which do the actual link tests, eg:
for cbc_name in app.config.ENABLED_CBCS:
send_link_test_for_cbc(cbc_name)
Alternatively this task could be configured to be a Celery group
"""
identifier = str(uuid.uuid4())
message = f"Sending a link test to CBC proxy for provider {provider} with ID {identifier}"
current_app.logger.info(message)
cbc_proxy_client.send_link_test(identifier)

View File

@@ -17,6 +17,7 @@ from app.celery.tasks import (
process_row,
process_incomplete_jobs)
from app.celery.letters_pdf_tasks import get_pdf_for_templated_letter
from app.celery.broadcast_message_tasks import trigger_link_test
from app.config import QueueNames
from app.dao.invited_org_user_dao import delete_org_invitations_created_more_than_two_days_ago
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
@@ -305,21 +306,5 @@ def send_canary_to_cbc_proxy():
@notify_celery.task(name='trigger-link-tests')
def trigger_link_tests():
"""
Currently we only have one hardcoded CBC Proxy, which corresponds to one
CBC, and so currently we do not specify the CBC Proxy name
In future we will have multiple CBC proxies, each proxy corresponding to
one MNO's CBC
This task should invoke other tasks which do the actual link tests, eg:
for cbc_name in app.config.ENABLED_CBCS:
send_link_test_for_cbc(cbc_name)
Alternatively this task could be configured to be a Celery group
"""
for _ in range(1):
identifier = str(uuid.uuid4())
message = f"Sending a link test to CBC proxy with ID {identifier}"
current_app.logger.info(message)
cbc_proxy_client.send_link_test(identifier)
for cbc_name in current_app.config['ENABLED_CBCS']:
trigger_link_test.apply_async(kwargs={'provider': cbc_name}, queue=QueueNames.NOTIFY)

View File

@@ -2,6 +2,8 @@ import json
import boto3
from app.models import BroadcastProviders
# The variable names in this file have specific meaning in a CAP message
#
# identifier is a unique field for each CAP message
@@ -67,6 +69,9 @@ class CBCProxyNoopClient:
class CBCProxyClient:
provider_function_name_map = {
BroadcastProviders.EE: 'bt-ee-1-proxy',
}
def init_app(self, app):
self._lambda_client = boto3.client(
@@ -97,12 +102,21 @@ class CBCProxyClient:
self,
identifier,
):
"""
canary - a specific lambda that does not connect to a provider, but just confirms the connectivity between
Notify and the CBC proxy AWS account
"""
self._invoke_lambda(function_name='canary', payload={'identifier': identifier})
def send_link_test(
self,
identifier,
provider,
):
"""
link test - open up a connection to a specific provider, and send them an xml payload with a <msgType> of
test.
"""
payload = {'message_type': 'test', 'identifier': identifier}
self._invoke_lambda(function_name='bt-ee-1-proxy', payload=payload)
@@ -121,7 +135,6 @@ class CBCProxyClient:
'sent': sent,
'expires': expires,
}
self._invoke_lambda(function_name='bt-ee-1-proxy', payload=payload)
# We have not implementated updating a broadcast

View File

@@ -367,6 +367,9 @@ class Config(object):
CBC_PROXY_AWS_ACCESS_KEY_ID = os.environ.get('CBC_PROXY_AWS_ACCESS_KEY_ID', '')
CBC_PROXY_AWS_SECRET_ACCESS_KEY = os.environ.get('CBC_PROXY_AWS_SECRET_ACCESS_KEY', '')
# matches up with the strings in models.py::BroadcastProvider
ENABLED_CBCS = ['ee']
######################
# Config overrides ###