Files
notifications-admin/app/models/broadcast_message.py
Chris Hill-Scott 738ac1d818 Vary bleed amount based on population density
There are basically two kinds of 4G masts:

Frequency | Range        | Bandwidth
----------|--------------|----------------------------------
800MHz    | Long (5km)   | Low (can handle a bit of traffic)
1800MHz   | Short (500m) | High (can handle lots of traffic)

The 1800MHz masts are better in terms of how much traffic they can
handle and how fast a connection they provide. But because they have
quite a short range, it’s only economical to install them in very
built-up areas†.

In more rural areas the 800MHz masts are better because they cover a
wider area, and have enough bandwidth for the lower population density.

The net effect of this is that cell broadcasts in rural areas are likely
to bleed further, because the masts they are being broadcast from are
less precise.

We can use population density as a proxy for how likely an area is to be
covered by 1800MHz masts, and therefore how much bleed we should expect.
So this commit varies the amount of bleed shown based on the population
density.

I came up with the formula based on 3 fixed points:
- The most remote areas (for example the Scottish Highlands) should have
  the highest average bleed, estimated at 5km
- A town, like Crewe, should have about the same bleed as we were
  estimating before (1.5km) – Pete D thinks this is about right based on
  his knowledge of the area around his office in Crewe
- The most built up areas, like London boroughs, could have as little as
  500m of bleed

Based on these three figures I came up with the following formula, which
roughly gives the right bleed distance (`b`) for each of their population
densities (`d`):
```
b = 5900 - (log10(d) × 1_250)
```

Plotted on a curve it looks like this:

This is based on averages – remember that the UI shows where is _likely_
to receive the alert, based on bleed, not where it’s _possible_ to
receive the alert.

Here’s what it looks like on the map:

---

†There are some additional subtleties which make this not strictly true:
- The 800MHz masts are also used in built-up areas to fill in the gaps
  between the areas covered by the 1800MHz masts
- Switching between masts is inefficient, so if you’re moving fast
  through a built-up area (for example on a train) your phone will only
  use the 800MHz masts so that it has to hand off from one mast to
  another less often
2021-03-18 09:37:23 +00:00

255 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import itertools
from datetime import datetime, timedelta
from notifications_utils.polygons import Polygons
from notifications_utils.template import BroadcastPreviewTemplate
from orderedset import OrderedSet
from werkzeug.utils import cached_property
from app.broadcast_areas import CustomBroadcastAreas, broadcast_area_libraries
from app.formatters import round_to_significant_figures
from app.models import JSONModel, ModelList
from app.models.user import User
from app.notify_client.broadcast_message_api_client import (
broadcast_message_api_client,
)
class BroadcastMessage(JSONModel):
    """A single broadcast message, read and written through
    ``broadcast_message_api_client``.

    On top of the raw JSON held in ``self._dict`` this class exposes the
    message's areas and polygons, estimated phone counts, a derived
    status, and methods that move the message through its lifecycle
    (request approval, approve, reject, cancel).
    """

    # Keys read as attributes on instances (for example ``self.starts_at``).
    # NOTE(review): presumably consumed by JSONModel – confirm there.
    ALLOWED_PROPERTIES = {
        'id',
        'service_id',
        'template_id',
        'content',
        'created_by',
        'personalisation',
        'starts_at',
        'finishes_at',
        'created_at',
        'approved_at',
        'cancelled_at',
        'updated_at',
        'created_by_id',
        'approved_by_id',
        'cancelled_by_id',
    }

    libraries = broadcast_area_libraries

    def __lt__(self, other):
        """Order two broadcast messages.

        If both have a ``starts_at`` they are ordered by it. A message
        with a ``starts_at`` sorts before one without. When neither has
        one, each message is represented by its ``updated_at`` if set,
        otherwise its ``created_at``, and those values are compared.
        """
        if self.starts_at and other.starts_at:
            return self.starts_at < other.starts_at
        if self.starts_at:
            return True
        if other.starts_at:
            return False
        # Neither message has started: compare on when each was last
        # touched, falling back to creation time if never updated.
        return (
            (self.updated_at or self.created_at)
            < (other.updated_at or other.created_at)
        )

    @classmethod
    def create(cls, *, service_id, template_id):
        """Create a broadcast message from a template and return it
        wrapped in this class. Content and reference are sent as None."""
        return cls(broadcast_message_api_client.create_broadcast_message(
            service_id=service_id,
            template_id=template_id,
            content=None,
            reference=None,
        ))

    @classmethod
    def create_from_content(cls, *, service_id, content, reference):
        """Create a broadcast message from free-text content (no
        template) and return it wrapped in this class."""
        return cls(broadcast_message_api_client.create_broadcast_message(
            service_id=service_id,
            template_id=None,
            content=content,
            reference=reference,
        ))

    @classmethod
    def from_id(cls, broadcast_message_id, *, service_id):
        """Fetch an existing broadcast message by ID for the given
        service and return it wrapped in this class."""
        return cls(broadcast_message_api_client.get_broadcast_message(
            service_id=service_id,
            broadcast_message_id=broadcast_message_id,
        ))

    @property
    def areas(self):
        """The areas this message targets.

        Looks up the stored ``areas`` entries in the area libraries. If
        any are found they are returned, provided every stored entry was
        found. If none are found, a ``CustomBroadcastAreas`` is built
        from the stored ``areas`` and ``simple_polygons`` instead.

        Raises:
            RuntimeError: if some, but not all, of the stored areas were
                found in the libraries.
        """
        stored_areas = self._dict['areas']
        library_areas = self.get_areas(areas=stored_areas)

        if library_areas:
            if len(library_areas) != len(stored_areas):
                raise RuntimeError(
                    f'BroadcastMessage has {len(stored_areas)} areas '
                    f'but {len(library_areas)} found in the library'
                )
            return library_areas

        return CustomBroadcastAreas(
            areas=stored_areas,
            polygons=self._dict['simple_polygons'],
        )

    @property
    def parent_areas(self):
        """Sorted, de-duplicated parents of all of this message's areas."""
        return sorted(set(self._parent_areas_iterator))

    @property
    def _parent_areas_iterator(self):
        """Yield every parent of every area (may contain duplicates)."""
        for area in self.areas:
            for parent in area.parents:
                yield parent

    @cached_property
    def polygons(self):
        """All polygons of all areas, combined into one ``Polygons``."""
        return Polygons(
            list(itertools.chain.from_iterable(
                area.polygons for area in self.areas
            ))
        )

    @cached_property
    def simple_polygons(self):
        """Simplified polygons for this message's areas (see
        ``get_simple_polygons``)."""
        return self.get_simple_polygons(areas=self.areas)

    @cached_property
    def simple_polygons_with_bleed(self):
        """Simplified polygons for this message's areas, including each
        area's bleed, combined into one ``Polygons``."""
        # `areas` is an uncached property, so evaluate it only once here
        areas = self.areas
        polygons = Polygons(
            list(itertools.chain.from_iterable(
                area.simple_polygons_with_bleed for area in areas
            ))
        )
        # If we've added multiple areas then we need to re-simplify the
        # combined shapes to keep the point count down
        return polygons.smooth.simplify if len(areas) > 1 else polygons

    @property
    def reference(self):
        """The stored template name if the message has a template,
        otherwise the stored reference."""
        if self.template_id:
            return self._dict['template_name']
        return self._dict['reference']

    @property
    def template(self):
        """A ``BroadcastPreviewTemplate`` built from this message's
        reference and content."""
        return BroadcastPreviewTemplate({
            'template_type': BroadcastPreviewTemplate.template_type,
            'name': self.reference,
            'content': self.content,
        })

    @property
    def status(self):
        """The stored status, except that a 'broadcasting' message whose
        ``finishes_at`` is in the past is reported as 'completed'."""
        stored_status = self._dict['status']
        if (
            stored_status == 'broadcasting'
            # Guard against a missing finish time rather than raising
            # TypeError on the comparison below
            and self.finishes_at
            # Both sides are compared as strings.
            # NOTE(review): assumes finishes_at is an ISO 8601 UTC
            # string that sorts lexicographically – confirm with the API
            and self.finishes_at < datetime.utcnow().isoformat()
        ):
            return 'completed'
        return stored_status

    @cached_property
    def created_by(self):
        """The creating ``User``, or None if there is no creator ID."""
        return User.from_id(self.created_by_id) if self.created_by_id else None

    @cached_property
    def approved_by(self):
        """The approving ``User``, or None if there is no approver ID."""
        return User.from_id(self.approved_by_id) if self.approved_by_id else None

    @cached_property
    def cancelled_by(self):
        """The cancelling ``User``, or None if there is no canceller ID."""
        return User.from_id(self.cancelled_by_id) if self.cancelled_by_id else None

    @property
    def count_of_phones(self):
        """Sum of ``count_of_phones`` across all areas, rounded to one
        significant figure."""
        return round_to_significant_figures(
            sum(area.count_of_phones for area in self.areas),
            1
        )

    @property
    def count_of_phones_likely(self):
        """Estimated phones in the areas plus their bleed, rounded to
        one significant figure.

        The phone count is scaled up in proportion to how much the
        estimated area grows once bleed is included. When the estimated
        area is zero no scaling is possible, so ``count_of_phones`` is
        returned unchanged.
        """
        count_of_phones = self.count_of_phones
        area_estimate = self.simple_polygons.estimated_area

        if not area_estimate:
            # Avoid dividing by zero when there is no area to scale from
            return count_of_phones

        bleed_area_estimate = (
            self.simple_polygons_with_bleed.estimated_area - area_estimate
        )
        return round_to_significant_figures(
            count_of_phones + (count_of_phones * bleed_area_estimate / area_estimate),
            1
        )

    def get_areas(self, areas):
        """Look up the given area entries in the broadcast area
        libraries and return whatever the libraries find."""
        return broadcast_area_libraries.get_areas(
            *areas
        )

    def get_simple_polygons(self, areas):
        """Combine the ``simple_polygons`` of each of the given areas
        into one ``Polygons``, re-simplifying if there is more than one
        area."""
        polygons = Polygons(
            list(itertools.chain.from_iterable(
                area.simple_polygons for area in areas
            ))
        )
        # If we've added multiple areas then we need to re-simplify the
        # combined shapes to keep the point count down
        return polygons.smooth.simplify if len(areas) > 1 else polygons

    def _update_areas(self, areas):
        """Recompute simple polygons for `areas` and send both to the API."""
        simple_polygons = self.get_simple_polygons(
            areas=self.get_areas(areas=areas)
        )
        self._update(
            areas=areas,
            simple_polygons=simple_polygons.as_coordinate_pairs_lat_long,
        )

    def add_areas(self, *new_areas):
        """Append the given areas to the stored ones (dropping
        duplicates, keeping order) and send the result to the API."""
        self._update_areas(list(OrderedSet(
            self._dict['areas'] + list(new_areas)
        )))

    def remove_area(self, area_to_remove):
        """Drop every stored area equal to `area_to_remove` and send the
        result to the API."""
        self._update_areas([
            area for area in self._dict['areas']
            if area != area_to_remove
        ])

    def _set_status_to(self, status):
        """Ask the API to set this message's status to `status`."""
        broadcast_message_api_client.update_broadcast_message_status(
            status,
            broadcast_message_id=self.id,
            service_id=self.service_id,
        )

    def _update(self, **kwargs):
        """Send the given fields to the API as an update to this
        message. The local ``_dict`` is not modified here."""
        broadcast_message_api_client.update_broadcast_message(
            broadcast_message_id=self.id,
            service_id=self.service_id,
            data=kwargs,
        )

    def request_approval(self):
        """Set the status to 'pending-approval'."""
        self._set_status_to('pending-approval')

    def approve_broadcast(self):
        """Set the broadcast to start now and finish in 4 hours, then
        set the status to 'broadcasting'."""
        # Read the clock once so the window is exactly 4 hours long
        now = datetime.utcnow()
        self._update(
            starts_at=now.isoformat(),
            finishes_at=(
                now + timedelta(hours=4, minutes=0)
            ).isoformat(),
        )
        self._set_status_to('broadcasting')

    def reject_broadcast(self):
        """Set the status to 'rejected'."""
        self._set_status_to('rejected')

    def cancel_broadcast(self):
        """Set the status to 'cancelled'."""
        self._set_status_to('cancelled')
class BroadcastMessages(ModelList):
    """A list of ``BroadcastMessage`` objects, fetched with
    ``broadcast_message_api_client.get_broadcast_messages``."""

    model = BroadcastMessage
    client_method = broadcast_message_api_client.get_broadcast_messages

    def with_status(self, *statuses):
        """Return a list of the messages whose ``status`` is one of the
        given `statuses`, in their original order."""
        matching = []
        for message in self:
            if message.status in statuses:
                matching.append(message)
        return matching