mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-02-05 10:53:28 -05:00
If broadcast_message has no value under cancelled_by_id, display in the view that it was cancelled by an API call.
326 lines
11 KiB
Python
326 lines
11 KiB
Python
import itertools
|
||
from datetime import datetime, timedelta
|
||
|
||
from flask import current_app
|
||
from notifications_utils.polygons import Polygons
|
||
from notifications_utils.template import BroadcastPreviewTemplate
|
||
from orderedset import OrderedSet
|
||
from werkzeug.utils import cached_property
|
||
|
||
from app.broadcast_areas.models import (
|
||
CustomBroadcastArea,
|
||
CustomBroadcastAreas,
|
||
broadcast_area_libraries,
|
||
)
|
||
from app.broadcast_areas.utils import aggregate_areas
|
||
from app.formatters import round_to_significant_figures
|
||
from app.models import JSONModel, ModelList
|
||
from app.models.user import User
|
||
from app.notify_client.broadcast_message_api_client import (
|
||
broadcast_message_api_client,
|
||
)
|
||
|
||
# Estimated area of the library area 'ctyua19-E10000023' (North Yorkshire).
# BroadcastMessage.count_of_phones_likely compares a broadcast's estimated
# area against this value to choose between its two counting strategies.
ESTIMATED_AREA_OF_LARGEST_UK_COUNTY = (
    broadcast_area_libraries
    .get_areas(['ctyua19-E10000023'])[0]  # North Yorkshire
    .polygons
    .estimated_area
)
|
||
|
||
|
||
class BroadcastMessage(JSONModel):
    """Model of a single broadcast message.

    Instances are built from the data returned by
    ``broadcast_message_api_client``. The raw data is read through
    ``self._dict`` and through the attribute names listed in
    ``ALLOWED_PROPERTIES``.
    """

    # Presumably the keys JSONModel exposes as attributes -- confirm against
    # JSONModel. 'service_id' used to be listed twice; a set literal ignores
    # duplicates, so dropping the repeat does not change the value.
    ALLOWED_PROPERTIES = {
        'id',
        'service_id',
        'template_id',
        'content',
        'created_by',
        'personalisation',
        'starts_at',
        'finishes_at',
        'created_at',
        'approved_at',
        'cancelled_at',
        'updated_at',
        'created_by_id',
        'approved_by_id',
        'cancelled_by_id',
    }

    libraries = broadcast_area_libraries

    def __lt__(self, other):
        """Return whether this broadcast sorts before ``other``.

        Broadcasts with a ``starts_at`` sort before those without one, and
        are ordered by ``starts_at`` among themselves. When neither has a
        ``starts_at`` each side is compared on its ``updated_at``, falling
        back to ``created_at`` when ``updated_at`` is not set.
        """
        if self.starts_at and other.starts_at:
            return self.starts_at < other.starts_at
        if self.starts_at:
            # Only this broadcast has started
            return True
        if other.starts_at:
            # Only the other broadcast has started
            return False
        return (
            (self.updated_at or self.created_at)
            < (other.updated_at or other.created_at)
        )

    @classmethod
    def create(cls, *, service_id, template_id):
        """Create a broadcast from a template via the API client and wrap
        the response in a new instance."""
        return cls(broadcast_message_api_client.create_broadcast_message(
            service_id=service_id,
            template_id=template_id,
            content=None,
            reference=None,
        ))

    @classmethod
    def create_from_content(cls, *, service_id, content, reference):
        """Create a broadcast from free-text ``content`` and a ``reference``
        (no template) via the API client and wrap the response."""
        return cls(broadcast_message_api_client.create_broadcast_message(
            service_id=service_id,
            template_id=None,
            content=content,
            reference=reference,
        ))

    @classmethod
    def from_id(cls, broadcast_message_id, *, service_id):
        """Fetch an existing broadcast by ID via the API client and wrap
        the response."""
        return cls(broadcast_message_api_client.get_broadcast_message(
            service_id=service_id,
            broadcast_message_id=broadcast_message_id,
        ))

    @cached_property
    def areas(self):
        """Return the areas this broadcast covers.

        Library areas are returned when every stored area ID resolves in
        the area libraries. Otherwise the stored ``simple_polygons`` (if
        any) are returned as ``CustomBroadcastAreas``, and an empty list
        is returned when there are none.
        """
        if 'ids' in self._dict['areas']:
            library_areas = self.get_areas(self.area_ids)

            if len(library_areas) == len(self.area_ids):
                return library_areas

            # it's possible an old broadcast may refer to areas that
            # are no longer part of our area libraries; in this case
            # we should just treat the whole thing as a custom broadcast,
            # which isn't great as our code doesn't support editing its
            # areas, but we don't expect this to happen often
            # (`Logger.warn` is a deprecated alias, hence `warning`)
            current_app.logger.warning(
                f'BroadcastMessage has {len(self.area_ids)} area IDs '
                f'but {len(library_areas)} found in the library. Treating '
                f'{self.id} as a custom broadcast.'
            )

        polygons = self._dict['areas'].get('simple_polygons', [])

        if polygons:
            return CustomBroadcastAreas(
                names=self._dict['areas']['names'],
                polygons=polygons,
            )

        return []

    @property
    def area_ids(self):
        """The stored list of area IDs, or an empty list if none is stored."""
        return self._dict['areas'].get('ids', [])

    @area_ids.setter
    def area_ids(self, value):
        self._dict['areas']['ids'] = value

    @property
    def ancestor_areas(self):
        """Sorted, de-duplicated ancestors of every area in ``self.areas``."""
        return sorted(set(self._ancestor_areas_iterator))

    @property
    def _ancestor_areas_iterator(self):
        """Yield each area's ancestors in turn (may contain duplicates)."""
        for area in self.areas:
            yield from area.ancestors

    @cached_property
    def polygons(self):
        """Combined ``polygons`` of all areas."""
        return self.get_polygons_from_areas(area_attribute='polygons')

    @cached_property
    def simple_polygons(self):
        """Combined ``simple_polygons`` of all areas."""
        return self.get_polygons_from_areas(area_attribute='simple_polygons')

    @cached_property
    def simple_polygons_with_bleed(self):
        """Combined ``simple_polygons_with_bleed`` of all areas."""
        return self.get_polygons_from_areas(area_attribute='simple_polygons_with_bleed')

    @property
    def reference(self):
        """The template name for template-based broadcasts, otherwise the
        stored ``cap_event`` or, failing that, ``reference``."""
        if self.template_id:
            return self._dict['template_name']
        return self._dict['cap_event'] or self._dict['reference']

    @property
    def template(self):
        """A ``BroadcastPreviewTemplate`` built from this broadcast's
        reference and content."""
        return BroadcastPreviewTemplate({
            'template_type': BroadcastPreviewTemplate.template_type,
            'name': self.reference,
            'content': self.content,
        })

    @property
    def status(self):
        """The stored status, except that a 'broadcasting' message whose
        ``finishes_at`` is in the past is reported as 'completed'."""
        if (
            self._dict['status'] == 'broadcasting'
            # Without this guard a missing finishes_at raises TypeError
            # in the string comparison below
            and self.finishes_at
            # Both sides are ISO 8601 strings, compared lexicographically
            and self.finishes_at < datetime.utcnow().isoformat()
        ):
            return 'completed'
        return self._dict['status']

    @cached_property
    def created_by(self):
        """The ``User`` who created the broadcast, or ``None`` when there
        is no ``created_by_id``."""
        return User.from_id(self.created_by_id) if self.created_by_id else None

    @cached_property
    def approved_by(self):
        """The ``User`` who approved the broadcast, or ``None`` when there
        is no ``approved_by_id``."""
        return User.from_id(self.approved_by_id) if self.approved_by_id else None

    @cached_property
    def cancelled_by(self):
        """Display text for who cancelled the broadcast.

        Unlike ``created_by`` and ``approved_by`` this returns a string:
        the cancelling user's name, or "an API call" when there is no
        ``cancelled_by_id``.
        """
        if not self.cancelled_by_id:
            return "an API call"
        return User.from_id(self.cancelled_by_id).name

    @cached_property
    def count_of_phones(self):
        """Sum of ``count_of_phones`` over all areas, rounded to one
        significant figure."""
        return round_to_significant_figures(
            sum(area.count_of_phones for area in self.areas),
            1
        )

    @cached_property
    def count_of_phones_likely(self):
        """Phone count for the area including bleed, rounded to one
        significant figure."""
        estimated_area = self.simple_polygons.estimated_area

        if estimated_area > ESTIMATED_AREA_OF_LARGEST_UK_COUNTY:
            # For large areas, use a naïve but computationally less
            # expensive way of counting the number of phones in the
            # bleed area
            count = self.count_of_phones * (
                self.simple_polygons_with_bleed.estimated_area / estimated_area
            )
        else:
            # For smaller areas, where the computation can be done in
            # a second or less (approximately) calculate the number of
            # phones based on the amount of overlap with areas for
            # which we have population data
            count = CustomBroadcastArea.from_polygon_objects(
                self.simple_polygons_with_bleed
            ).count_of_phones

        return round_to_significant_figures(count, 1)

    def get_areas(self, area_ids):
        """Look up ``area_ids`` in the broadcast area libraries."""
        return broadcast_area_libraries.get_areas(
            area_ids
        )

    def get_polygons_from_areas(self, area_attribute):
        """Combine the ``area_attribute`` polygons of every area into one
        ``Polygons`` object.

        ``area_attribute`` is the name of the attribute read from each
        area ('polygons', 'simple_polygons' or
        'simple_polygons_with_bleed' in this class).
        """
        areas_polygons = [
            getattr(area, area_attribute) for area in self.areas
        ]
        coordinate_reference_systems = {
            polygons.utm_crs for polygons in areas_polygons
        }

        if len(coordinate_reference_systems) == 1:
            # All our polygons are defined in the same coordinate
            # reference system so we just have to flatten the list and
            # say which coordinate reference system we are using
            polygons = Polygons(
                list(itertools.chain.from_iterable(areas_polygons)),
                utm_crs=next(iter(coordinate_reference_systems)),
            )
        else:
            # Our polygons are in different coordinate reference systems
            # We need to convert them back to degrees and make a new
            # instance of `Polygons` which will determine a common
            # coordinate reference system
            polygons = Polygons(
                list(itertools.chain.from_iterable(
                    area_polygon.as_wgs84_coordinates
                    for area_polygon in areas_polygons
                ))
            )

        if area_attribute != 'polygons' and len(self.areas) > 1:
            # We’re combining simplified polygons from multiple areas so we
            # need to re-simplify the combined polygons to keep the point
            # count down
            return polygons.smooth.simplify

        return polygons

    def add_areas(self, *new_area_ids):
        """Append ``new_area_ids`` to the stored IDs (keeping order and
        dropping duplicates) and send the updated areas to the API."""
        # NOTE(review): `areas` and the polygon properties are cached, so if
        # they were read on this instance before this call `_update_areas`
        # would use the stale values -- confirm callers use a fresh instance.
        self.area_ids = list(OrderedSet(self.area_ids + list(new_area_ids)))
        self._update_areas()

    def remove_area(self, area_id):
        """Remove ``area_id`` from the stored IDs and send the updated
        areas to the API.

        The remaining IDs keep their existing order (matching
        ``add_areas``); a plain ``set`` difference would scramble it.
        Raises ``KeyError`` when the broadcast has no stored ``ids``.
        """
        self.area_ids = [
            existing_id
            for existing_id in OrderedSet(self._dict['areas']['ids'])
            if existing_id != area_id
        ]
        self._update_areas()

    def _set_status_to(self, status):
        """Ask the API to change this broadcast's status."""
        broadcast_message_api_client.update_broadcast_message_status(
            status,
            broadcast_message_id=self.id,
            service_id=self.service_id,
        )

    def _update_areas(self, force_override=False):
        """Send the current area IDs, names, aggregate names and simple
        polygons to the API."""
        areas = {
            'ids': self.area_ids,
            'names': [area.name for area in self.areas],
            'aggregate_names': [area.name for area in aggregate_areas(self.areas)],
            'simple_polygons': self.simple_polygons.as_coordinate_pairs_lat_long
        }

        data = {'areas': areas}

        # TEMPORARY: while we migrate to a new format for "areas"
        if force_override:
            data['force_override'] = True

        self._update(**data)

    def _update(self, **kwargs):
        """Send ``kwargs`` to the API as the update data for this
        broadcast."""
        broadcast_message_api_client.update_broadcast_message(
            broadcast_message_id=self.id,
            service_id=self.service_id,
            data=kwargs,
        )

    def request_approval(self):
        """Set the status to 'pending-approval'."""
        self._set_status_to('pending-approval')

    def approve_broadcast(self, channel):
        """Set the start and finish times, then set the status to
        'broadcasting'.

        The broadcast lasts 4 hours on the 'test' and 'operator' channels
        and 22 hours 30 minutes on any other channel.
        """
        if channel in {'test', 'operator'}:
            ttl = timedelta(hours=4, minutes=0)
        else:
            ttl = timedelta(hours=22, minutes=30)

        # Read the clock once so finishes_at is exactly starts_at + ttl
        now = datetime.utcnow()

        self._update(
            starts_at=now.isoformat(),
            finishes_at=(now + ttl).isoformat(),
        )
        self._set_status_to('broadcasting')

    def reject_broadcast(self):
        """Set the status to 'rejected'."""
        self._set_status_to('rejected')

    def cancel_broadcast(self):
        """Set the status to 'cancelled'."""
        self._set_status_to('cancelled')
|
||
|
||
|
||
class BroadcastMessages(ModelList):
    """Collection of ``BroadcastMessage`` items.

    ``model`` and ``client_method`` configure ``ModelList`` -- presumably
    the class used for each item and the client call that fetches them;
    confirm against ``ModelList``.
    """

    model = BroadcastMessage
    client_method = broadcast_message_api_client.get_broadcast_messages

    def with_status(self, *statuses):
        """Return a list of the broadcasts whose ``status`` is one of
        ``statuses``, in iteration order."""
        matching = []
        for message in self:
            if message.status in statuses:
                matching.append(message)
        return matching
|