Cancel broadcast via API

When a service sends us an XML CAP broadcast message with Cancel
status, and that broadcast is in broadcasting state, we cancel it.
This commit is contained in:
Pea Tyczynska
2022-01-14 14:59:38 +00:00
parent 940126abfb
commit 3b4a9d8942
2 changed files with 59 additions and 5 deletions

View File

@@ -155,8 +155,54 @@ def test_valid_cancel_broadcast_request_rejects_unapproved_alert_and_returns_201
assert response_json_for_reject['updated_at'] is not None
def test_valid_cancel_broadcast_request_cancels_active_alert_and_returns_201():
pass
def test_valid_cancel_broadcast_request_cancels_active_alert_and_returns_201(
client,
sample_broadcast_service,
notify_db,
notify_db_session,
mocker
):
auth_header = create_service_authorization_header(service_id=sample_broadcast_service.id)
# create a broadcast
response_for_create = client.post(
path='/v2/broadcast',
data=sample_cap_xml_documents.WAINFLEET,
headers=[('Content-Type', 'application/cap+xml'), auth_header],
)
assert response_for_create.status_code == 201
response_json_for_create = json.loads(response_for_create.get_data(as_text=True))
assert response_json_for_create['cancelled_at'] is None
assert response_json_for_create['cancelled_by_id'] is None
assert response_json_for_create['reference'] == '50385fcb0ab7aa447bbd46d848ce8466E'
assert response_json_for_create['status'] == 'pending-approval'
# approve broadcast
broadcast_message = dao_get_broadcast_message_by_id_and_service_id(
response_json_for_create["id"], response_json_for_create["service_id"]
)
broadcast_message.status = 'broadcasting'
# cancel broadcast
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response_for_cancel = client.post(
path='/v2/broadcast',
data=sample_cap_xml_documents.WAINFLEET_CANCEL,
headers=[('Content-Type', 'application/cap+xml'), auth_header],
)
assert response_for_cancel.status_code == 201
assert len(broadcast_message.events) == 1
alert_event = broadcast_message.events[0]
mock_task.assert_called_once_with(kwargs={'broadcast_event_id': str(alert_event.id)}, queue='broadcast-tasks')
assert broadcast_message.status == 'cancelled'
assert broadcast_message.cancelled_at is not None
assert broadcast_message.cancelled_by_id is None # broadcast cancelled via API, so not by any single user
assert broadcast_message.updated_at is not None
def test_cancel_request_does_not_cancel_broadcast_if_reference_does_not_match():