Merge pull request #3429 from alphagov/cancel_alert_via_api

Cancel broadcast via API
This commit is contained in:
Pea Tyczynska
2022-01-21 14:04:35 +00:00
committed by GitHub
11 changed files with 666 additions and 489 deletions

View File

@@ -1,7 +1,5 @@
from datetime import datetime
import iso8601
from flask import Blueprint, current_app, jsonify, request
from flask import Blueprint, jsonify, request
from notifications_utils.template import BroadcastMessageTemplate
from app.broadcast_message.broadcast_message_schema import (
@@ -9,8 +7,9 @@ from app.broadcast_message.broadcast_message_schema import (
update_broadcast_message_schema,
update_broadcast_message_status_schema,
)
from app.celery.broadcast_message_tasks import send_broadcast_event
from app.config import QueueNames
from app.broadcast_message.utils import (
validate_and_update_broadcast_message_status,
)
from app.dao.broadcast_message_dao import (
dao_get_broadcast_message_by_id_and_service_id,
dao_get_broadcast_messages_for_service,
@@ -20,12 +19,7 @@ from app.dao.services_dao import dao_fetch_service_by_id
from app.dao.templates_dao import dao_get_template_by_id_and_service_id
from app.dao.users_dao import get_user_by_id
from app.errors import InvalidRequest, register_errors
from app.models import (
BroadcastEvent,
BroadcastEventMessageType,
BroadcastMessage,
BroadcastStatusType,
)
from app.models import BroadcastMessage, BroadcastStatusType
from app.schema_validation import validate
broadcast_message_blueprint = Blueprint(
@@ -42,47 +36,6 @@ def _parse_nullable_datetime(dt):
return dt
def _update_broadcast_message(broadcast_message, new_status, updating_user):
if updating_user not in broadcast_message.service.users:
# we allow platform admins to cancel broadcasts
if not (new_status == BroadcastStatusType.CANCELLED and updating_user.platform_admin):
raise InvalidRequest(
f'User {updating_user.id} cannot update broadcast_message {broadcast_message.id} from other service',
status_code=400
)
if new_status not in BroadcastStatusType.ALLOWED_STATUS_TRANSITIONS[broadcast_message.status]:
raise InvalidRequest(
f'Cannot move broadcast_message {broadcast_message.id} from {broadcast_message.status} to {new_status}',
status_code=400
)
if new_status == BroadcastStatusType.BROADCASTING:
# training mode services can approve their own broadcasts
if updating_user == broadcast_message.created_by and not broadcast_message.service.restricted:
raise InvalidRequest(
f'User {updating_user.id} cannot approve their own broadcast_message {broadcast_message.id}',
status_code=400
)
elif len(broadcast_message.areas['simple_polygons']) == 0:
raise InvalidRequest(
f'broadcast_message {broadcast_message.id} has no selected areas and so cannot be broadcasted.',
status_code=400
)
else:
broadcast_message.approved_at = datetime.utcnow()
broadcast_message.approved_by = updating_user
if new_status == BroadcastStatusType.CANCELLED:
broadcast_message.cancelled_at = datetime.utcnow()
broadcast_message.cancelled_by = updating_user
current_app.logger.info(
f'broadcast_message {broadcast_message.id} moving from {broadcast_message.status} to {new_status}'
)
broadcast_message.status = new_status
@broadcast_message_blueprint.route('', methods=['GET'])
def get_broadcast_messages_for_service(service_id):
# TODO: should this return template content/data in some way? or can we rely on them being cached admin side.
@@ -201,54 +154,14 @@ def update_broadcast_message_status(service_id, broadcast_message_id):
new_status = data['status']
updating_user = get_user_by_id(data['created_by'])
_update_broadcast_message(broadcast_message, new_status, updating_user)
dao_save_object(broadcast_message)
if updating_user not in broadcast_message.service.users:
# we allow platform admins to cancel broadcasts, and we don't check user if request was done via API
if not (new_status == BroadcastStatusType.CANCELLED and updating_user.platform_admin):
raise InvalidRequest(
f'User {updating_user.id} cannot update broadcast_message {broadcast_message.id} from other service',
status_code=400
)
if new_status in {BroadcastStatusType.BROADCASTING, BroadcastStatusType.CANCELLED}:
_create_broadcast_event(broadcast_message)
validate_and_update_broadcast_message_status(broadcast_message, new_status, updating_user)
return jsonify(broadcast_message.serialize()), 200
def _create_broadcast_event(broadcast_message):
"""
If the service is live and the broadcast message is not stubbed, creates a broadcast event, stores it in the
database, and triggers the task to send the CAP XML off.
"""
service = broadcast_message.service
if not broadcast_message.stubbed and not service.restricted:
msg_types = {
BroadcastStatusType.BROADCASTING: BroadcastEventMessageType.ALERT,
BroadcastStatusType.CANCELLED: BroadcastEventMessageType.CANCEL,
}
event = BroadcastEvent(
service=service,
broadcast_message=broadcast_message,
message_type=msg_types[broadcast_message.status],
transmitted_content={"body": broadcast_message.content},
transmitted_areas=broadcast_message.areas,
# TODO: Probably move this somewhere more standalone too and imply that it shouldn't change. Should it
# include a service based identifier too? eg "flood-warnings@notifications.service.gov.uk" or similar
transmitted_sender='notifications.service.gov.uk',
# TODO: Should this be set to now? Or the original starts_at?
transmitted_starts_at=broadcast_message.starts_at,
transmitted_finishes_at=broadcast_message.finishes_at,
)
dao_save_object(event)
send_broadcast_event.apply_async(
kwargs={'broadcast_event_id': str(event.id)},
queue=QueueNames.BROADCASTS
)
elif broadcast_message.stubbed != service.restricted:
# It's possible for a service to create a broadcast in trial mode, and then approve it after the
# service is live (or vice versa). We don't think it's safe to send such broadcasts, as the service
# has changed since they were created. Log an error instead.
current_app.logger.error(
f'Broadcast event not created. Stubbed status of broadcast message was {broadcast_message.stubbed}'
f' but service was {"in trial mode" if service.restricted else "live"}'
)

View File

@@ -7,6 +7,7 @@ def cap_xml_to_dict(cap_xml):
return {
"msgType": cap.alert.msgType.text,
"reference": cap.alert.identifier.text,
"references": cap.alert.references.text, # references to previous events belonging to the same alert
"cap_event": cap.alert.info.event.text,
"category": cap.alert.info.category.text,
"expires": cap.alert.info.expires.text,

View File

@@ -0,0 +1,95 @@
from datetime import datetime
from flask import current_app
from app.celery.broadcast_message_tasks import send_broadcast_event
from app.config import QueueNames
from app.dao.dao_utils import dao_save_object
from app.errors import InvalidRequest
from app.models import (
BroadcastEvent,
BroadcastEventMessageType,
BroadcastStatusType,
)
def validate_and_update_broadcast_message_status(broadcast_message, new_status, updating_user):
if new_status not in BroadcastStatusType.ALLOWED_STATUS_TRANSITIONS[broadcast_message.status]:
raise InvalidRequest(
f'Cannot move broadcast_message {broadcast_message.id} from {broadcast_message.status} to {new_status}',
status_code=400
)
if new_status == BroadcastStatusType.BROADCASTING:
# training mode services can approve their own broadcasts
if updating_user == broadcast_message.created_by and not broadcast_message.service.restricted:
raise InvalidRequest(
f'User {updating_user.id} cannot approve their own broadcast_message {broadcast_message.id}',
status_code=400
)
elif len(broadcast_message.areas['simple_polygons']) == 0:
raise InvalidRequest(
f'broadcast_message {broadcast_message.id} has no selected areas and so cannot be broadcasted.',
status_code=400
)
else:
broadcast_message.approved_at = datetime.utcnow()
broadcast_message.approved_by = updating_user
if new_status == BroadcastStatusType.CANCELLED:
broadcast_message.cancelled_at = datetime.utcnow()
broadcast_message.cancelled_by = updating_user
current_app.logger.info(
f'broadcast_message {broadcast_message.id} moving from {broadcast_message.status} to {new_status}'
)
broadcast_message.status = new_status
dao_save_object(broadcast_message)
if new_status in {BroadcastStatusType.BROADCASTING, BroadcastStatusType.CANCELLED}:
_create_broadcast_event(broadcast_message)
def _create_broadcast_event(broadcast_message):
"""
If the service is live and the broadcast message is not stubbed, creates a broadcast event, stores it in the
database, and triggers the task to send the CAP XML off.
"""
service = broadcast_message.service
if not broadcast_message.stubbed and not service.restricted:
msg_types = {
BroadcastStatusType.BROADCASTING: BroadcastEventMessageType.ALERT,
BroadcastStatusType.CANCELLED: BroadcastEventMessageType.CANCEL,
}
event = BroadcastEvent(
service=service,
broadcast_message=broadcast_message,
message_type=msg_types[broadcast_message.status],
transmitted_content={"body": broadcast_message.content},
transmitted_areas=broadcast_message.areas,
# TODO: Probably move this somewhere more standalone too and imply that it shouldn't change. Should it
# include a service based identifier too? eg "flood-warnings@notifications.service.gov.uk" or similar
transmitted_sender='notifications.service.gov.uk',
# TODO: Should this be set to now? Or the original starts_at?
transmitted_starts_at=broadcast_message.starts_at,
transmitted_finishes_at=broadcast_message.finishes_at,
)
dao_save_object(event)
send_broadcast_event.apply_async(
kwargs={'broadcast_event_id': str(event.id)},
queue=QueueNames.BROADCASTS
)
elif broadcast_message.stubbed != service.restricted:
# It's possible for a service to create a broadcast in trial mode, and then approve it after the
# service is live (or vice versa). We don't think it's safe to send such broadcasts, as the service
# has changed since they were created. Log an error instead.
current_app.logger.error(
f'Broadcast event not created. Stubbed status of broadcast message was {broadcast_message.stubbed}'
f' but service was {"in trial mode" if service.restricted else "live"}'
)

View File

@@ -24,6 +24,13 @@ def dao_get_broadcast_message_by_id_and_service_id(broadcast_message_id, service
).one()
def dao_get_broadcast_message_by_references_and_service_id(references_to_original_broadcast, service_id):
return BroadcastMessage.query.filter(
BroadcastMessage.reference.in_(references_to_original_broadcast),
BroadcastMessage.service_id == service_id
).one()
def dao_get_broadcast_event_by_id(broadcast_event_id):
return BroadcastEvent.query.filter(BroadcastEvent.id == broadcast_event_id).one()

View File

@@ -17,6 +17,12 @@ post_broadcast_schema = {
"null",
],
},
"references": {
"type": [
"string",
"null",
],
},
"cap_event": {
"type": [
"string",
@@ -63,10 +69,10 @@ post_broadcast_schema = {
"type": "string",
"enum": [
"Alert",
"Cancel",
# The following are valid CAP but not supported by our
# API at the moment
# "Update",
# "Cancel",
# "Ack",
# "Error",
],

View File

@@ -6,6 +6,12 @@ from notifications_utils.template import BroadcastMessageTemplate
from app import api_user, authenticated_service
from app.broadcast_message.translators import cap_xml_to_dict
from app.broadcast_message.utils import (
validate_and_update_broadcast_message_status,
)
from app.dao.broadcast_message_dao import (
dao_get_broadcast_message_by_references_and_service_id,
)
from app.dao.dao_utils import dao_save_object
from app.models import BROADCAST_TYPE, BroadcastMessage, BroadcastStatusType
from app.notifications.validators import check_service_has_permission
@@ -37,52 +43,77 @@ def create_broadcast():
message='Request data is not valid CAP XML',
status_code=400,
)
broadcast_json = cap_xml_to_dict(cap_xml)
validate(broadcast_json, post_broadcast_schema)
_validate_template(broadcast_json)
polygons = Polygons(list(chain.from_iterable((
[
[[y, x] for x, y in polygon]
for polygon in area['polygons']
] for area in broadcast_json['areas']
))))
if broadcast_json["msgType"] == "Cancel":
broadcast_message = _cancel_or_reject_broadcast(
broadcast_json["references"].split(","),
authenticated_service.id
)
return jsonify(broadcast_message.serialize()), 201
if len(polygons) > 12 or polygons.point_count > 250:
simple_polygons = polygons.smooth.simplify
else:
simple_polygons = polygons
_validate_template(broadcast_json)
broadcast_message = BroadcastMessage(
service_id=authenticated_service.id,
content=broadcast_json['content'],
reference=broadcast_json['reference'],
cap_event=broadcast_json['cap_event'],
areas={
'names': [
area['name'] for area in broadcast_json['areas']
],
'simple_polygons': simple_polygons.as_coordinate_pairs_lat_long,
},
status=BroadcastStatusType.PENDING_APPROVAL,
api_key_id=api_user.id,
stubbed=authenticated_service.restricted
# The client may pass in broadcast_json['expires'] but its
# simpler for now to ignore it and have the rules around expiry
# for broadcasts created with the API match those created from
# the admin app
polygons = Polygons(list(chain.from_iterable((
[
[[y, x] for x, y in polygon]
for polygon in area['polygons']
] for area in broadcast_json['areas']
))))
if len(polygons) > 12 or polygons.point_count > 250:
simple_polygons = polygons.smooth.simplify
else:
simple_polygons = polygons
broadcast_message = BroadcastMessage(
service_id=authenticated_service.id,
content=broadcast_json['content'],
reference=broadcast_json['reference'],
cap_event=broadcast_json['cap_event'],
areas={
'names': [
area['name'] for area in broadcast_json['areas']
],
'simple_polygons': simple_polygons.as_coordinate_pairs_lat_long,
},
status=BroadcastStatusType.PENDING_APPROVAL,
api_key_id=api_user.id,
stubbed=authenticated_service.restricted
# The client may pass in broadcast_json['expires'] but its
# simpler for now to ignore it and have the rules around expiry
# for broadcasts created with the API match those created from
# the admin app
)
dao_save_object(broadcast_message)
current_app.logger.info(
f'Broadcast message {broadcast_message.id} created for service '
f'{authenticated_service.id} with reference {broadcast_json["reference"]}'
)
return jsonify(broadcast_message.serialize()), 201
def _cancel_or_reject_broadcast(references_to_original_broadcast, service_id):
broadcast_message = dao_get_broadcast_message_by_references_and_service_id(
references_to_original_broadcast,
service_id
)
dao_save_object(broadcast_message)
current_app.logger.info(
f'Broadcast message {broadcast_message.id} created for service '
f'{authenticated_service.id} with reference {broadcast_json["reference"]}'
if broadcast_message.status == BroadcastStatusType.PENDING_APPROVAL:
new_status = BroadcastStatusType.REJECTED
else:
new_status = BroadcastStatusType.CANCELLED
validate_and_update_broadcast_message_status(
broadcast_message,
new_status,
updating_user=None
)
return jsonify(broadcast_message.serialize()), 201
return broadcast_message
def _validate_template(broadcast_json):

View File

@@ -620,91 +620,6 @@ def test_update_broadcast_message_allows_service_user_and_platform_admin_to_canc
assert cancel_event.transmitted_content == {"body": "emergency broadcast"}
def test_update_broadcast_message_status_stores_approved_by_and_approved_at_and_queues_task(
admin_request,
sample_broadcast_service,
mocker
):
t = create_template(sample_broadcast_service, BROADCAST_TYPE, content='emergency broadcast')
bm = create_broadcast_message(
t,
status=BroadcastStatusType.PENDING_APPROVAL,
areas={
"ids": ["london"],
"simple_polygons": [[[51.30, 0.7], [51.28, 0.8], [51.25, -0.7]]]
}
)
approver = create_user(email='approver@gov.uk')
sample_broadcast_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
_data={'status': BroadcastStatusType.BROADCASTING, 'created_by': str(approver.id)},
service_id=t.service_id,
broadcast_message_id=bm.id,
_expected_status=200
)
assert response['status'] == BroadcastStatusType.BROADCASTING
assert response['approved_at'] is not None
assert response['approved_by_id'] == str(approver.id)
assert len(bm.events) == 1
alert_event = bm.events[0]
mock_task.assert_called_once_with(kwargs={'broadcast_event_id': str(alert_event.id)}, queue='broadcast-tasks')
assert alert_event.service_id == sample_broadcast_service.id
assert alert_event.transmitted_areas == bm.areas
assert alert_event.message_type == BroadcastEventMessageType.ALERT
assert alert_event.transmitted_finishes_at == bm.finishes_at
assert alert_event.transmitted_content == {"body": "emergency broadcast"}
@pytest.mark.parametrize('broadcast_message_stubbed, service_restricted_before_approval', [
(True, True),
(True, False),
(False, True),
])
def test_update_broadcast_message_status_updates_details_but_does_not_queue_task_if_bm_is_stubbed_or_service_not_live(
admin_request,
sample_broadcast_service,
mocker,
broadcast_message_stubbed,
service_restricted_before_approval,
):
sample_broadcast_service.restricted = broadcast_message_stubbed
t = create_template(sample_broadcast_service, BROADCAST_TYPE, content='emergency broadcast')
bm = create_broadcast_message(
t,
status=BroadcastStatusType.PENDING_APPROVAL,
areas={"ids": ["london"], "simple_polygons": [[[51.30, 0.7], [51.28, 0.8], [51.25, -0.7]]]},
stubbed=broadcast_message_stubbed
)
approver = create_user(email='approver@gov.uk')
sample_broadcast_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
sample_broadcast_service.restricted = service_restricted_before_approval
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
_data={'status': BroadcastStatusType.BROADCASTING, 'created_by': str(approver.id)},
service_id=t.service_id,
broadcast_message_id=bm.id,
_expected_status=200
)
assert response['status'] == BroadcastStatusType.BROADCASTING
assert response['approved_at'] is not None
assert response['approved_by_id'] == str(approver.id)
# The broadcast can be approved, but does not create a broadcast_event in the database or put a task on the queue
assert len(bm.events) == 0
assert len(mock_task.mock_calls) == 0
def test_update_broadcast_message_status_aborts_if_service_is_suspended(
admin_request,
sample_broadcast_service,
@@ -721,123 +636,6 @@ def test_update_broadcast_message_status_aborts_if_service_is_suspended(
)
def test_update_broadcast_message_status_creates_event_with_correct_content_if_broadcast_has_no_template(
admin_request,
sample_broadcast_service,
mocker
):
bm = create_broadcast_message(
service=sample_broadcast_service,
template=None,
content='tailor made emergency broadcast content',
status=BroadcastStatusType.PENDING_APPROVAL,
areas={
"ids": ["london"],
"simple_polygons": [[[51.30, 0.7], [51.28, 0.8], [51.25, -0.7]]]
}
)
approver = create_user(email='approver@gov.uk')
sample_broadcast_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
_data={'status': BroadcastStatusType.BROADCASTING, 'created_by': str(approver.id)},
service_id=sample_broadcast_service.id,
broadcast_message_id=bm.id,
_expected_status=200
)
assert response['status'] == BroadcastStatusType.BROADCASTING
assert len(bm.events) == 1
alert_event = bm.events[0]
mock_task.assert_called_once_with(kwargs={'broadcast_event_id': str(alert_event.id)}, queue='broadcast-tasks')
assert alert_event.transmitted_content == {"body": "tailor made emergency broadcast content"}
@pytest.mark.parametrize('is_platform_admin', [True, False])
def test_update_broadcast_message_status_rejects_approval_from_creator(
admin_request,
sample_broadcast_service,
mocker,
is_platform_admin
):
t = create_template(sample_broadcast_service, BROADCAST_TYPE)
bm = create_broadcast_message(t, status=BroadcastStatusType.PENDING_APPROVAL)
user = sample_broadcast_service.created_by
user.platform_admin = is_platform_admin
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
_data={'status': BroadcastStatusType.BROADCASTING, 'created_by': str(t.created_by_id)},
service_id=t.service_id,
broadcast_message_id=bm.id,
_expected_status=400
)
assert mock_task.called is False
assert 'cannot approve their own broadcast' in response['message']
def test_update_broadcast_message_status_rejects_approval_of_broadcast_with_no_areas(
admin_request,
sample_broadcast_service,
mocker
):
template = create_template(sample_broadcast_service, BROADCAST_TYPE)
broadcast = create_broadcast_message(template, status=BroadcastStatusType.PENDING_APPROVAL)
approver = create_user(email='approver@gov.uk')
sample_broadcast_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
_data={'status': BroadcastStatusType.BROADCASTING, 'created_by': str(approver.id)},
service_id=template.service_id,
broadcast_message_id=broadcast.id,
_expected_status=400
)
assert mock_task.called is False
assert response[
'message'
] == f'broadcast_message {broadcast.id} has no selected areas and so cannot be broadcasted.'
def test_update_broadcast_message_status_allows_trial_mode_services_to_approve_own_message(
notify_db,
admin_request,
sample_broadcast_service,
mocker
):
sample_broadcast_service.restricted = True
t = create_template(sample_broadcast_service, BROADCAST_TYPE)
bm = create_broadcast_message(
t,
status=BroadcastStatusType.PENDING_APPROVAL,
areas={"ids": ["london"], "simple_polygons": [[[51.30, 0.7], [51.28, 0.8], [51.25, -0.7]]]}
)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
_data={'status': BroadcastStatusType.BROADCASTING, 'created_by': str(t.created_by_id)},
service_id=t.service_id,
broadcast_message_id=bm.id,
_expected_status=200
)
assert response['status'] == BroadcastStatusType.BROADCASTING
assert response['approved_at'] is not None
assert response['created_by_id'] == str(t.created_by_id)
assert response['approved_by_id'] == str(t.created_by_id)
assert not mock_task.called
def test_update_broadcast_message_status_rejects_approval_from_user_not_on_that_service(
admin_request,
sample_broadcast_service,
@@ -858,56 +656,3 @@ def test_update_broadcast_message_status_rejects_approval_from_user_not_on_that_
assert mock_task.called is False
assert 'cannot update broadcast' in response['message']
@pytest.mark.parametrize('current_status, new_status', [
(BroadcastStatusType.DRAFT, BroadcastStatusType.DRAFT),
(BroadcastStatusType.DRAFT, BroadcastStatusType.BROADCASTING),
(BroadcastStatusType.DRAFT, BroadcastStatusType.CANCELLED),
(BroadcastStatusType.PENDING_APPROVAL, BroadcastStatusType.PENDING_APPROVAL),
(BroadcastStatusType.PENDING_APPROVAL, BroadcastStatusType.CANCELLED),
(BroadcastStatusType.PENDING_APPROVAL, BroadcastStatusType.COMPLETED),
(BroadcastStatusType.REJECTED, BroadcastStatusType.REJECTED),
(BroadcastStatusType.REJECTED, BroadcastStatusType.BROADCASTING),
(BroadcastStatusType.REJECTED, BroadcastStatusType.CANCELLED),
(BroadcastStatusType.REJECTED, BroadcastStatusType.COMPLETED),
(BroadcastStatusType.BROADCASTING, BroadcastStatusType.DRAFT),
(BroadcastStatusType.BROADCASTING, BroadcastStatusType.PENDING_APPROVAL),
(BroadcastStatusType.BROADCASTING, BroadcastStatusType.BROADCASTING),
(BroadcastStatusType.COMPLETED, BroadcastStatusType.DRAFT),
(BroadcastStatusType.COMPLETED, BroadcastStatusType.PENDING_APPROVAL),
(BroadcastStatusType.COMPLETED, BroadcastStatusType.BROADCASTING),
(BroadcastStatusType.COMPLETED, BroadcastStatusType.CANCELLED),
(BroadcastStatusType.CANCELLED, BroadcastStatusType.DRAFT),
(BroadcastStatusType.CANCELLED, BroadcastStatusType.PENDING_APPROVAL),
(BroadcastStatusType.CANCELLED, BroadcastStatusType.BROADCASTING),
(BroadcastStatusType.CANCELLED, BroadcastStatusType.COMPLETED),
])
def test_update_broadcast_message_status_restricts_status_transitions_to_explicit_list(
admin_request,
sample_broadcast_service,
mocker,
current_status,
new_status
):
t = create_template(sample_broadcast_service, BROADCAST_TYPE)
bm = create_broadcast_message(t, status=current_status)
approver = create_user(email='approver@gov.uk')
sample_broadcast_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
response = admin_request.post(
'broadcast_message.update_broadcast_message_status',
_data={'status': new_status, 'created_by': str(approver.id)},
service_id=t.service_id,
broadcast_message_id=bm.id,
_expected_status=400
)
assert mock_task.called is False
assert f'from {current_status} to {new_status}' in response['message']

View File

@@ -0,0 +1,304 @@
import pytest
from app.broadcast_message.utils import (
validate_and_update_broadcast_message_status,
)
from app.errors import InvalidRequest
from app.models import (
BROADCAST_TYPE,
BroadcastEventMessageType,
BroadcastStatusType,
)
from tests.app.db import create_broadcast_message, create_template, create_user
def test_validate_and_update_broadcast_message_status_stores_approved_by_and_approved_at_and_queues_task(
sample_broadcast_service,
mocker
):
template = create_template(sample_broadcast_service, BROADCAST_TYPE, content='emergency broadcast')
broadcast_message = create_broadcast_message(
template,
status=BroadcastStatusType.PENDING_APPROVAL,
areas={
"ids": ["london"],
"simple_polygons": [[[51.30, 0.7], [51.28, 0.8], [51.25, -0.7]]]
}
)
approver = create_user(email='approver@gov.uk')
sample_broadcast_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
validate_and_update_broadcast_message_status(
broadcast_message, BroadcastStatusType.BROADCASTING, approver
)
assert broadcast_message.status == BroadcastStatusType.BROADCASTING
assert broadcast_message.approved_at is not None
assert broadcast_message.approved_by_id == approver.id
assert len(broadcast_message.events) == 1
alert_event = broadcast_message.events[0]
mock_task.assert_called_once_with(kwargs={'broadcast_event_id': str(alert_event.id)}, queue='broadcast-tasks')
assert alert_event.service_id == sample_broadcast_service.id
assert alert_event.transmitted_areas == broadcast_message.areas
assert alert_event.message_type == BroadcastEventMessageType.ALERT
assert alert_event.transmitted_finishes_at == broadcast_message.finishes_at
assert alert_event.transmitted_content == {"body": "emergency broadcast"}
@pytest.mark.parametrize("cancel_route", ["admin_interface", "api_call"])
def test_validate_and_update_broadcast_message_status_for_cancelling_broadcast(
sample_broadcast_service,
mocker,
cancel_route
):
template = create_template(sample_broadcast_service, BROADCAST_TYPE, content='emergency broadcast')
broadcast_message = create_broadcast_message(
template,
status=BroadcastStatusType.BROADCASTING,
areas={
"ids": ["london"],
"simple_polygons": [[[51.30, 0.7], [51.28, 0.8], [51.25, -0.7]]]
}
)
if cancel_route == "admin_interface":
canceller = sample_broadcast_service.created_by
else:
canceller = None
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
validate_and_update_broadcast_message_status(
broadcast_message, BroadcastStatusType.CANCELLED, canceller
)
assert broadcast_message.status == BroadcastStatusType.CANCELLED
assert broadcast_message.cancelled_at is not None
assert broadcast_message.cancelled_by_id == (canceller.id if canceller else None)
assert len(broadcast_message.events) == 1
alert_event = broadcast_message.events[0]
mock_task.assert_called_once_with(kwargs={'broadcast_event_id': str(alert_event.id)}, queue='broadcast-tasks')
assert alert_event.service_id == sample_broadcast_service.id
assert alert_event.message_type == BroadcastEventMessageType.CANCEL
@pytest.mark.parametrize("reject_route", ["admin_interface", "api_call"])
def test_validate_and_update_broadcast_message_status_for_rejecting_broadcast(
sample_broadcast_service,
mocker,
reject_route
):
template = create_template(sample_broadcast_service, BROADCAST_TYPE, content='emergency broadcast')
broadcast_message = create_broadcast_message(
template,
status=BroadcastStatusType.PENDING_APPROVAL,
areas={
"ids": ["london"],
"simple_polygons": [[[51.30, 0.7], [51.28, 0.8], [51.25, -0.7]]]
}
)
if reject_route == "admin_interface":
canceller = sample_broadcast_service.created_by
else:
canceller = None
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
validate_and_update_broadcast_message_status(
broadcast_message, BroadcastStatusType.REJECTED, canceller
)
assert broadcast_message.status == BroadcastStatusType.REJECTED
assert broadcast_message.cancelled_at is None
assert broadcast_message.cancelled_by_id is None
assert broadcast_message.updated_at is not None
assert not mock_task.called
assert len(broadcast_message.events) == 0
@pytest.mark.parametrize('current_status, new_status', [
(BroadcastStatusType.DRAFT, BroadcastStatusType.DRAFT),
(BroadcastStatusType.DRAFT, BroadcastStatusType.BROADCASTING),
(BroadcastStatusType.DRAFT, BroadcastStatusType.CANCELLED),
(BroadcastStatusType.PENDING_APPROVAL, BroadcastStatusType.PENDING_APPROVAL),
(BroadcastStatusType.PENDING_APPROVAL, BroadcastStatusType.CANCELLED),
(BroadcastStatusType.PENDING_APPROVAL, BroadcastStatusType.COMPLETED),
(BroadcastStatusType.REJECTED, BroadcastStatusType.REJECTED),
(BroadcastStatusType.REJECTED, BroadcastStatusType.BROADCASTING),
(BroadcastStatusType.REJECTED, BroadcastStatusType.CANCELLED),
(BroadcastStatusType.REJECTED, BroadcastStatusType.COMPLETED),
(BroadcastStatusType.BROADCASTING, BroadcastStatusType.DRAFT),
(BroadcastStatusType.BROADCASTING, BroadcastStatusType.PENDING_APPROVAL),
(BroadcastStatusType.BROADCASTING, BroadcastStatusType.BROADCASTING),
(BroadcastStatusType.COMPLETED, BroadcastStatusType.DRAFT),
(BroadcastStatusType.COMPLETED, BroadcastStatusType.PENDING_APPROVAL),
(BroadcastStatusType.COMPLETED, BroadcastStatusType.BROADCASTING),
(BroadcastStatusType.COMPLETED, BroadcastStatusType.CANCELLED),
(BroadcastStatusType.CANCELLED, BroadcastStatusType.DRAFT),
(BroadcastStatusType.CANCELLED, BroadcastStatusType.PENDING_APPROVAL),
(BroadcastStatusType.CANCELLED, BroadcastStatusType.BROADCASTING),
(BroadcastStatusType.CANCELLED, BroadcastStatusType.COMPLETED),
])
def test_validate_and_update_broadcast_message_status_restricts_status_transitions_to_explicit_list(
sample_broadcast_service,
mocker,
current_status,
new_status
):
t = create_template(sample_broadcast_service, BROADCAST_TYPE)
broadcast_message = create_broadcast_message(t, status=current_status)
approver = create_user(email='approver@gov.uk')
sample_broadcast_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
with pytest.raises(expected_exception=InvalidRequest) as e:
validate_and_update_broadcast_message_status(broadcast_message, new_status, approver)
assert mock_task.called is False
assert f'from {current_status} to {new_status}' in str(e.value)
@pytest.mark.parametrize('is_platform_admin', [True, False])
def test_validate_and_update_broadcast_message_status_rejects_approval_from_creator(
sample_broadcast_service,
mocker,
is_platform_admin
):
template = create_template(sample_broadcast_service, BROADCAST_TYPE)
broadcast_message = create_broadcast_message(template, status=BroadcastStatusType.PENDING_APPROVAL)
creator_and_approver = sample_broadcast_service.created_by
creator_and_approver.platform_admin = is_platform_admin
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
with pytest.raises(expected_exception=InvalidRequest) as e:
validate_and_update_broadcast_message_status(
broadcast_message, BroadcastStatusType.BROADCASTING, creator_and_approver
)
assert mock_task.called is False
assert 'cannot approve their own broadcast' in str(e.value)
def test_validate_and_update_broadcast_message_status_rejects_approval_of_broadcast_with_no_areas(
admin_request,
sample_broadcast_service,
mocker
):
template = create_template(sample_broadcast_service, BROADCAST_TYPE)
broadcast = create_broadcast_message(template, status=BroadcastStatusType.PENDING_APPROVAL)
approver = create_user(email='approver@gov.uk')
sample_broadcast_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
with pytest.raises(expected_exception=InvalidRequest) as e:
validate_and_update_broadcast_message_status(broadcast, BroadcastStatusType.BROADCASTING, approver)
assert mock_task.called is False
assert f'broadcast_message {broadcast.id} has no selected areas and so cannot be broadcasted.' in str(e.value)
def test_validate_and_update_broadcast_message_status_allows_trial_mode_services_to_approve_own_message(
notify_db,
sample_broadcast_service,
mocker
):
sample_broadcast_service.restricted = True
template = create_template(sample_broadcast_service, BROADCAST_TYPE)
broadcast_message = create_broadcast_message(
template,
status=BroadcastStatusType.PENDING_APPROVAL,
areas={"ids": ["london"], "simple_polygons": [[[51.30, 0.7], [51.28, 0.8], [51.25, -0.7]]]}
)
creator_and_approver = sample_broadcast_service.created_by
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
validate_and_update_broadcast_message_status(
broadcast_message, BroadcastStatusType.BROADCASTING, creator_and_approver
)
assert broadcast_message.status == BroadcastStatusType.BROADCASTING
assert broadcast_message.approved_at is not None
assert broadcast_message.created_by_id == template.created_by_id
assert broadcast_message.approved_by_id == template.created_by_id
assert not mock_task.called
@pytest.mark.parametrize('broadcast_message_stubbed, service_restricted_before_approval', [
(True, True),
(True, False),
(False, True),
])
def test_validate_and_update_broadcast_message_status_when_broadcast_message_is_stubbed_or_service_not_live(
admin_request,
sample_broadcast_service,
mocker,
broadcast_message_stubbed,
service_restricted_before_approval,
):
sample_broadcast_service.restricted = broadcast_message_stubbed
template = create_template(sample_broadcast_service, BROADCAST_TYPE, content='emergency broadcast')
broadcast_message = create_broadcast_message(
template,
status=BroadcastStatusType.PENDING_APPROVAL,
areas={"ids": ["london"], "simple_polygons": [[[51.30, 0.7], [51.28, 0.8], [51.25, -0.7]]]},
stubbed=broadcast_message_stubbed
)
approver = create_user(email='approver@gov.uk')
sample_broadcast_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
sample_broadcast_service.restricted = service_restricted_before_approval
validate_and_update_broadcast_message_status(
broadcast_message, BroadcastStatusType.BROADCASTING, approver
)
assert broadcast_message.status == BroadcastStatusType.BROADCASTING
assert broadcast_message.approved_at is not None
assert broadcast_message.approved_by_id == approver.id
# The broadcast can be approved, but does not create a broadcast_event in the database or put a task on the queue
assert len(broadcast_message.events) == 0
assert len(mock_task.mock_calls) == 0
def test_validate_and_update_broadcast_message_status_creates_event_with_correct_content_if_broadcast_has_no_template(
admin_request,
sample_broadcast_service,
mocker
):
broadcast_message = create_broadcast_message(
service=sample_broadcast_service,
template=None,
content='tailor made emergency broadcast content',
status=BroadcastStatusType.PENDING_APPROVAL,
areas={
"ids": ["london"],
"simple_polygons": [[[51.30, 0.7], [51.28, 0.8], [51.25, -0.7]]]
}
)
approver = create_user(email='approver@gov.uk')
sample_broadcast_service.users.append(approver)
mock_task = mocker.patch('app.celery.broadcast_message_tasks.send_broadcast_event.apply_async')
validate_and_update_broadcast_message_status(
broadcast_message, BroadcastStatusType.BROADCASTING, approver
)
assert broadcast_message.status == BroadcastStatusType.BROADCASTING
assert len(broadcast_message.events) == 1
alert_event = broadcast_message.events[0]
mock_task.assert_called_once_with(kwargs={'broadcast_event_id': str(alert_event.id)}, queue='broadcast-tasks')
assert alert_event.transmitted_content == {"body": "tailor made emergency broadcast content"}

View File

@@ -183,6 +183,33 @@ def sample_broadcast_service(broadcast_organisation, sample_user):
return service
@pytest.fixture(scope='function')
def sample_broadcast_service_2(broadcast_organisation, sample_user):
service_name = 'Sample broadcast service 2'
email_from = service_name.lower().replace(' ', '.')
data = {
'name': service_name,
'message_limit': 1000,
'restricted': False,
'email_from': email_from,
'created_by': sample_user,
'crown': True,
'count_as_live': False,
}
service = Service.query.filter_by(name=service_name).first()
if not service:
service = Service(**data)
dao_create_service(service, sample_user, service_permissions=[BROADCAST_TYPE])
insert_or_update_service_broadcast_settings(service, channel="severe")
dao_add_service_to_organisation(service, current_app.config['BROADCAST_ORGANISATION_ID'])
else:
if sample_user not in service.users:
dao_add_user_to_service(service, sample_user)
return service
@pytest.fixture(scope='function', name='sample_service_full_permissions')
def _sample_service_full_permissions(notify_db_session):
service = create_service(

View File

@@ -35,6 +35,41 @@ WAINFLEET = """
</alert>
"""
WAINFLEET_CANCEL = """
<alert xmlns="urn:oasis:names:tc:emergency:cap:1.2">
<identifier>5fc99d720abb86020b233422a503af78E</identifier>
<sender>www.gov.uk/environment-agency</sender>
<sent>2020-02-16T23:02:26-00:00</sent>
<status>Actual</status>
<msgType>Cancel</msgType>
<source>Flood warning service</source>
<scope>Public</scope>
<code></code>
<references>www.gov.uk/environment-agency,50385fcb0ab7aa447bbd46d848ce8466E,2020-02-16T23:01:13-00:00</references>
<info>
<language>en-GB</language>
<category>Met</category>
<event><![CDATA[Remove Severe Flood Warning - Cell Broadcast]]></event>
<urgency>Immediate</urgency>
<severity>Severe</severity>
<certainty>Likely</certainty>
<expires>2020-02-16T23:30:13-00:00</expires>
<senderName>Environment Agency</senderName>
<description>Cancel Warning</description>
<web>https://flood-warning-information.service.gov.uk</web>
<contact>0345 988 1188</contact>
<area>
<areaDesc>River Steeping in Wainfleet All Saints</areaDesc>
<polygon>53.10569,0.24453 53.10593,0.24430 53.10601,0.24375 53.10615,0.24349 53.10629,0.24356 53.10656,0.24336 53.10697,0.24354 53.10684,0.24298 53.10694,0.24264 53.10721,0.24302 53.10752,0.24310 53.10777,0.24308 53.10805,0.24320 53.10803,0.24187 53.10776,0.24085 53.10774,0.24062 53.10702,0.24056 53.10679,0.24088 53.10658,0.24071 53.10651,0.24049 53.10656,0.24022 53.10642,0.24022 53.10632,0.24052 53.10629,0.24082 53.10612,0.24093 53.10583,0.24133 53.10564,0.24178 53.10541,0.24282 53.10569,0.24453</polygon>
<geocode>
<valueName>TargetAreaCode</valueName>
<value>053FWFSTEEP4</value>
</geocode>
</area>
</info>
</alert>
"""
UPDATE = """
<alert xmlns="urn:oasis:names:tc:emergency:cap:1.2">
<identifier>PAAQ-4-mg5a94</identifier>
@@ -118,89 +153,6 @@ UPDATE = """
</alert>
"""
CANCEL = """
<alert xmlns="urn:oasis:names:tc:emergency:cap:1.2">
<identifier>PAAQ-4-mg5a94</identifier>
<sender>wcatwc@noaa.gov</sender>
<sent>2013-01-05T10:58:23-00:00</sent>
<status>Actual</status>
<msgType>Cancel</msgType>
<source>WCATWC</source>
<scope>Public</scope>
<code>IPAWSv1.0</code>
<references>wcatwc@noaa.gov,PAAQ-1-mg5a94,2013-01-05T09:01:16-00:00 wcatwc@noaa.gov,PAAQ-2-mg5a94,2013-01-05T09:30:16-00:00 wcatwc@noaa.gov,PAAQ-3-mg5a94,2013-01-05T10:17:31-00:00</references>
<incidents>mg5a94</incidents>
<info>
<category>Geo</category>
<event>Tsunami Cancellation</event>
<responseType>None</responseType>
<urgency>Past</urgency>
<severity>Unknown</severity>
<certainty>Unlikely</certainty>
<onset>2013-01-05T10:58:23-00:00</onset>
<expires>2013-01-05T10:58:23-00:00</expires>
<senderName>NWS West Coast/Alaska Tsunami Warning Center Palmer AK</senderName>
<headline>The tsunami Warning is canceled for the coastal areas of British Columbia and Alaska from the north tip of Vancouver Island, British Columbia to Cape Fairweather, Alaska (80 miles SE of Yakutat).</headline>
<description>The tsunami Warning is canceled for the coastal areas of British Columbia and Alaska from the north tip of Vancouver Island, British Columbia to Cape Fairweather, Alaska (80 miles SE of Yakutat). - Event details: Preliminary magnitude 7.5 (Mw) earthquake / Lat: 55.300, Lon: -134.900 at 2013-01-05T08:58:20Z Tsunami cancellations indicate the end of the damaging tsunami threat. A cancellation is issued after an evaluation of sea level data confirms that a destructive tsunami will not impact the alerted region, or after tsunami levels have subsided to non-damaging levels. </description>
<instruction>Recommended Actions: Do not re-occupy hazard zones until local emergency officials indicate it is safe to do so. This will be the last West Coast/Alaska Tsunami Warning Center message issued for this event. Refer to the internet site ntwc.arh.noaa.gov for more information. </instruction>
<web>http://ntwc.arh.noaa.gov/events/PAAQ/2013/01/05/mg5a94/4/WEAK51/WEAK51.txt</web>
<parameter>
<valueName>EventLocationName</valueName>
<value>95 miles NW of Dixon Entrance, Alaska</value>
</parameter>
<parameter>
<valueName>EventPreliminaryMagnitude</valueName>
<value>7.5</value>
</parameter>
<parameter>
<valueName>EventPreliminaryMagnitudeType</valueName>
<value>Mw</value>
</parameter>
<parameter>
<valueName>EventOriginTime</valueName>
<value>2013-01-05T08:58:20-00:00</value>
</parameter>
<parameter>
<valueName>EventDepth</valueName>
<value>5 kilometers</value>
</parameter>
<parameter>
<valueName>EventLatLon</valueName>
<value>55.300,-134.900 0.000</value>
</parameter>
<parameter>
<valueName>VTEC</valueName>
<value>/O.CAN.PAAQ.TS.W.0001.000000T0000Z-000000T0000Z/</value>
</parameter>
<parameter>
<valueName>NWSUGC</valueName>
<value>BCZ220-210-922-912-921-911-110-AKZ026&gt;029-023-024-019&gt;022-025-051258-</value>
</parameter>
<parameter>
<valueName>ProductDefinition</valueName>
<value>Tsunami cancellations indicate the end of the damaging tsunami threat. A cancellation is issued after an evaluation of sea level data confirms that a destructive tsunami will not impact the alerted region, or after tsunami levels have subsided to non-damaging levels. </value>
</parameter>
<parameter>
<valueName>WEAK51</valueName>
<value>Public Tsunami Warnings, Watches, and Advisories for AK, BC, and US West Coast</value>
</parameter>
<parameter>
<valueName>EAS-ORG</valueName>
<value>WXR</value>
</parameter>
<resource>
<resourceDesc>Event Data as a JSON document</resourceDesc>
<mimeType>application/json</mimeType>
<uri>http://ntwc.arh.noaa.gov/events/PAAQ/2013/01/05/mg5a94/4/WEAK51/PAAQ.json</uri>
</resource>
<area>
<areaDesc>95 miles NW of Dixon Entrance, Alaska</areaDesc>
<circle>55.3,-134.9 0.0</circle>
</area>
</info>
</alert>
"""
WITH_PLACEHOLDER_FOR_CONTENT = """
<alert xmlns="urn:oasis:names:tc:emergency:cap:1.2">
<identifier>50385fcb0ab7aa447bbd46d848ce8466E</identifier>
@@ -237,7 +189,7 @@ WITH_PLACEHOLDER_FOR_CONTENT = """
WINDEMERE = """
<alert xmlns="urn:oasis:names:tc:emergency:cap:1.2">
<identifier>50385fcb0ab7aa447bbd46d848ce8466E</identifier>
<identifier>4f6d28b10ab7aa447bbd46d85f1e9effE</identifier>
<sender>www.gov.uk/environment-agency</sender>
<sent>2020-02-16T23:01:13-00:00</sent>
<status>Actual</status>

View File

@@ -119,6 +119,112 @@ def test_valid_post_cap_xml_broadcast_returns_201(
assert response_json['updated_at'] is None
@pytest.mark.parametrize("is_approved,expected_status", [
[True, "cancelled"],
[False, "rejected"]
])
def test_valid_cancel_broadcast_request_calls_validate_and_update_broadcast_message_status_and_returns_201(
client,
sample_broadcast_service,
mocker,
is_approved,
expected_status
):
auth_header = create_service_authorization_header(service_id=sample_broadcast_service.id)
# create a broadcast
response_for_create = client.post(
path='/v2/broadcast',
data=sample_cap_xml_documents.WAINFLEET,
headers=[('Content-Type', 'application/cap+xml'), auth_header],
)
assert response_for_create.status_code == 201
response_json_for_create = json.loads(response_for_create.get_data(as_text=True))
broadcast_message = dao_get_broadcast_message_by_id_and_service_id(
response_json_for_create["id"], response_json_for_create["service_id"]
)
# approve broadcast
if is_approved:
broadcast_message.status = 'broadcasting'
mock_update = mocker.patch('app.v2.broadcast.post_broadcast.validate_and_update_broadcast_message_status')
# cancel broadcast
response_for_cancel = client.post(
path='/v2/broadcast',
data=sample_cap_xml_documents.WAINFLEET_CANCEL,
headers=[('Content-Type', 'application/cap+xml'), auth_header],
)
assert response_for_cancel.status_code == 201
mock_update.assert_called_once_with(broadcast_message, expected_status, updating_user=None)
def test_cancel_request_does_not_cancel_broadcast_if_reference_does_not_match(
client,
sample_broadcast_service
):
auth_header = create_service_authorization_header(service_id=sample_broadcast_service.id)
# create a broadcast
response_for_create = client.post(
path='/v2/broadcast',
data=sample_cap_xml_documents.WINDEMERE,
headers=[('Content-Type', 'application/cap+xml'), auth_header],
)
assert response_for_create.status_code == 201
response_json_for_create = json.loads(response_for_create.get_data(as_text=True))
assert response_json_for_create['cancelled_at'] is None
assert response_json_for_create['cancelled_by_id'] is None
assert response_json_for_create['reference'] == '4f6d28b10ab7aa447bbd46d85f1e9effE'
assert response_json_for_create['status'] == 'pending-approval'
# try to cancel broadcast, but reference doesn't match
response_for_cancel = client.post(
path='/v2/broadcast',
data=sample_cap_xml_documents.WAINFLEET_CANCEL,
headers=[('Content-Type', 'application/cap+xml'), auth_header],
)
assert response_for_cancel.status_code == 404
def test_cancel_request_does_not_cancel_broadcast_if_service_id_does_not_match(
client,
sample_broadcast_service,
sample_broadcast_service_2
):
auth_header = create_service_authorization_header(service_id=sample_broadcast_service.id)
# create a broadcast
response_for_create = client.post(
path='/v2/broadcast',
data=sample_cap_xml_documents.WAINFLEET,
headers=[('Content-Type', 'application/cap+xml'), auth_header],
)
assert response_for_create.status_code == 201
response_json_for_create = json.loads(response_for_create.get_data(as_text=True))
assert response_json_for_create['cancelled_at'] is None
assert response_json_for_create['cancelled_by_id'] is None
assert response_json_for_create['reference'] == '50385fcb0ab7aa447bbd46d848ce8466E'
assert response_json_for_create['status'] == 'pending-approval'
# try to cancel broadcast, but service id doesn't match
auth_header_2 = create_service_authorization_header(service_id=sample_broadcast_service_2.id)
response_for_cancel = client.post(
path='/v2/broadcast',
data=sample_cap_xml_documents.WAINFLEET_CANCEL,
headers=[('Content-Type', 'application/cap+xml'), auth_header_2],
)
assert response_for_cancel.status_code == 404
def test_large_polygon_is_simplified(
client,
sample_broadcast_service,
@@ -191,32 +297,22 @@ def test_invalid_post_cap_xml_broadcast_returns_400(
}
@pytest.mark.parametrize('xml_document, expected_error_message', (
(sample_cap_xml_documents.CANCEL, (
'msgType Cancel is not one of [Alert]'
)),
(sample_cap_xml_documents.UPDATE, (
'msgType Update is not one of [Alert]'
)),
))
def test_unsupported_message_types_400(
client,
sample_broadcast_service,
xml_document,
expected_error_message,
):
auth_header = create_service_authorization_header(service_id=sample_broadcast_service.id)
response = client.post(
path='/v2/broadcast',
data=xml_document,
data=sample_cap_xml_documents.UPDATE,
headers=[('Content-Type', 'application/cap+xml'), auth_header],
)
assert response.status_code == 400
assert {
'error': 'ValidationError',
'message': expected_error_message,
'message': 'msgType Update is not one of [Alert, Cancel]',
} in (
json.loads(response.get_data(as_text=True))['errors']
)