send broadcast events rather than messages

use the new endpoint from cbc proxy. create a new task that just
serializes the event and sends it across rather than sending a template
and the broadcast message.

some changes to serialize to make it json friendly etc. it also expects
sent_at and transmitted_finishes_at to always be set (we set them in the
code but don't enforce it n the DB right now), as they're required by
utils template. not sure whether we'll update db constraints to be more
strict or utils template to be more permissive just yet, wait until we
find out more about the requirements of the CBCs we integrate with.
This commit is contained in:
Leo Hemsted
2020-08-04 19:21:22 +01:00
parent 1c48e2efb2
commit bdf2253298
7 changed files with 138 additions and 44 deletions

View File

@@ -14,7 +14,7 @@ from app.dao.broadcast_message_dao import (
from app.dao.services_dao import dao_fetch_service_by_id
from app.errors import register_errors, InvalidRequest
from app.models import BroadcastMessage, BroadcastStatusType, BroadcastEvent, BroadcastEventMessageType
from app.celery.broadcast_message_tasks import send_broadcast_message
from app.celery.broadcast_message_tasks import send_broadcast_event
from app.broadcast_message.broadcast_message_schema import (
create_broadcast_message_schema,
update_broadcast_message_schema,
@@ -196,7 +196,7 @@ def _create_broadcast_event(broadcast_message):
# save to the DB
dao_create_broadcast_message(event)
send_broadcast_message.apply_async(
kwargs={'broadcast_message_id': str(broadcast_message.id)},
send_broadcast_event.apply_async(
kwargs={'broadcast_event_id': str(event.id)},
queue=QueueNames.NOTIFY
)

View File

@@ -4,7 +4,7 @@ from notifications_utils.statsd_decorators import statsd
from app import notify_celery
from app.dao.broadcast_message_dao import dao_get_broadcast_message_by_id
from app.dao.broadcast_message_dao import dao_get_broadcast_message_by_id, dao_get_broadcast_event_by_id
@notify_celery.task(name="send-broadcast-message")
@@ -35,3 +35,27 @@ def send_broadcast_message(broadcast_message_id, provider='stub-1'):
f'broadcast_message {broadcast_message.id} '
f'status {broadcast_message.status} sent to {provider}'
)
@notify_celery.task(name="send-broadcast-event")
@statsd(namespace="tasks")
def send_broadcast_event(broadcast_event_id, provider='stub-1'):
broadcast_event = dao_get_broadcast_event_by_id(broadcast_event_id)
current_app.logger.info(
f'sending broadcast_event {broadcast_event.reference} '
f'msgType {broadcast_event.message_type} to {provider}'
)
payload = broadcast_event.serialize()
resp = requests.post(
f'{current_app.config["CBC_PROXY_URL"]}/broadcasts/events/{provider}',
json=payload
)
resp.raise_for_status()
current_app.logger.info(
f'broadcast_event {broadcast_event.reference} '
f'msgType {broadcast_event.message_type} sent to {provider}'
)

View File

@@ -24,6 +24,10 @@ def dao_get_broadcast_message_by_id(broadcast_message_id):
return BroadcastMessage.query.get(broadcast_message_id)
def dao_get_broadcast_event_by_id(broadcast_event_id):
return BroadcastEvent.query.get(broadcast_event_id)
def dao_get_broadcast_messages_for_service(service_id):
return BroadcastMessage.query.filter(
BroadcastMessage.service_id == service_id

View File

@@ -2338,20 +2338,19 @@ class BroadcastEvent(db.Model):
def get_earlier_message_references(self):
from app.dao.broadcast_message_dao import get_earlier_events_for_broadcast_event
return [event.reference for event in get_earlier_events_for_broadcast_event(self)]
return [event.reference for event in get_earlier_events_for_broadcast_event(self.id)]
def serialize(self):
return {
'id': self.id,
'id': str(self.id),
'service_id': self.service_id,
'reference': self.reference,
'service_id': str(self.service_id),
'previous_event_references': self.get_earlier_message_references(),
'broadcast_message_id': self.broadcast_message_id,
'sent_at': self.sent_at,
'broadcast_message_id': str(self.broadcast_message_id),
# sent_at is required by BroadcastMessageTemplate.from_broadcast_event
'sent_at': self.sent_at.strftime(DATETIME_FORMAT),
'message_type': self.message_type,
'transmitted_content': self.transmitted_content,
@@ -2359,6 +2358,7 @@ class BroadcastEvent(db.Model):
'transmitted_sender': self.transmitted_sender,
'transmitted_starts_at': get_dt_string_or_none(self.transmitted_starts_at),
'transmitted_finishes_at': get_dt_string_or_none(self.transmitted_finishes_at),
# transmitted_finishes_at is required by BroadcastMessageTemplate.from_broadcast_event
'transmitted_finishes_at': self.transmitted_finishes_at.strftime(DATETIME_FORMAT),
}

View File

@@ -1,9 +1,10 @@
from unittest.mock import ANY
import uuid
from freezegun import freeze_time
import pytest
from app.models import BROADCAST_TYPE, BroadcastStatusType, BroadcastEvent, BroadcastEventMessageType
from app.models import BROADCAST_TYPE, BroadcastStatusType, BroadcastEventMessageType
from tests.app.db import create_broadcast_message, create_template, create_service, create_user
@@ -270,7 +271,7 @@ def test_update_broadcast_message_status_stores_cancelled_by_and_cancelled_at(ad
bm = create_broadcast_message(t, status=BroadcastStatusType.BROADCASTING)
canceller = create_user(email='canceller@gov.uk')
sample_service.users.append(canceller)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async')
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
@@ -280,13 +281,16 @@ def test_update_broadcast_message_status_stores_cancelled_by_and_cancelled_at(ad
_expected_status=200
)
assert len(bm.events) == 1
cancel_event = bm.events[0]
cancel_id = str(cancel_event.id)
mock_task.assert_called_once_with(kwargs={'broadcast_event_id': cancel_id}, queue='notify-internal-tasks')
assert response['status'] == BroadcastStatusType.CANCELLED
assert response['cancelled_at'] is not None
assert response['cancelled_by_id'] == str(canceller.id)
mock_task.assert_called_once_with(kwargs={'broadcast_message_id': str(bm.id)}, queue='notify-internal-tasks')
assert len(bm.events) == 1
cancel_event = bm.events[0]
assert cancel_event.service_id == sample_service.id
assert cancel_event.transmitted_areas == bm.areas
assert cancel_event.message_type == BroadcastEventMessageType.CANCEL
@@ -303,7 +307,7 @@ def test_update_broadcast_message_status_stores_approved_by_and_approved_at_and_
bm = create_broadcast_message(t, status=BroadcastStatusType.PENDING_APPROVAL)
approver = create_user(email='approver@gov.uk')
sample_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async')
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
@@ -316,10 +320,12 @@ def test_update_broadcast_message_status_stores_approved_by_and_approved_at_and_
assert response['status'] == BroadcastStatusType.BROADCASTING
assert response['approved_at'] is not None
assert response['approved_by_id'] == str(approver.id)
mock_task.assert_called_once_with(kwargs={'broadcast_message_id': str(bm.id)}, queue='notify-internal-tasks')
assert len(bm.events) == 1
alert_event = bm.events[0]
mock_task.assert_called_once_with(kwargs={'broadcast_event_id': str(alert_event.id)}, queue='notify-internal-tasks')
assert alert_event.service_id == sample_service.id
assert alert_event.transmitted_areas == bm.areas
assert alert_event.message_type == BroadcastEventMessageType.ALERT
@@ -334,7 +340,7 @@ def test_update_broadcast_message_status_rejects_approval_from_creator(
):
t = create_template(sample_service, BROADCAST_TYPE)
bm = create_broadcast_message(t, status=BroadcastStatusType.PENDING_APPROVAL)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async')
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
@@ -358,7 +364,7 @@ def test_update_broadcast_message_status_allows_platform_admin_to_approve_own_me
user.platform_admin = True
t = create_template(sample_service, BROADCAST_TYPE)
bm = create_broadcast_message(t, status=BroadcastStatusType.PENDING_APPROVAL)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async')
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
@@ -372,7 +378,10 @@ def test_update_broadcast_message_status_allows_platform_admin_to_approve_own_me
assert response['approved_at'] is not None
assert response['created_by_id'] == str(user.id)
assert response['approved_by_id'] == str(user.id)
mock_task.assert_called_once_with(kwargs={'broadcast_message_id': str(bm.id)}, queue='notify-internal-tasks')
mock_task.assert_called_once_with(
kwargs={'broadcast_event_id': str(bm.events[0].id)},
queue='notify-internal-tasks'
)
def test_update_broadcast_message_status_allows_trial_mode_services_to_approve_own_message(
@@ -384,7 +393,7 @@ def test_update_broadcast_message_status_allows_trial_mode_services_to_approve_o
sample_service.restricted = True
t = create_template(sample_service, BROADCAST_TYPE)
bm = create_broadcast_message(t, status=BroadcastStatusType.PENDING_APPROVAL)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async')
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
@@ -398,7 +407,7 @@ def test_update_broadcast_message_status_allows_trial_mode_services_to_approve_o
assert response['approved_at'] is not None
assert response['created_by_id'] == str(t.created_by_id)
assert response['approved_by_id'] == str(t.created_by_id)
mock_task.assert_called_once_with(kwargs={'broadcast_message_id': str(bm.id)}, queue='notify-internal-tasks')
mock_task.assert_called_once_with(kwargs={'broadcast_event_id': ANY}, queue='notify-internal-tasks')
def test_update_broadcast_message_status_rejects_approval_from_user_not_on_that_service(
@@ -409,7 +418,7 @@ def test_update_broadcast_message_status_rejects_approval_from_user_not_on_that_
t = create_template(sample_service, BROADCAST_TYPE)
bm = create_broadcast_message(t, status=BroadcastStatusType.PENDING_APPROVAL)
approver = create_user(email='approver@gov.uk')
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async')
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
@@ -441,7 +450,7 @@ def test_update_broadcast_message_status_restricts_status_transitions_to_explici
bm = create_broadcast_message(t, status=current_status)
approver = create_user(email='approver@gov.uk')
sample_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async')
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',

View File

@@ -1,61 +1,118 @@
from freezegun import freeze_time
import pytest
import requests_mock
from requests import RequestException
from app.dao.templates_dao import dao_update_template
from app.models import BROADCAST_TYPE, BroadcastStatusType
from app.celery.broadcast_message_tasks import send_broadcast_message
from tests.app.db import create_template, create_broadcast_message
from app.models import BROADCAST_TYPE, BroadcastStatusType, BroadcastEventMessageType
from app.celery.broadcast_message_tasks import send_broadcast_message, send_broadcast_event
from tests.app.db import create_template, create_broadcast_message, create_broadcast_event
def test_send_broadcast_message_sends_data_correctly(sample_service):
t = create_template(sample_service, BROADCAST_TYPE)
bm = create_broadcast_message(t, areas=['london'], status=BroadcastStatusType.BROADCASTING)
template = create_template(sample_service, BROADCAST_TYPE)
broadcast_message = create_broadcast_message(template, areas=['london'], status=BroadcastStatusType.BROADCASTING)
with requests_mock.Mocker() as request_mock:
request_mock.post("http://test-cbc-proxy/broadcasts/stub-1", json={'valid': 'true'}, status_code=200)
send_broadcast_message(broadcast_message_id=str(bm.id))
send_broadcast_message(broadcast_message_id=str(broadcast_message.id))
assert request_mock.call_count == 1
assert request_mock.request_history[0].method == 'POST'
assert request_mock.request_history[0].headers["Content-type"] == "application/json"
cbc_json = request_mock.request_history[0].json()
assert cbc_json['template']['id'] == str(t.id)
assert cbc_json['template']['id'] == str(template.id)
assert cbc_json['template']['template_type'] == BROADCAST_TYPE
assert cbc_json['broadcast_message']['areas'] == ['london']
def test_send_broadcast_message_sends_old_version_of_template(sample_service):
t = create_template(sample_service, BROADCAST_TYPE, content='first content')
bm = create_broadcast_message(t, areas=['london'], status=BroadcastStatusType.BROADCASTING)
template = create_template(sample_service, BROADCAST_TYPE, content='first content')
broadcast_message = create_broadcast_message(template, areas=['london'], status=BroadcastStatusType.BROADCASTING)
t.content = 'second content'
dao_update_template(t)
assert t.version == 2
template.content = 'second content'
dao_update_template(template)
assert template.version == 2
with requests_mock.Mocker() as request_mock:
request_mock.post("http://test-cbc-proxy/broadcasts/stub-1", json={'valid': 'true'}, status_code=200)
send_broadcast_message(broadcast_message_id=str(bm.id))
send_broadcast_message(broadcast_message_id=str(broadcast_message.id))
assert request_mock.call_count == 1
assert request_mock.request_history[0].method == 'POST'
assert request_mock.request_history[0].headers["Content-type"] == "application/json"
cbc_json = request_mock.request_history[0].json()
assert cbc_json['template']['id'] == str(t.id)
assert cbc_json['template']['id'] == str(template.id)
assert cbc_json['template']['version'] == 1
assert cbc_json['template']['content'] == 'first content'
def test_send_broadcast_message_errors(sample_service):
t = create_template(sample_service, BROADCAST_TYPE)
bm = create_broadcast_message(t, status=BroadcastStatusType.BROADCASTING)
template = create_template(sample_service, BROADCAST_TYPE)
broadcast_message = create_broadcast_message(template, status=BroadcastStatusType.BROADCASTING)
with requests_mock.Mocker() as request_mock:
request_mock.post("http://test-cbc-proxy/broadcasts/stub-1", text='503 bad gateway', status_code=503)
# we're not retrying or anything for the moment - but this'll ensure any exception gets logged
with pytest.raises(RequestException) as ex:
send_broadcast_message(broadcast_message_id=str(bm.id))
send_broadcast_message(broadcast_message_id=str(broadcast_message.id))
assert ex.value.response.status_code == 503
@freeze_time('2020-08-01 12:00')
def test_send_broadcast_event_sends_data_correctly(sample_service):
template = create_template(sample_service, BROADCAST_TYPE)
broadcast_message = create_broadcast_message(template, areas=['london'], status=BroadcastStatusType.BROADCASTING)
event = create_broadcast_event(broadcast_message)
with requests_mock.Mocker() as request_mock:
request_mock.post("http://test-cbc-proxy/broadcasts/events/stub-1", json={'valid': 'true'}, status_code=200)
send_broadcast_event(broadcast_event_id=str(event.id))
assert request_mock.call_count == 1
assert request_mock.request_history[0].method == 'POST'
assert request_mock.request_history[0].headers["Content-type"] == "application/json"
cbc_json = request_mock.request_history[0].json()
assert cbc_json['id'] == str(event.id)
assert cbc_json['broadcast_message_id'] == str(broadcast_message.id)
assert cbc_json['sent_at'] == '2020-08-01T12:00:00.000000Z'
assert cbc_json['transmitted_starts_at'] is None
assert cbc_json['transmitted_areas'] == ['london']
def test_send_broadcast_event_sends_references(sample_service):
template = create_template(sample_service, BROADCAST_TYPE, content='content')
broadcast_message = create_broadcast_message(template, areas=['london'], status=BroadcastStatusType.BROADCASTING)
alert_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.ALERT)
cancel_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.CANCEL)
with requests_mock.Mocker() as request_mock:
request_mock.post("http://test-cbc-proxy/broadcasts/events/stub-1", json={'valid': 'true'}, status_code=200)
send_broadcast_event(broadcast_event_id=str(cancel_event.id))
assert request_mock.call_count == 1
assert request_mock.request_history[0].method == 'POST'
assert request_mock.request_history[0].headers["Content-type"] == "application/json"
cbc_json = request_mock.request_history[0].json()
assert cbc_json['id'] == str(cancel_event.id)
assert cbc_json['message_type'] == cancel_event.message_type
assert cbc_json['previous_event_references'] == [alert_event.reference]
def test_send_broadcast_event_errors(sample_service):
template = create_template(sample_service, BROADCAST_TYPE)
broadcast_message = create_broadcast_message(template, status=BroadcastStatusType.BROADCASTING)
event = create_broadcast_event(broadcast_message)
with requests_mock.Mocker() as request_mock:
request_mock.post("http://test-cbc-proxy/broadcasts/events/stub-1", text='503 bad gateway', status_code=503)
# we're not retrying or anything for the moment - but this'll ensure any exception gets logged
with pytest.raises(RequestException) as ex:
send_broadcast_event(broadcast_event_id=str(event.id))
assert ex.value.response.status_code == 503

View File

@@ -1043,7 +1043,7 @@ def create_broadcast_event(
transmitted_areas=transmitted_areas or ['london'],
transmitted_sender=transmitted_sender or 'www.notifications.service.gov.uk',
transmitted_starts_at=transmitted_starts_at,
transmitted_finishes_at=transmitted_finishes_at,
transmitted_finishes_at=transmitted_finishes_at or datetime.utcnow(),
)
db.session.add(b_e)
db.session.commit()