refactor shared functionality from provider priority logic

This commit is contained in:
Leo Hemsted
2019-12-12 11:19:35 +00:00
parent 31d1abd6d1
commit b355fc4523
2 changed files with 187 additions and 103 deletions

View File

@@ -5,7 +5,6 @@ from sqlalchemy import asc, desc, func
from flask import current_app from flask import current_app
from app.dao.dao_utils import transactional from app.dao.dao_utils import transactional
from app.dao.users_dao import get_user_by_id
from app.models import FactBilling, ProviderDetails, ProviderDetailsHistory, SMS_TYPE, User from app.models import FactBilling, ProviderDetails, ProviderDetailsHistory, SMS_TYPE, User
from app import db from app import db
@@ -34,6 +33,42 @@ def dao_get_provider_versions(provider_id):
).all() ).all()
def _adjust_provider_priority(provider, new_priority):
current_app.logger.info(
f'Adjusting provider priority - {provider.identifier} going from {provider.priority} to {new_priority}'
)
provider.priority = new_priority
# Automatic update so set as notify user
provider.created_by_id = current_app.config['NOTIFY_USER_ID']
# update without commit so that both rows can be changed without ending the transaction
# and releasing the for_update lock
_update_provider_details_without_commit(provider)
def _get_sms_providers_for_update(time_threshold):
"""
Returns a list of providers, while holding a for_update lock on the provider details table, guaranteeing that those
providers won't change (but can still be read) until you've committed/rolled back your current transaction.
if any of the providers have been changed recently, it returns an empty list - it's still your responsiblity to
release the transaction in that case
"""
# get current priority of both providers
q = ProviderDetails.query.filter(
ProviderDetails.notification_type == 'sms',
ProviderDetails.active
).with_for_update().all()
# if something updated recently, don't update again. If the updated_at is null, treat it as min time
if any((provider.updated_at or datetime.min) > datetime.utcnow() - time_threshold for provider in q):
current_app.logger.info(f"Not adjusting providers, providers updated less than {time_threshold} ago.")
return []
return q
@transactional @transactional
def dao_reduce_sms_provider_priority(identifier, *, time_threshold): def dao_reduce_sms_provider_priority(identifier, *, time_threshold):
""" """
@@ -41,50 +76,23 @@ def dao_reduce_sms_provider_priority(identifier, *, time_threshold):
If either provider has been updated in the last `time_threshold`, then it won't take any action. If either provider has been updated in the last `time_threshold`, then it won't take any action.
""" """
amount_to_reduce_by = 10 amount_to_reduce_by = 10
providers_list = _get_sms_providers_for_update(time_threshold)
# get current priority of both providers if not providers_list:
q = ProviderDetails.query.filter(
ProviderDetails.notification_type == 'sms',
ProviderDetails.active
).with_for_update().all()
providers = {provider.identifier: provider for provider in q}
other_identifier = get_alternative_sms_provider(identifier)
# if something updated recently, don't update again. If the updated_at is null, treat it as min time
if any((provider.updated_at or datetime.min) > datetime.utcnow() - time_threshold for provider in q):
current_app.logger.info("Not adjusting providers, providers updated less than {} ago.".format(time_threshold))
return return
providers = {provider.identifier: provider for provider in providers_list}
other_identifier = get_alternative_sms_provider(identifier)
reduced_provider = providers[identifier] reduced_provider = providers[identifier]
increased_provider = providers[other_identifier] increased_provider = providers[other_identifier]
pre_reduction_priority = reduced_provider.priority
pre_increase_priority = increased_provider.priority
# always keep values between 0 and 100 # always keep values between 0 and 100
reduced_provider.priority = max(0, reduced_provider.priority - amount_to_reduce_by) reduced_provider_priority = max(0, reduced_provider.priority - amount_to_reduce_by)
increased_provider.priority = min(100, increased_provider.priority + amount_to_reduce_by) increased_provider_priority = min(100, increased_provider.priority + amount_to_reduce_by)
current_app.logger.info('Adjusting provider priority - {} going from {} to {}'.format( _adjust_provider_priority(reduced_provider, reduced_provider_priority)
reduced_provider.identifier, _adjust_provider_priority(increased_provider, increased_provider_priority)
pre_reduction_priority,
reduced_provider.priority,
))
current_app.logger.info('Adjusting provider priority - {} going from {} to {}'.format(
increased_provider.identifier,
pre_increase_priority,
increased_provider.priority,
))
# Automatic update so set as notify user
notify_user = get_user_by_id(current_app.config['NOTIFY_USER_ID'])
reduced_provider.created_by_id = notify_user.id
increased_provider.created_by_id = notify_user.id
# update without commit so that both rows can be changed without ending the transaction
# and releasing the for_update lock
_update_provider_details_without_commit(reduced_provider)
_update_provider_details_without_commit(increased_provider)
@transactional @transactional
@@ -96,36 +104,19 @@ def dao_adjust_provider_priority_back_to_resting_points():
amount_to_reduce_by = 10 amount_to_reduce_by = 10
time_threshold = timedelta(hours=1) time_threshold = timedelta(hours=1)
# get current priority of both providers providers = _get_sms_providers_for_update(time_threshold)
providers = ProviderDetails.query.filter(
ProviderDetails.notification_type == 'sms',
ProviderDetails.active
).with_for_update().all()
# if something updated recently, don't update again. If the updated_at is null, treat it as min time
if any((provider.updated_at or datetime.min) > datetime.utcnow() - time_threshold for provider in providers):
current_app.logger.info("Not adjusting providers, providers updated less than {} ago.".format(time_threshold))
return
# Automatic update so set as notify user
notify_user = get_user_by_id(current_app.config['NOTIFY_USER_ID'])
for provider in providers: for provider in providers:
target = current_app.config['SMS_PROVIDER_RESTING_POINTS'][provider.identifier] target = current_app.config['SMS_PROVIDER_RESTING_POINTS'][provider.identifier]
current = provider.priority current = provider.priority
if current != target: if current != target:
if current > target: if current > target:
provider.priority = max(provider.priority - amount_to_reduce_by, target) new_priority = max(target, provider.priority - amount_to_reduce_by)
else: else:
provider.priority = min(provider.priority + amount_to_reduce_by, target) new_priority = min(target, provider.priority + amount_to_reduce_by)
provider.created_by_id = notify_user.id _adjust_provider_priority(provider, new_priority)
_update_provider_details_without_commit(provider)
current_app.logger.info('Adjusting provider priority - {} going from {} to {}'.format(
provider.identifier,
current,
provider.priority,
))
def get_provider_details_by_notification_type(notification_type, supports_international=False): def get_provider_details_by_notification_type(notification_type, supports_international=False):

View File

@@ -13,6 +13,9 @@ from app.dao.provider_details_dao import (
dao_update_provider_details, dao_update_provider_details,
dao_get_provider_stats, dao_get_provider_stats,
dao_reduce_sms_provider_priority, dao_reduce_sms_provider_priority,
dao_adjust_provider_priority_back_to_resting_points,
_adjust_provider_priority,
_get_sms_providers_for_update,
) )
from tests.app.db import ( from tests.app.db import (
create_ft_billing, create_ft_billing,
@@ -122,6 +125,71 @@ def test_get_alternative_sms_provider_fails_if_unrecognised():
get_alternative_sms_provider('ses') get_alternative_sms_provider('ses')
@freeze_time('2016-01-01 00:30')
def test_adjust_provider_priority_sets_priority(
restore_provider_details,
notify_user,
mmg_provider,
):
# need to update these manually to avoid triggering the `onupdate` clause of the updated_at column
ProviderDetails.query.filter(ProviderDetails.identifier == 'mmg').update({'updated_at': datetime.min})
_adjust_provider_priority(mmg_provider, 50)
assert mmg_provider.updated_at == datetime.utcnow()
assert mmg_provider.created_by.id == notify_user.id
assert mmg_provider.priority == 50
@freeze_time('2016-01-01 00:30')
def test_adjust_provider_priority_adds_history(
restore_provider_details,
notify_user,
mmg_provider,
):
# need to update these manually to avoid triggering the `onupdate` clause of the updated_at column
ProviderDetails.query.filter(ProviderDetails.identifier == 'mmg').update({'updated_at': datetime.min})
old_provider_history_rows = ProviderDetailsHistory.query.filter(
ProviderDetailsHistory.id == mmg_provider.id
).order_by(
desc(ProviderDetailsHistory.version)
).all()
_adjust_provider_priority(mmg_provider, 50)
updated_provider_history_rows = ProviderDetailsHistory.query.filter(
ProviderDetailsHistory.id == mmg_provider.id
).order_by(
desc(ProviderDetailsHistory.version)
).all()
assert len(updated_provider_history_rows) - len(old_provider_history_rows) == 1
assert updated_provider_history_rows[0].version - old_provider_history_rows[0].version == 1
assert updated_provider_history_rows[0].priority == 50
@freeze_time('2016-01-01 01:00')
def test_get_sms_providers_for_update_returns_providers(restore_provider_details):
sixty_one_minutes_ago = datetime(2015, 12, 31, 23, 59)
ProviderDetails.query.filter(ProviderDetails.identifier == 'mmg').update({'updated_at': sixty_one_minutes_ago})
ProviderDetails.query.filter(ProviderDetails.identifier == 'firetext').update({'updated_at': None})
resp = _get_sms_providers_for_update(timedelta(hours=1))
assert {p.identifier for p in resp} == {'mmg', 'firetext'}
@freeze_time('2016-01-01 01:00')
def test_get_sms_providers_for_update_returns_nothing_if_recent_updates(restore_provider_details):
fifty_nine_minutes_ago = datetime(2016, 1, 1, 0, 1)
ProviderDetails.query.filter(ProviderDetails.identifier == 'mmg').update({'updated_at': fifty_nine_minutes_ago})
resp = _get_sms_providers_for_update(timedelta(hours=1))
assert not resp
@pytest.mark.parametrize(['starting_priorities', 'expected_priorities'], [ @pytest.mark.parametrize(['starting_priorities', 'expected_priorities'], [
({'mmg': 50, 'firetext': 50}, {'mmg': 40, 'firetext': 60}), ({'mmg': 50, 'firetext': 50}, {'mmg': 40, 'firetext': 60}),
({'mmg': 0, 'firetext': 20}, {'mmg': 0, 'firetext': 30}), # lower bound respected ({'mmg': 0, 'firetext': 20}, {'mmg': 0, 'firetext': 30}), # lower bound respected
@@ -135,15 +203,15 @@ def test_get_alternative_sms_provider_fails_if_unrecognised():
({'mmg': -100, 'firetext': 50}, {'mmg': 0, 'firetext': 60}), ({'mmg': -100, 'firetext': 50}, {'mmg': 0, 'firetext': 60}),
({'mmg': 50, 'firetext': -100}, {'mmg': 40, 'firetext': -90}), ({'mmg': 50, 'firetext': -100}, {'mmg': 40, 'firetext': -90}),
]) ])
def test_reduce_sms_provider_priority_switches_provider( def test_reduce_sms_provider_priority_adjusts_provider_priorities(
notify_db_session,
mocker, mocker,
restore_provider_details, restore_provider_details,
sample_user, notify_user,
starting_priorities, starting_priorities,
expected_priorities, expected_priorities,
): ):
mocker.patch('app.dao.provider_details_dao.get_user_by_id', return_value=sample_user) mock_adjust = mocker.patch('app.dao.provider_details_dao._adjust_provider_priority')
mmg = get_provider_details_by_identifier('mmg') mmg = get_provider_details_by_identifier('mmg')
firetext = get_provider_details_by_identifier('firetext') firetext = get_provider_details_by_identifier('firetext')
@@ -155,56 +223,81 @@ def test_reduce_sms_provider_priority_switches_provider(
# switch away from mmg. currently both 50/50 # switch away from mmg. currently both 50/50
dao_reduce_sms_provider_priority('mmg', time_threshold=timedelta(minutes=10)) dao_reduce_sms_provider_priority('mmg', time_threshold=timedelta(minutes=10))
assert firetext.priority == expected_priorities['firetext'] mock_adjust.assert_any_call(firetext, expected_priorities['firetext'])
assert mmg.priority == expected_priorities['mmg'] mock_adjust.assert_any_call(mmg, expected_priorities['mmg'])
assert mmg.created_by is sample_user
assert firetext.created_by is sample_user
def test_reduce_sms_provider_priority_adds_rows_to_history_table(
mocker,
restore_provider_details,
sample_user
):
mocker.patch('app.dao.provider_details_dao.get_user_by_id', return_value=sample_user)
mmg = get_provider_details_by_identifier('mmg')
# need to update these manually to avoid triggering the `onupdate` clause of the updated_at column
ProviderDetails.query.filter(ProviderDetails.notification_type == 'sms').update({'updated_at': datetime.min})
provider_history_rows = ProviderDetailsHistory.query.filter(
ProviderDetailsHistory.id == mmg.id
).order_by(
desc(ProviderDetailsHistory.version)
).all()
dao_reduce_sms_provider_priority(mmg.identifier, time_threshold=timedelta(minutes=10))
updated_provider_history_rows = ProviderDetailsHistory.query.filter(
ProviderDetailsHistory.id == mmg.id
).order_by(
desc(ProviderDetailsHistory.version)
).all()
assert len(updated_provider_history_rows) - len(provider_history_rows) == 1
assert updated_provider_history_rows[0].version - provider_history_rows[0].version == 1
assert updated_provider_history_rows[0].priority == 90
def test_reduce_sms_provider_priority_does_nothing_if_providers_have_recently_changed( def test_reduce_sms_provider_priority_does_nothing_if_providers_have_recently_changed(
mocker, mocker,
restore_provider_details, restore_provider_details,
): ):
mock_is_slow = mocker.patch('app.celery.scheduled_tasks.is_delivery_slow_for_providers') mock_get_providers = mocker.patch('app.dao.provider_details_dao._get_sms_providers_for_update', return_value=None)
mock_reduce = mocker.patch('app.celery.scheduled_tasks.dao_reduce_sms_provider_priority') mock_adjust = mocker.patch('app.dao.provider_details_dao._adjust_provider_priority')
ProviderDetails.query.filter(ProviderDetails.identifier == 'firetext').update({'updated_at': datetime.min})
ProviderDetails.query.filter(ProviderDetails.identifier == 'mmg').update(
{'updated_at': datetime.utcnow() - timedelta(minutes=4, seconds=59)}
)
dao_reduce_sms_provider_priority('firetext', time_threshold=timedelta(minutes=5)) dao_reduce_sms_provider_priority('firetext', time_threshold=timedelta(minutes=5))
assert mock_is_slow.called is False mock_get_providers.assert_called_once_with(timedelta(minutes=5))
assert mock_reduce.called is False assert mock_adjust.called is False
@pytest.mark.parametrize('existing_mmg, existing_firetext, new_mmg, new_firetext', [
(50, 50, 60, 40), # not just 50/50 - 60/40 specifically
(65, 35, 60, 40), # doesn't overshoot if there's less than 10 difference
(0, 100, 10, 90), # only adjusts by 10
(100, 100, 90, 90), # it tries to fix weird data - it will reduce both if needs be
])
def test_adjust_provider_priority_back_to_resting_points_updates_all_providers(
restore_provider_details,
mocker,
existing_mmg,
existing_firetext,
new_mmg,
new_firetext
):
mmg = get_provider_details_by_identifier('mmg')
firetext = get_provider_details_by_identifier('firetext')
mmg.priority = existing_mmg
firetext.priority = existing_firetext
mock_adjust = mocker.patch('app.dao.provider_details_dao._adjust_provider_priority')
mock_get_providers = mocker.patch('app.dao.provider_details_dao._get_sms_providers_for_update', return_value=[
mmg, firetext
])
dao_adjust_provider_priority_back_to_resting_points()
mock_get_providers.assert_called_once_with(timedelta(hours=1))
mock_adjust.assert_any_call(mmg, new_mmg)
mock_adjust.assert_any_call(firetext, new_firetext)
def test_adjust_provider_priority_back_to_resting_points_does_nothing_if_theyre_already_at_right_values(
restore_provider_details,
mocker,
):
mmg = get_provider_details_by_identifier('mmg')
firetext = get_provider_details_by_identifier('firetext')
mmg.priority = 60
firetext.priority = 40
mock_adjust = mocker.patch('app.dao.provider_details_dao._adjust_provider_priority')
mocker.patch('app.dao.provider_details_dao._get_sms_providers_for_update', return_value=[mmg, firetext])
dao_adjust_provider_priority_back_to_resting_points()
assert mock_adjust.called is False
def test_adjust_provider_priority_back_to_resting_points_does_nothing_if_no_providers_to_update(
restore_provider_details,
mocker,
):
mock_adjust = mocker.patch('app.dao.provider_details_dao._adjust_provider_priority')
mocker.patch('app.dao.provider_details_dao._get_sms_providers_for_update', return_value=[])
dao_adjust_provider_priority_back_to_resting_points()
assert mock_adjust.called is False
@freeze_time('2018-06-28 12:00') @freeze_time('2018-06-28 12:00')