diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 3e58dfc02..52831ebc1 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -33,7 +33,10 @@ from app.dao.notifications_dao import ( letters_missing_from_sending_bucket, is_delivery_slow_for_providers, ) -from app.dao.provider_details_dao import dao_reduce_sms_provider_priority +from app.dao.provider_details_dao import ( + dao_reduce_sms_provider_priority, + dao_adjust_provider_priority_back_to_resting_points +) from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago from app.dao.services_dao import dao_find_services_sending_to_tv_numbers, dao_find_services_with_high_failure_rates from app.models import ( @@ -126,6 +129,12 @@ def switch_current_sms_provider_on_slow_delivery(): dao_reduce_sms_provider_priority(provider_name, time_threshold=timedelta(minutes=10)) +@notify_celery.task(name='tend-providers-back-to-middle') +@statsd(namespace='tasks') +def tend_providers_back_to_middle(): + dao_adjust_provider_priority_back_to_resting_points() + + @notify_celery.task(name='check-job-status') @statsd(namespace="tasks") def check_job_status(): diff --git a/app/config.py b/app/config.py index f248cf9ed..49136a50c 100644 --- a/app/config.py +++ b/app/config.py @@ -135,6 +135,12 @@ class Config(object): CHECK_PROXY_HEADER = False + # these should always add up to 100% + SMS_PROVIDER_RESTING_POINTS = { + 'mmg': 60, + 'firetext': 40 + } + NOTIFY_SERVICE_ID = 'd6aa2c68-a2d9-4437-ab19-3ae8eb202553' NOTIFY_USER_ID = '6af522d0-2915-4e52-83a3-3690455a5fe6' INVITATION_EMAIL_TEMPLATE_ID = '4f46df42-f795-4cc4-83bb-65ca312f49cc' @@ -198,6 +204,11 @@ class Config(object): 'schedule': crontab(), 'options': {'queue': QueueNames.PERIODIC} }, + 'tend-providers-back-to-middle': { + 'task': 'tend-providers-back-to-middle', + 'schedule': crontab(minute='*/5'), + 'options': {'queue': QueueNames.PERIODIC} + }, 'check-for-missing-rows-in-completed-jobs': { 'task': 
'check-for-missing-rows-in-completed-jobs', 'schedule': crontab(minute='*/10'), diff --git a/app/dao/provider_details_dao.py b/app/dao/provider_details_dao.py index 1c4977bd8..64f5ae6b4 100644 --- a/app/dao/provider_details_dao.py +++ b/app/dao/provider_details_dao.py @@ -1,11 +1,10 @@ -from datetime import datetime +from datetime import datetime, timedelta from notifications_utils.timezones import convert_utc_to_bst from sqlalchemy import asc, desc, func from flask import current_app from app.dao.dao_utils import transactional -from app.dao.users_dao import get_user_by_id from app.models import FactBilling, ProviderDetails, ProviderDetailsHistory, SMS_TYPE, User from app import db @@ -34,6 +33,42 @@ def dao_get_provider_versions(provider_id): ).all() +def _adjust_provider_priority(provider, new_priority): + current_app.logger.info( + f'Adjusting provider priority - {provider.identifier} going from {provider.priority} to {new_priority}' + ) + provider.priority = new_priority + + # Automatic update so set as notify user + provider.created_by_id = current_app.config['NOTIFY_USER_ID'] + + # update without commit so that both rows can be changed without ending the transaction + # and releasing the for_update lock + _update_provider_details_without_commit(provider) + + +def _get_sms_providers_for_update(time_threshold): + """ + Returns a list of providers, while holding a for_update lock on the provider details table, guaranteeing that those + providers won't change (but can still be read) until you've committed/rolled back your current transaction. + + if any of the providers have been changed recently, it returns an empty list - it's still your responsibility to + release the transaction in that case + """ + # get current priority of both providers + q = ProviderDetails.query.filter( + ProviderDetails.notification_type == 'sms', + ProviderDetails.active + ).with_for_update().all() + + # if something updated recently, don't update again. 
If the updated_at is null, treat it as min time + if any((provider.updated_at or datetime.min) > datetime.utcnow() - time_threshold for provider in q): + current_app.logger.info(f"Not adjusting providers, providers updated less than {time_threshold} ago.") + return [] + + return q + + @transactional def dao_reduce_sms_provider_priority(identifier, *, time_threshold): """ @@ -41,50 +76,47 @@ def dao_reduce_sms_provider_priority(identifier, *, time_threshold): If either provider has been updated in the last `time_threshold`, then it won't take any action. """ amount_to_reduce_by = 10 + providers_list = _get_sms_providers_for_update(time_threshold) - # get current priority of both providers - q = ProviderDetails.query.filter( - ProviderDetails.notification_type == 'sms', - ProviderDetails.active - ).with_for_update().all() - - providers = {provider.identifier: provider for provider in q} - other_identifier = get_alternative_sms_provider(identifier) - - # if something updated recently, don't update again. 
If the updated_at is null, treat it as min time - if any((provider.updated_at or datetime.min) > datetime.utcnow() - time_threshold for provider in q): - current_app.logger.info("Not adjusting providers, providers updated less than {} ago.".format(time_threshold)) + if not providers_list: return + providers = {provider.identifier: provider for provider in providers_list} + other_identifier = get_alternative_sms_provider(identifier) + reduced_provider = providers[identifier] increased_provider = providers[other_identifier] - pre_reduction_priority = reduced_provider.priority - pre_increase_priority = increased_provider.priority # always keep values between 0 and 100 - reduced_provider.priority = max(0, reduced_provider.priority - amount_to_reduce_by) - increased_provider.priority = min(100, increased_provider.priority + amount_to_reduce_by) + reduced_provider_priority = max(0, reduced_provider.priority - amount_to_reduce_by) + increased_provider_priority = min(100, increased_provider.priority + amount_to_reduce_by) - current_app.logger.info('Adjusting provider priority - {} going from {} to {}'.format( - reduced_provider.identifier, - pre_reduction_priority, - reduced_provider.priority, - )) - current_app.logger.info('Adjusting provider priority - {} going from {} to {}'.format( - increased_provider.identifier, - pre_increase_priority, - increased_provider.priority, - )) + _adjust_provider_priority(reduced_provider, reduced_provider_priority) + _adjust_provider_priority(increased_provider, increased_provider_priority) - # Automatic update so set as notify user - notify_user = get_user_by_id(current_app.config['NOTIFY_USER_ID']) - reduced_provider.created_by_id = notify_user.id - increased_provider.created_by_id = notify_user.id - # update without commit so that both rows can be changed without ending the transaction - # and releasing the for_update lock - _update_provider_details_without_commit(reduced_provider) - 
_update_provider_details_without_commit(increased_provider) +@transactional +def dao_adjust_provider_priority_back_to_resting_points(): + """ + Provided that neither SMS provider has been modified in the last hour, move both providers by 10 percentage points + each towards their defined resting points (set in SMS_PROVIDER_RESTING_POINTS in config.py). + """ + amount_to_reduce_by = 10 + time_threshold = timedelta(hours=1) + + providers = _get_sms_providers_for_update(time_threshold) + + for provider in providers: + target = current_app.config['SMS_PROVIDER_RESTING_POINTS'][provider.identifier] + current = provider.priority + + if current != target: + if current > target: + new_priority = max(target, provider.priority - amount_to_reduce_by) + else: + new_priority = min(target, provider.priority + amount_to_reduce_by) + + _adjust_provider_priority(provider, new_priority) def get_provider_details_by_notification_type(notification_type, supports_international=False): diff --git a/app/provider_details/switch_providers.py b/app/provider_details/switch_providers.py deleted file mode 100644 index 4ed279de3..000000000 --- a/app/provider_details/switch_providers.py +++ /dev/null @@ -1,53 +0,0 @@ -from flask import current_app - -from app.dao.users_dao import get_user_by_id - - -def provider_is_inactive(new_provider): - if not new_provider.active: - current_app.logger.warning('Cancelling switch to {} as they are inactive'.format( - new_provider.identifier, - )) - return True - - -def provider_is_primary(current_provider, new_provider, identifier): - if current_provider.identifier == identifier: - current_app.logger.warning('Provider {} is already activated'.format(current_provider.display_name)) - return True - - return False - - -def switch_providers(current_provider, new_provider): - # Automatic update so set as notify user - notify_user = get_user_by_id(current_app.config['NOTIFY_USER_ID']) - current_provider.created_by_id = new_provider.created_by_id = notify_user.id - - # 
Swap priority to change primary provider - if new_provider.priority > current_provider.priority: - new_provider.priority, current_provider.priority = current_provider.priority, new_provider.priority - - # Increase other provider priority if equal - elif new_provider.priority == current_provider.priority: - current_provider.priority += 10 - - _print_provider_switch_logs(current_provider, new_provider) - return current_provider, new_provider - - -def _print_provider_switch_logs(current_provider, new_provider): - current_app.logger.warning('Switching provider from {} to {}'.format( - current_provider.identifier, - new_provider.identifier - )) - - current_app.logger.warning('Provider {} now updated with priority of {}'.format( - current_provider.identifier, - current_provider.priority - )) - - current_app.logger.warning('Provider {} now updated with priority of {}'.format( - new_provider.identifier, - new_provider.priority - )) diff --git a/tests/app/conftest.py b/tests/app/conftest.py index ee158a430..bbf2b8065 100644 --- a/tests/app/conftest.py +++ b/tests/app/conftest.py @@ -6,7 +6,6 @@ import pytest import pytz import requests_mock from flask import current_app, url_for -from sqlalchemy import asc from sqlalchemy.orm.session import make_transient from app import db @@ -548,15 +547,6 @@ def fake_uuid(): return "6ce466d0-fd6a-11e5-82f5-e0accb9d11a6" -@pytest.fixture(scope='function') -def current_sms_provider(): - return ProviderDetails.query.filter_by( - notification_type='sms' - ).order_by( - asc(ProviderDetails.priority) - ).first() - - @pytest.fixture(scope='function') def ses_provider(): return ProviderDetails.query.filter_by(identifier='ses').one() diff --git a/tests/app/dao/test_provider_details_dao.py b/tests/app/dao/test_provider_details_dao.py index 4445f3aa9..a764a3f48 100644 --- a/tests/app/dao/test_provider_details_dao.py +++ b/tests/app/dao/test_provider_details_dao.py @@ -13,6 +13,9 @@ from app.dao.provider_details_dao import ( 
dao_update_provider_details, dao_get_provider_stats, dao_reduce_sms_provider_priority, + dao_adjust_provider_priority_back_to_resting_points, + _adjust_provider_priority, + _get_sms_providers_for_update, ) from tests.app.db import ( create_ft_billing, @@ -122,6 +125,71 @@ def test_get_alternative_sms_provider_fails_if_unrecognised(): get_alternative_sms_provider('ses') +@freeze_time('2016-01-01 00:30') +def test_adjust_provider_priority_sets_priority( + restore_provider_details, + notify_user, + mmg_provider, +): + # need to update these manually to avoid triggering the `onupdate` clause of the updated_at column + ProviderDetails.query.filter(ProviderDetails.identifier == 'mmg').update({'updated_at': datetime.min}) + + _adjust_provider_priority(mmg_provider, 50) + + assert mmg_provider.updated_at == datetime.utcnow() + assert mmg_provider.created_by.id == notify_user.id + assert mmg_provider.priority == 50 + + +@freeze_time('2016-01-01 00:30') +def test_adjust_provider_priority_adds_history( + restore_provider_details, + notify_user, + mmg_provider, +): + # need to update these manually to avoid triggering the `onupdate` clause of the updated_at column + ProviderDetails.query.filter(ProviderDetails.identifier == 'mmg').update({'updated_at': datetime.min}) + + old_provider_history_rows = ProviderDetailsHistory.query.filter( + ProviderDetailsHistory.id == mmg_provider.id + ).order_by( + desc(ProviderDetailsHistory.version) + ).all() + + _adjust_provider_priority(mmg_provider, 50) + + updated_provider_history_rows = ProviderDetailsHistory.query.filter( + ProviderDetailsHistory.id == mmg_provider.id + ).order_by( + desc(ProviderDetailsHistory.version) + ).all() + + assert len(updated_provider_history_rows) - len(old_provider_history_rows) == 1 + assert updated_provider_history_rows[0].version - old_provider_history_rows[0].version == 1 + assert updated_provider_history_rows[0].priority == 50 + + +@freeze_time('2016-01-01 01:00') +def 
test_get_sms_providers_for_update_returns_providers(restore_provider_details): + sixty_one_minutes_ago = datetime(2015, 12, 31, 23, 59) + ProviderDetails.query.filter(ProviderDetails.identifier == 'mmg').update({'updated_at': sixty_one_minutes_ago}) + ProviderDetails.query.filter(ProviderDetails.identifier == 'firetext').update({'updated_at': None}) + + resp = _get_sms_providers_for_update(timedelta(hours=1)) + + assert {p.identifier for p in resp} == {'mmg', 'firetext'} + + +@freeze_time('2016-01-01 01:00') +def test_get_sms_providers_for_update_returns_nothing_if_recent_updates(restore_provider_details): + fifty_nine_minutes_ago = datetime(2016, 1, 1, 0, 1) + ProviderDetails.query.filter(ProviderDetails.identifier == 'mmg').update({'updated_at': fifty_nine_minutes_ago}) + + resp = _get_sms_providers_for_update(timedelta(hours=1)) + + assert not resp + + @pytest.mark.parametrize(['starting_priorities', 'expected_priorities'], [ ({'mmg': 50, 'firetext': 50}, {'mmg': 40, 'firetext': 60}), ({'mmg': 0, 'firetext': 20}, {'mmg': 0, 'firetext': 30}), # lower bound respected @@ -135,15 +203,15 @@ def test_get_alternative_sms_provider_fails_if_unrecognised(): ({'mmg': -100, 'firetext': 50}, {'mmg': 0, 'firetext': 60}), ({'mmg': 50, 'firetext': -100}, {'mmg': 40, 'firetext': -90}), ]) -def test_reduce_sms_provider_priority_switches_provider( - notify_db_session, +def test_reduce_sms_provider_priority_adjusts_provider_priorities( mocker, restore_provider_details, - sample_user, + notify_user, starting_priorities, expected_priorities, ): - mocker.patch('app.dao.provider_details_dao.get_user_by_id', return_value=sample_user) + mock_adjust = mocker.patch('app.dao.provider_details_dao._adjust_provider_priority') + mmg = get_provider_details_by_identifier('mmg') firetext = get_provider_details_by_identifier('firetext') @@ -155,56 +223,81 @@ def test_reduce_sms_provider_priority_switches_provider( # switch away from mmg. 
currently both 50/50 dao_reduce_sms_provider_priority('mmg', time_threshold=timedelta(minutes=10)) - assert firetext.priority == expected_priorities['firetext'] - assert mmg.priority == expected_priorities['mmg'] - assert mmg.created_by is sample_user - assert firetext.created_by is sample_user - - -def test_reduce_sms_provider_priority_adds_rows_to_history_table( - mocker, - restore_provider_details, - sample_user -): - mocker.patch('app.dao.provider_details_dao.get_user_by_id', return_value=sample_user) - mmg = get_provider_details_by_identifier('mmg') - # need to update these manually to avoid triggering the `onupdate` clause of the updated_at column - ProviderDetails.query.filter(ProviderDetails.notification_type == 'sms').update({'updated_at': datetime.min}) - - provider_history_rows = ProviderDetailsHistory.query.filter( - ProviderDetailsHistory.id == mmg.id - ).order_by( - desc(ProviderDetailsHistory.version) - ).all() - - dao_reduce_sms_provider_priority(mmg.identifier, time_threshold=timedelta(minutes=10)) - - updated_provider_history_rows = ProviderDetailsHistory.query.filter( - ProviderDetailsHistory.id == mmg.id - ).order_by( - desc(ProviderDetailsHistory.version) - ).all() - - assert len(updated_provider_history_rows) - len(provider_history_rows) == 1 - assert updated_provider_history_rows[0].version - provider_history_rows[0].version == 1 - assert updated_provider_history_rows[0].priority == 90 + mock_adjust.assert_any_call(firetext, expected_priorities['firetext']) + mock_adjust.assert_any_call(mmg, expected_priorities['mmg']) def test_reduce_sms_provider_priority_does_nothing_if_providers_have_recently_changed( mocker, restore_provider_details, ): - mock_is_slow = mocker.patch('app.celery.scheduled_tasks.is_delivery_slow_for_providers') - mock_reduce = mocker.patch('app.celery.scheduled_tasks.dao_reduce_sms_provider_priority') - ProviderDetails.query.filter(ProviderDetails.identifier == 'firetext').update({'updated_at': datetime.min}) - 
ProviderDetails.query.filter(ProviderDetails.identifier == 'mmg').update( - {'updated_at': datetime.utcnow() - timedelta(minutes=4, seconds=59)} - ) + mock_get_providers = mocker.patch('app.dao.provider_details_dao._get_sms_providers_for_update', return_value=None) + mock_adjust = mocker.patch('app.dao.provider_details_dao._adjust_provider_priority') dao_reduce_sms_provider_priority('firetext', time_threshold=timedelta(minutes=5)) - assert mock_is_slow.called is False - assert mock_reduce.called is False + mock_get_providers.assert_called_once_with(timedelta(minutes=5)) + assert mock_adjust.called is False + + +@pytest.mark.parametrize('existing_mmg, existing_firetext, new_mmg, new_firetext', [ + (50, 50, 60, 40), # not just 50/50 - 60/40 specifically + (65, 35, 60, 40), # doesn't overshoot if there's less than 10 difference + (0, 100, 10, 90), # only adjusts by 10 + (100, 100, 90, 90), # it tries to fix weird data - it will reduce both if needs be +]) +def test_adjust_provider_priority_back_to_resting_points_updates_all_providers( + restore_provider_details, + mocker, + existing_mmg, + existing_firetext, + new_mmg, + new_firetext +): + mmg = get_provider_details_by_identifier('mmg') + firetext = get_provider_details_by_identifier('firetext') + mmg.priority = existing_mmg + firetext.priority = existing_firetext + + mock_adjust = mocker.patch('app.dao.provider_details_dao._adjust_provider_priority') + mock_get_providers = mocker.patch('app.dao.provider_details_dao._get_sms_providers_for_update', return_value=[ + mmg, firetext + ]) + + dao_adjust_provider_priority_back_to_resting_points() + + mock_get_providers.assert_called_once_with(timedelta(hours=1)) + mock_adjust.assert_any_call(mmg, new_mmg) + mock_adjust.assert_any_call(firetext, new_firetext) + + +def test_adjust_provider_priority_back_to_resting_points_does_nothing_if_theyre_already_at_right_values( + restore_provider_details, + mocker, +): + mmg = get_provider_details_by_identifier('mmg') + firetext = 
get_provider_details_by_identifier('firetext') + mmg.priority = 60 + firetext.priority = 40 + + mock_adjust = mocker.patch('app.dao.provider_details_dao._adjust_provider_priority') + mocker.patch('app.dao.provider_details_dao._get_sms_providers_for_update', return_value=[mmg, firetext]) + + dao_adjust_provider_priority_back_to_resting_points() + + assert mock_adjust.called is False + + +def test_adjust_provider_priority_back_to_resting_points_does_nothing_if_no_providers_to_update( + restore_provider_details, + mocker, +): + mock_adjust = mocker.patch('app.dao.provider_details_dao._adjust_provider_priority') + mocker.patch('app.dao.provider_details_dao._get_sms_providers_for_update', return_value=[]) + + dao_adjust_provider_priority_back_to_resting_points() + + assert mock_adjust.called is False @freeze_time('2018-06-28 12:00') diff --git a/tests/app/test_schemas.py b/tests/app/test_schemas.py index db27c6707..df66bff93 100644 --- a/tests/app/test_schemas.py +++ b/tests/app/test_schemas.py @@ -105,7 +105,6 @@ def test_provider_details_schema_returns_user_details( ): from app.schemas import provider_details_schema current_sms_provider = get_provider_details_by_identifier('mmg') - mocker.patch('app.provider_details.switch_providers.get_user_by_id', return_value=sample_user) current_sms_provider.created_by = sample_user data = provider_details_schema.dump(current_sms_provider).data @@ -118,7 +117,6 @@ def test_provider_details_history_schema_returns_user_details( restore_provider_details, ): from app.schemas import provider_details_schema - mocker.patch('app.provider_details.switch_providers.get_user_by_id', return_value=sample_user) current_sms_provider = get_provider_details_by_identifier('mmg') current_sms_provider.created_by_id = sample_user.id data = provider_details_schema.dump(current_sms_provider).data