diff --git a/app/broadcast_message/rest.py b/app/broadcast_message/rest.py index ab9f4f292..322f6bce2 100644 --- a/app/broadcast_message/rest.py +++ b/app/broadcast_message/rest.py @@ -14,7 +14,7 @@ from app.dao.broadcast_message_dao import ( from app.dao.services_dao import dao_fetch_service_by_id from app.errors import register_errors, InvalidRequest from app.models import BroadcastMessage, BroadcastStatusType, BroadcastEvent, BroadcastEventMessageType -from app.celery.broadcast_message_tasks import send_broadcast_message +from app.celery.broadcast_message_tasks import send_broadcast_event from app.broadcast_message.broadcast_message_schema import ( create_broadcast_message_schema, update_broadcast_message_schema, @@ -196,7 +196,7 @@ def _create_broadcast_event(broadcast_message): # save to the DB dao_create_broadcast_message(event) - send_broadcast_message.apply_async( - kwargs={'broadcast_message_id': str(broadcast_message.id)}, + send_broadcast_event.apply_async( + kwargs={'broadcast_event_id': str(event.id)}, queue=QueueNames.NOTIFY ) diff --git a/app/celery/broadcast_message_tasks.py b/app/celery/broadcast_message_tasks.py index 837bf352b..97cc11b9a 100644 --- a/app/celery/broadcast_message_tasks.py +++ b/app/celery/broadcast_message_tasks.py @@ -4,7 +4,7 @@ from notifications_utils.statsd_decorators import statsd from app import notify_celery -from app.dao.broadcast_message_dao import dao_get_broadcast_message_by_id +from app.dao.broadcast_message_dao import dao_get_broadcast_message_by_id, dao_get_broadcast_event_by_id @notify_celery.task(name="send-broadcast-message") @@ -35,3 +35,27 @@ def send_broadcast_message(broadcast_message_id, provider='stub-1'): f'broadcast_message {broadcast_message.id} ' f'status {broadcast_message.status} sent to {provider}' ) + + +@notify_celery.task(name="send-broadcast-event") +@statsd(namespace="tasks") +def send_broadcast_event(broadcast_event_id, provider='stub-1'): + broadcast_event = dao_get_broadcast_event_by_id(broadcast_event_id) + + current_app.logger.info( + f'sending broadcast_event {broadcast_event.reference} ' + f'msgType {broadcast_event.message_type} to {provider}' + ) + + payload = broadcast_event.serialize() + + resp = requests.post( + f'{current_app.config["CBC_PROXY_URL"]}/broadcasts/events/{provider}', + json=payload + ) + resp.raise_for_status() + + current_app.logger.info( + f'broadcast_event {broadcast_event.reference} ' + f'msgType {broadcast_event.message_type} sent to {provider}' + ) diff --git a/app/dao/broadcast_message_dao.py b/app/dao/broadcast_message_dao.py index c40c1bdab..68b208352 100644 --- a/app/dao/broadcast_message_dao.py +++ b/app/dao/broadcast_message_dao.py @@ -24,6 +24,10 @@ def dao_get_broadcast_message_by_id(broadcast_message_id): return BroadcastMessage.query.get(broadcast_message_id) +def dao_get_broadcast_event_by_id(broadcast_event_id): + return BroadcastEvent.query.get(broadcast_event_id) + + def dao_get_broadcast_messages_for_service(service_id): return BroadcastMessage.query.filter( BroadcastMessage.service_id == service_id diff --git a/app/models.py b/app/models.py index f2e71ca50..72e900af4 100644 --- a/app/models.py +++ b/app/models.py @@ -2338,20 +2338,19 @@ class BroadcastEvent(db.Model): def get_earlier_message_references(self): from app.dao.broadcast_message_dao import get_earlier_events_for_broadcast_event - return [event.reference for event in get_earlier_events_for_broadcast_event(self)] + return [event.reference for event in get_earlier_events_for_broadcast_event(self.id)] def serialize(self): return { - 'id': self.id, + 'id': str(self.id), - 'service_id': self.service_id, - - 'reference': self.reference, + 'service_id': str(self.service_id), 'previous_event_references': self.get_earlier_message_references(), - 'broadcast_message_id': self.broadcast_message_id, - 'sent_at': self.sent_at, + 'broadcast_message_id': str(self.broadcast_message_id), + # sent_at is required by BroadcastMessageTemplate.from_broadcast_event + 'sent_at': self.sent_at.strftime(DATETIME_FORMAT), 'message_type': self.message_type, 'transmitted_content': self.transmitted_content, @@ -2359,6 +2358,7 @@ class BroadcastEvent(db.Model): 'transmitted_sender': self.transmitted_sender, 'transmitted_starts_at': get_dt_string_or_none(self.transmitted_starts_at), - 'transmitted_finishes_at': get_dt_string_or_none(self.transmitted_finishes_at), + # transmitted_finishes_at is required by BroadcastMessageTemplate.from_broadcast_event + 'transmitted_finishes_at': self.transmitted_finishes_at.strftime(DATETIME_FORMAT), } diff --git a/tests/app/broadcast_message/test_rest.py b/tests/app/broadcast_message/test_rest.py index 50ee25ea5..d9c04cc9d 100644 --- a/tests/app/broadcast_message/test_rest.py +++ b/tests/app/broadcast_message/test_rest.py @@ -1,9 +1,10 @@ +from unittest.mock import ANY import uuid from freezegun import freeze_time import pytest -from app.models import BROADCAST_TYPE, BroadcastStatusType, BroadcastEvent, BroadcastEventMessageType +from app.models import BROADCAST_TYPE, BroadcastStatusType, BroadcastEventMessageType from tests.app.db import create_broadcast_message, create_template, create_service, create_user @@ -270,7 +271,7 @@ def test_update_broadcast_message_status_stores_cancelled_by_and_cancelled_at(ad bm = create_broadcast_message(t, status=BroadcastStatusType.BROADCASTING) canceller = create_user(email='canceller@gov.uk') sample_service.users.append(canceller) - mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async') + mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async') response = admin_request.post( 'broadcast_message.update_broadcast_message_status', @@ -280,13 +281,16 @@ def test_update_broadcast_message_status_stores_cancelled_by_and_cancelled_at(ad _expected_status=200 ) + assert len(bm.events) == 1 + cancel_event = bm.events[0] + + cancel_id = str(cancel_event.id) + + mock_task.assert_called_once_with(kwargs={'broadcast_event_id': cancel_id}, queue='notify-internal-tasks') assert response['status'] == BroadcastStatusType.CANCELLED assert response['cancelled_at'] is not None assert response['cancelled_by_id'] == str(canceller.id) - mock_task.assert_called_once_with(kwargs={'broadcast_message_id': str(bm.id)}, queue='notify-internal-tasks') - assert len(bm.events) == 1 - cancel_event = bm.events[0] assert cancel_event.service_id == sample_service.id assert cancel_event.transmitted_areas == bm.areas assert cancel_event.message_type == BroadcastEventMessageType.CANCEL @@ -303,7 +307,7 @@ def test_update_broadcast_message_status_stores_approved_by_and_approved_at_and_ bm = create_broadcast_message(t, status=BroadcastStatusType.PENDING_APPROVAL) approver = create_user(email='approver@gov.uk') sample_service.users.append(approver) - mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async') + mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async') response = admin_request.post( 'broadcast_message.update_broadcast_message_status', @@ -316,10 +320,12 @@ def test_update_broadcast_message_status_stores_approved_by_and_approved_at_and_ assert response['status'] == BroadcastStatusType.BROADCASTING assert response['approved_at'] is not None assert response['approved_by_id'] == str(approver.id) - mock_task.assert_called_once_with(kwargs={'broadcast_message_id': str(bm.id)}, queue='notify-internal-tasks') assert len(bm.events) == 1 alert_event = bm.events[0] + + mock_task.assert_called_once_with(kwargs={'broadcast_event_id': str(alert_event.id)}, queue='notify-internal-tasks') + assert alert_event.service_id == sample_service.id assert alert_event.transmitted_areas == bm.areas assert alert_event.message_type == BroadcastEventMessageType.ALERT @@ -334,7 +340,7 @@ def test_update_broadcast_message_status_rejects_approval_from_creator( ): t = create_template(sample_service, BROADCAST_TYPE) bm = create_broadcast_message(t, status=BroadcastStatusType.PENDING_APPROVAL) - mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async') + mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async') response = admin_request.post( 'broadcast_message.update_broadcast_message_status', @@ -358,7 +364,7 @@ def test_update_broadcast_message_status_allows_platform_admin_to_approve_own_me user.platform_admin = True t = create_template(sample_service, BROADCAST_TYPE) bm = create_broadcast_message(t, status=BroadcastStatusType.PENDING_APPROVAL) - mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async') + mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async') response = admin_request.post( 'broadcast_message.update_broadcast_message_status', @@ -372,7 +378,10 @@ def test_update_broadcast_message_status_allows_platform_admin_to_approve_own_me assert response['approved_at'] is not None assert response['created_by_id'] == str(user.id) assert response['approved_by_id'] == str(user.id) - mock_task.assert_called_once_with(kwargs={'broadcast_message_id': str(bm.id)}, queue='notify-internal-tasks') + mock_task.assert_called_once_with( + kwargs={'broadcast_event_id': str(bm.events[0].id)}, + queue='notify-internal-tasks' + ) def test_update_broadcast_message_status_allows_trial_mode_services_to_approve_own_message( @@ -384,7 +393,7 @@ def test_update_broadcast_message_status_allows_trial_mode_services_to_approve_o sample_service.restricted = True t = create_template(sample_service, BROADCAST_TYPE) bm = create_broadcast_message(t, status=BroadcastStatusType.PENDING_APPROVAL) - mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async') + mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async') response = admin_request.post( 'broadcast_message.update_broadcast_message_status', @@ -398,7 +407,7 @@ def test_update_broadcast_message_status_allows_trial_mode_services_to_approve_o assert response['approved_at'] is not None assert response['created_by_id'] == str(t.created_by_id) assert response['approved_by_id'] == str(t.created_by_id) - mock_task.assert_called_once_with(kwargs={'broadcast_message_id': str(bm.id)}, queue='notify-internal-tasks') + mock_task.assert_called_once_with(kwargs={'broadcast_event_id': ANY}, queue='notify-internal-tasks') def test_update_broadcast_message_status_rejects_approval_from_user_not_on_that_service( @@ -409,7 +418,7 @@ def test_update_broadcast_message_status_rejects_approval_from_user_not_on_that_ t = create_template(sample_service, BROADCAST_TYPE) bm = create_broadcast_message(t, status=BroadcastStatusType.PENDING_APPROVAL) approver = create_user(email='approver@gov.uk') - mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async') + mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async') response = admin_request.post( 'broadcast_message.update_broadcast_message_status', @@ -441,7 +450,7 @@ def test_update_broadcast_message_status_restricts_status_transitions_to_explici bm = create_broadcast_message(t, status=current_status) approver = create_user(email='approver@gov.uk') sample_service.users.append(approver) - mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_message.apply_async') + mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async') response = admin_request.post( 'broadcast_message.update_broadcast_message_status', diff --git a/tests/app/celery/test_broadcast_message_tasks.py b/tests/app/celery/test_broadcast_message_tasks.py index 1622c2a7e..b39744e2f 100644 --- a/tests/app/celery/test_broadcast_message_tasks.py +++ b/tests/app/celery/test_broadcast_message_tasks.py @@ -1,61 +1,118 @@ +from freezegun import freeze_time import pytest import requests_mock from requests import RequestException from app.dao.templates_dao import dao_update_template -from app.models import BROADCAST_TYPE, BroadcastStatusType -from app.celery.broadcast_message_tasks import send_broadcast_message -from tests.app.db import create_template, create_broadcast_message +from app.models import BROADCAST_TYPE, BroadcastStatusType, BroadcastEventMessageType +from app.celery.broadcast_message_tasks import send_broadcast_message, send_broadcast_event +from tests.app.db import create_template, create_broadcast_message, create_broadcast_event def test_send_broadcast_message_sends_data_correctly(sample_service): - t = create_template(sample_service, BROADCAST_TYPE) - bm = create_broadcast_message(t, areas=['london'], status=BroadcastStatusType.BROADCASTING) + template = create_template(sample_service, BROADCAST_TYPE) + broadcast_message = create_broadcast_message(template, areas=['london'], status=BroadcastStatusType.BROADCASTING) with requests_mock.Mocker() as request_mock: request_mock.post("http://test-cbc-proxy/broadcasts/stub-1", json={'valid': 'true'}, status_code=200) - send_broadcast_message(broadcast_message_id=str(bm.id)) + send_broadcast_message(broadcast_message_id=str(broadcast_message.id)) assert request_mock.call_count == 1 assert request_mock.request_history[0].method == 'POST' assert request_mock.request_history[0].headers["Content-type"] == "application/json" cbc_json = request_mock.request_history[0].json() - assert cbc_json['template']['id'] == str(t.id) + assert cbc_json['template']['id'] == str(template.id) assert cbc_json['template']['template_type'] == BROADCAST_TYPE assert cbc_json['broadcast_message']['areas'] == ['london'] def test_send_broadcast_message_sends_old_version_of_template(sample_service): - t = create_template(sample_service, BROADCAST_TYPE, content='first content') - bm = create_broadcast_message(t, areas=['london'], status=BroadcastStatusType.BROADCASTING) + template = create_template(sample_service, BROADCAST_TYPE, content='first content') + broadcast_message = create_broadcast_message(template, areas=['london'], status=BroadcastStatusType.BROADCASTING) - t.content = 'second content' - dao_update_template(t) - assert t.version == 2 + template.content = 'second content' + dao_update_template(template) + assert template.version == 2 with requests_mock.Mocker() as request_mock: request_mock.post("http://test-cbc-proxy/broadcasts/stub-1", json={'valid': 'true'}, status_code=200) - send_broadcast_message(broadcast_message_id=str(bm.id)) + send_broadcast_message(broadcast_message_id=str(broadcast_message.id)) assert request_mock.call_count == 1 assert request_mock.request_history[0].method == 'POST' assert request_mock.request_history[0].headers["Content-type"] == "application/json" cbc_json = request_mock.request_history[0].json() - assert cbc_json['template']['id'] == str(t.id) + assert cbc_json['template']['id'] == str(template.id) assert cbc_json['template']['version'] == 1 assert cbc_json['template']['content'] == 'first content' def test_send_broadcast_message_errors(sample_service): - t = create_template(sample_service, BROADCAST_TYPE) - bm = create_broadcast_message(t, status=BroadcastStatusType.BROADCASTING) + template = create_template(sample_service, BROADCAST_TYPE) + broadcast_message = create_broadcast_message(template, status=BroadcastStatusType.BROADCASTING) with requests_mock.Mocker() as request_mock: request_mock.post("http://test-cbc-proxy/broadcasts/stub-1", text='503 bad gateway', status_code=503) # we're not retrying or anything for the moment - but this'll ensure any exception gets logged with pytest.raises(RequestException) as ex: - send_broadcast_message(broadcast_message_id=str(bm.id)) + send_broadcast_message(broadcast_message_id=str(broadcast_message.id)) + + assert ex.value.response.status_code == 503 + + +@freeze_time('2020-08-01 12:00') +def test_send_broadcast_event_sends_data_correctly(sample_service): + template = create_template(sample_service, BROADCAST_TYPE) + broadcast_message = create_broadcast_message(template, areas=['london'], status=BroadcastStatusType.BROADCASTING) + event = create_broadcast_event(broadcast_message) + + with requests_mock.Mocker() as request_mock: + request_mock.post("http://test-cbc-proxy/broadcasts/events/stub-1", json={'valid': 'true'}, status_code=200) + send_broadcast_event(broadcast_event_id=str(event.id)) + + assert request_mock.call_count == 1 + assert request_mock.request_history[0].method == 'POST' + assert request_mock.request_history[0].headers["Content-type"] == "application/json" + + cbc_json = request_mock.request_history[0].json() + assert cbc_json['id'] == str(event.id) + assert cbc_json['broadcast_message_id'] == str(broadcast_message.id) + assert cbc_json['sent_at'] == '2020-08-01T12:00:00.000000Z' + assert cbc_json['transmitted_starts_at'] is None + assert cbc_json['transmitted_areas'] == ['london'] + + +def test_send_broadcast_event_sends_references(sample_service): + template = create_template(sample_service, BROADCAST_TYPE, content='content') + broadcast_message = create_broadcast_message(template, areas=['london'], status=BroadcastStatusType.BROADCASTING) + alert_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.ALERT) + cancel_event = create_broadcast_event(broadcast_message, message_type=BroadcastEventMessageType.CANCEL) + + with requests_mock.Mocker() as request_mock: + request_mock.post("http://test-cbc-proxy/broadcasts/events/stub-1", json={'valid': 'true'}, status_code=200) + send_broadcast_event(broadcast_event_id=str(cancel_event.id)) + + assert request_mock.call_count == 1 + assert request_mock.request_history[0].method == 'POST' + assert request_mock.request_history[0].headers["Content-type"] == "application/json" + + cbc_json = request_mock.request_history[0].json() + assert cbc_json['id'] == str(cancel_event.id) + assert cbc_json['message_type'] == cancel_event.message_type + assert cbc_json['previous_event_references'] == [alert_event.reference] + + +def test_send_broadcast_event_errors(sample_service): + template = create_template(sample_service, BROADCAST_TYPE) + broadcast_message = create_broadcast_message(template, status=BroadcastStatusType.BROADCASTING) + event = create_broadcast_event(broadcast_message) + + with requests_mock.Mocker() as request_mock: + request_mock.post("http://test-cbc-proxy/broadcasts/events/stub-1", text='503 bad gateway', status_code=503) + # we're not retrying or anything for the moment - but this'll ensure any exception gets logged + with pytest.raises(RequestException) as ex: + send_broadcast_event(broadcast_event_id=str(event.id)) assert ex.value.response.status_code == 503 diff --git a/tests/app/db.py b/tests/app/db.py index d8646d1e4..75d4edeaa 100644 --- a/tests/app/db.py +++ b/tests/app/db.py @@ -1043,7 +1043,7 @@ def create_broadcast_event( transmitted_areas=transmitted_areas or ['london'], transmitted_sender=transmitted_sender or 'www.notifications.service.gov.uk', transmitted_starts_at=transmitted_starts_at, - transmitted_finishes_at=transmitted_finishes_at, + transmitted_finishes_at=transmitted_finishes_at or datetime.utcnow(), ) db.session.add(b_e) db.session.commit()