mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-22 08:21:13 -05:00
DRY-up conditionally creating callback tasks
This removes 3 duplicate instances of the same code, which is still
tested implicitly via test_process_ses_receipt_tasks [1]. In the
next commit we'll make this test more explicit, to reflect that it's
now being reused elsewhere and shouldn't change arbitrarily.
We do lose the "print" statement from the command instance of the
code, but I think that's a very tolerable loss.
[1]: 16ec8ccb8a/tests/app/celery/test_process_ses_receipts_tasks.py (L94)
This commit is contained in:
@@ -10,10 +10,6 @@ from sqlalchemy.exc import SQLAlchemyError
|
|||||||
|
|
||||||
from app import notify_celery, zendesk_client
|
from app import notify_celery, zendesk_client
|
||||||
from app.aws import s3
|
from app.aws import s3
|
||||||
from app.celery.service_callback_tasks import (
|
|
||||||
create_delivery_status_callback_data,
|
|
||||||
send_delivery_status_to_service,
|
|
||||||
)
|
|
||||||
from app.config import QueueNames
|
from app.config import QueueNames
|
||||||
from app.cronitor import cronitor
|
from app.cronitor import cronitor
|
||||||
from app.dao.fact_processing_time_dao import insert_update_processing_time
|
from app.dao.fact_processing_time_dao import insert_update_processing_time
|
||||||
@@ -27,9 +23,6 @@ from app.dao.notifications_dao import (
|
|||||||
dao_timeout_notifications,
|
dao_timeout_notifications,
|
||||||
delete_notifications_older_than_retention_by_type,
|
delete_notifications_older_than_retention_by_type,
|
||||||
)
|
)
|
||||||
from app.dao.service_callback_api_dao import (
|
|
||||||
get_service_delivery_status_callback_api_for_service,
|
|
||||||
)
|
|
||||||
from app.models import (
|
from app.models import (
|
||||||
EMAIL_TYPE,
|
EMAIL_TYPE,
|
||||||
KEY_TYPE_NORMAL,
|
KEY_TYPE_NORMAL,
|
||||||
@@ -39,6 +32,9 @@ from app.models import (
|
|||||||
FactProcessingTime,
|
FactProcessingTime,
|
||||||
Notification,
|
Notification,
|
||||||
)
|
)
|
||||||
|
from app.notifications.notifications_ses_callback import (
|
||||||
|
check_and_queue_callback_task,
|
||||||
|
)
|
||||||
from app.utils import get_london_midnight_in_utc
|
from app.utils import get_london_midnight_in_utc
|
||||||
|
|
||||||
|
|
||||||
@@ -126,12 +122,7 @@ def timeout_notifications():
|
|||||||
dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
|
dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
|
||||||
|
|
||||||
for notification in notifications:
|
for notification in notifications:
|
||||||
# queue callback task only if the service_callback_api exists
|
check_and_queue_callback_task(notification)
|
||||||
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) # noqa: E501
|
|
||||||
if service_callback_api:
|
|
||||||
encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api)
|
|
||||||
send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification],
|
|
||||||
queue=QueueNames.CALLBACKS)
|
|
||||||
|
|
||||||
current_app.logger.info(
|
current_app.logger.info(
|
||||||
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
|
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
|
||||||
|
|||||||
@@ -11,8 +11,8 @@ from app.config import QueueNames
|
|||||||
from app.dao import notifications_dao
|
from app.dao import notifications_dao
|
||||||
from app.models import NOTIFICATION_PENDING, NOTIFICATION_SENDING
|
from app.models import NOTIFICATION_PENDING, NOTIFICATION_SENDING
|
||||||
from app.notifications.notifications_ses_callback import (
|
from app.notifications.notifications_ses_callback import (
|
||||||
_check_and_queue_callback_task,
|
|
||||||
_check_and_queue_complaint_callback_task,
|
_check_and_queue_complaint_callback_task,
|
||||||
|
check_and_queue_callback_task,
|
||||||
determine_notification_bounce_type,
|
determine_notification_bounce_type,
|
||||||
handle_complaint,
|
handle_complaint,
|
||||||
)
|
)
|
||||||
@@ -76,7 +76,7 @@ def process_ses_results(self, response):
|
|||||||
notification.sent_at
|
notification.sent_at
|
||||||
)
|
)
|
||||||
|
|
||||||
_check_and_queue_callback_task(notification)
|
check_and_queue_callback_task(notification)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|||||||
@@ -5,20 +5,15 @@ from flask import current_app
|
|||||||
from notifications_utils.template import SMSMessageTemplate
|
from notifications_utils.template import SMSMessageTemplate
|
||||||
|
|
||||||
from app import notify_celery, statsd_client
|
from app import notify_celery, statsd_client
|
||||||
from app.celery.service_callback_tasks import (
|
|
||||||
create_delivery_status_callback_data,
|
|
||||||
send_delivery_status_to_service,
|
|
||||||
)
|
|
||||||
from app.clients import ClientException
|
from app.clients import ClientException
|
||||||
from app.clients.sms.firetext import get_firetext_responses
|
from app.clients.sms.firetext import get_firetext_responses
|
||||||
from app.clients.sms.mmg import get_mmg_responses
|
from app.clients.sms.mmg import get_mmg_responses
|
||||||
from app.config import QueueNames
|
|
||||||
from app.dao import notifications_dao
|
from app.dao import notifications_dao
|
||||||
from app.dao.service_callback_api_dao import (
|
|
||||||
get_service_delivery_status_callback_api_for_service,
|
|
||||||
)
|
|
||||||
from app.dao.templates_dao import dao_get_template_by_id
|
from app.dao.templates_dao import dao_get_template_by_id
|
||||||
from app.models import NOTIFICATION_PENDING
|
from app.models import NOTIFICATION_PENDING
|
||||||
|
from app.notifications.notifications_ses_callback import (
|
||||||
|
check_and_queue_callback_task,
|
||||||
|
)
|
||||||
|
|
||||||
sms_response_mapper = {
|
sms_response_mapper = {
|
||||||
'MMG': get_mmg_responses,
|
'MMG': get_mmg_responses,
|
||||||
@@ -94,9 +89,4 @@ def _process_for_status(notification_status, client_name, provider_reference, de
|
|||||||
notifications_dao.dao_update_notification(notification)
|
notifications_dao.dao_update_notification(notification)
|
||||||
|
|
||||||
if notification_status != NOTIFICATION_PENDING:
|
if notification_status != NOTIFICATION_PENDING:
|
||||||
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
|
check_and_queue_callback_task(notification)
|
||||||
# queue callback task only if the service_callback_api exists
|
|
||||||
if service_callback_api:
|
|
||||||
encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api)
|
|
||||||
send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification],
|
|
||||||
queue=QueueNames.CALLBACKS)
|
|
||||||
|
|||||||
@@ -26,10 +26,6 @@ from app.celery.letters_pdf_tasks import (
|
|||||||
from app.celery.reporting_tasks import (
|
from app.celery.reporting_tasks import (
|
||||||
create_nightly_notification_status_for_day,
|
create_nightly_notification_status_for_day,
|
||||||
)
|
)
|
||||||
from app.celery.service_callback_tasks import (
|
|
||||||
create_delivery_status_callback_data,
|
|
||||||
send_delivery_status_to_service,
|
|
||||||
)
|
|
||||||
from app.celery.tasks import process_row, record_daily_sorted_counts
|
from app.celery.tasks import process_row, record_daily_sorted_counts
|
||||||
from app.config import QueueNames
|
from app.config import QueueNames
|
||||||
from app.dao.annual_billing_dao import (
|
from app.dao.annual_billing_dao import (
|
||||||
@@ -52,9 +48,6 @@ from app.dao.permissions_dao import permission_dao
|
|||||||
from app.dao.provider_rates_dao import (
|
from app.dao.provider_rates_dao import (
|
||||||
create_provider_rates as dao_create_provider_rates,
|
create_provider_rates as dao_create_provider_rates,
|
||||||
)
|
)
|
||||||
from app.dao.service_callback_api_dao import (
|
|
||||||
get_service_delivery_status_callback_api_for_service,
|
|
||||||
)
|
|
||||||
from app.dao.services_dao import (
|
from app.dao.services_dao import (
|
||||||
dao_fetch_all_services_by_user,
|
dao_fetch_all_services_by_user,
|
||||||
dao_fetch_all_services_created_by_user,
|
dao_fetch_all_services_created_by_user,
|
||||||
@@ -85,6 +78,9 @@ from app.models import (
|
|||||||
Service,
|
Service,
|
||||||
User,
|
User,
|
||||||
)
|
)
|
||||||
|
from app.notifications.notifications_ses_callback import (
|
||||||
|
check_and_queue_callback_task,
|
||||||
|
)
|
||||||
from app.utils import get_london_midnight_in_utc
|
from app.utils import get_london_midnight_in_utc
|
||||||
|
|
||||||
|
|
||||||
@@ -295,23 +291,7 @@ def replay_callbacks(file_name):
|
|||||||
for id in [id.strip() for id in file]:
|
for id in [id.strip() for id in file]:
|
||||||
try:
|
try:
|
||||||
notification = Notification.query.filter_by(id=id).one()
|
notification = Notification.query.filter_by(id=id).one()
|
||||||
|
check_and_queue_callback_task(notification)
|
||||||
callback_api = get_service_delivery_status_callback_api_for_service(
|
|
||||||
service_id=notification.service_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if not callback_api:
|
|
||||||
print(f"Callback api was not found for notification: {id}.")
|
|
||||||
continue
|
|
||||||
|
|
||||||
encrypted_status_update = create_delivery_status_callback_data(
|
|
||||||
notification, callback_api
|
|
||||||
)
|
|
||||||
|
|
||||||
send_delivery_status_to_service.apply_async(
|
|
||||||
[id, encrypted_status_update], queue=QueueNames.CALLBACKS
|
|
||||||
)
|
|
||||||
|
|
||||||
print(f"Created callback task for notification: {id}.")
|
print(f"Created callback task for notification: {id}.")
|
||||||
except NoResultFound:
|
except NoResultFound:
|
||||||
print(f"ID: {id} was not found in notifications.")
|
print(f"ID: {id} was not found in notifications.")
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ def remove_emails_from_complaint(complaint_dict):
|
|||||||
return complaint_dict['mail'].pop('destination')
|
return complaint_dict['mail'].pop('destination')
|
||||||
|
|
||||||
|
|
||||||
def _check_and_queue_callback_task(notification):
|
def check_and_queue_callback_task(notification):
|
||||||
# queue callback task only if the service_callback_api exists
|
# queue callback task only if the service_callback_api exists
|
||||||
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
|
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
|
||||||
if service_callback_api:
|
if service_callback_api:
|
||||||
|
|||||||
@@ -24,16 +24,11 @@ from app.celery.nightly_tasks import (
|
|||||||
save_daily_notification_processing_time,
|
save_daily_notification_processing_time,
|
||||||
timeout_notifications,
|
timeout_notifications,
|
||||||
)
|
)
|
||||||
from app.celery.service_callback_tasks import (
|
|
||||||
create_delivery_status_callback_data,
|
|
||||||
)
|
|
||||||
from app.config import QueueNames
|
|
||||||
from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE, FactProcessingTime
|
from app.models import EMAIL_TYPE, LETTER_TYPE, SMS_TYPE, FactProcessingTime
|
||||||
from tests.app.db import (
|
from tests.app.db import (
|
||||||
create_job,
|
create_job,
|
||||||
create_notification,
|
create_notification,
|
||||||
create_service,
|
create_service,
|
||||||
create_service_callback_api,
|
|
||||||
create_service_data_retention,
|
create_service_data_retention,
|
||||||
create_template,
|
create_template,
|
||||||
)
|
)
|
||||||
@@ -166,8 +161,8 @@ def test_delete_letter_notifications_older_than_retention_calls_child_task(notif
|
|||||||
mocked.assert_called_once_with('letter')
|
mocked.assert_called_once_with('letter')
|
||||||
|
|
||||||
|
|
||||||
def test_timeout_notifications_no_callbacks(mocker, sample_notification):
|
def test_timeout_notifications(mocker, sample_notification):
|
||||||
mock_update = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async')
|
mock_update = mocker.patch('app.celery.nightly_tasks.check_and_queue_callback_task')
|
||||||
mock_dao = mocker.patch('app.celery.nightly_tasks.dao_timeout_notifications')
|
mock_dao = mocker.patch('app.celery.nightly_tasks.dao_timeout_notifications')
|
||||||
mock_dao.return_value = [sample_notification]
|
mock_dao.return_value = [sample_notification]
|
||||||
|
|
||||||
@@ -177,19 +172,7 @@ def test_timeout_notifications_no_callbacks(mocker, sample_notification):
|
|||||||
current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
|
current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_update.assert_not_called()
|
mock_update.assert_called_once_with(sample_notification)
|
||||||
|
|
||||||
|
|
||||||
def test_timeout_notifications_with_callbacks(mocker, sample_notification):
|
|
||||||
mock_update = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async')
|
|
||||||
mock_dao = mocker.patch('app.celery.nightly_tasks.dao_timeout_notifications')
|
|
||||||
mock_dao.return_value = [sample_notification]
|
|
||||||
|
|
||||||
callback_api = create_service_callback_api(service=sample_notification.service)
|
|
||||||
timeout_notifications()
|
|
||||||
|
|
||||||
encrypted_data = create_delivery_status_callback_data(sample_notification, callback_api)
|
|
||||||
mock_update.assert_called_once_with([str(sample_notification.id), encrypted_data], queue=QueueNames.CALLBACKS)
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_inbound_sms_calls_child_task(notify_api, mocker):
|
def test_delete_inbound_sms_calls_child_task(notify_api, mocker):
|
||||||
|
|||||||
@@ -8,12 +8,8 @@ from app import statsd_client
|
|||||||
from app.celery.process_sms_client_response_tasks import (
|
from app.celery.process_sms_client_response_tasks import (
|
||||||
process_sms_client_response,
|
process_sms_client_response,
|
||||||
)
|
)
|
||||||
from app.celery.service_callback_tasks import (
|
|
||||||
create_delivery_status_callback_data,
|
|
||||||
)
|
|
||||||
from app.clients import ClientException
|
from app.clients import ClientException
|
||||||
from app.models import NOTIFICATION_TECHNICAL_FAILURE
|
from app.models import NOTIFICATION_TECHNICAL_FAILURE
|
||||||
from tests.app.db import create_service_callback_api
|
|
||||||
|
|
||||||
|
|
||||||
def test_process_sms_client_response_raises_error_if_reference_is_not_a_valid_uuid(client):
|
def test_process_sms_client_response_raises_error_if_reference_is_not_a_valid_uuid(client):
|
||||||
@@ -121,12 +117,7 @@ def test_process_sms_client_response_updates_notification_status_when_detailed_s
|
|||||||
|
|
||||||
|
|
||||||
def test_sms_response_does_not_send_callback_if_notification_is_not_in_the_db(sample_service, mocker):
|
def test_sms_response_does_not_send_callback_if_notification_is_not_in_the_db(sample_service, mocker):
|
||||||
mocker.patch(
|
send_mock = mocker.patch('app.celery.process_sms_client_response_tasks.check_and_queue_callback_task')
|
||||||
'app.celery.process_sms_client_response_tasks.get_service_delivery_status_callback_api_for_service',
|
|
||||||
return_value='mock-delivery-callback-for-service')
|
|
||||||
send_mock = mocker.patch(
|
|
||||||
'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async'
|
|
||||||
)
|
|
||||||
reference = str(uuid.uuid4())
|
reference = str(uuid.uuid4())
|
||||||
process_sms_client_response(status='3', provider_reference=reference, client_name='MMG')
|
process_sms_client_response(status='3', provider_reference=reference, client_name='MMG')
|
||||||
send_mock.assert_not_called()
|
send_mock.assert_not_called()
|
||||||
@@ -156,26 +147,17 @@ def test_process_sms_updates_billable_units_if_zero(sample_notification):
|
|||||||
|
|
||||||
|
|
||||||
def test_process_sms_response_does_not_send_service_callback_for_pending_notifications(sample_notification, mocker):
|
def test_process_sms_response_does_not_send_service_callback_for_pending_notifications(sample_notification, mocker):
|
||||||
mocker.patch(
|
send_mock = mocker.patch('app.celery.process_sms_client_response_tasks.check_and_queue_callback_task')
|
||||||
'app.celery.process_sms_client_response_tasks.get_service_delivery_status_callback_api_for_service',
|
|
||||||
return_value='fake-callback')
|
|
||||||
send_mock = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async')
|
|
||||||
process_sms_client_response('2', str(sample_notification.id), 'Firetext')
|
process_sms_client_response('2', str(sample_notification.id), 'Firetext')
|
||||||
send_mock.assert_not_called()
|
send_mock.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
def test_outcome_statistics_called_for_successful_callback(sample_notification, mocker):
|
def test_outcome_statistics_called_for_successful_callback(sample_notification, mocker):
|
||||||
send_mock = mocker.patch(
|
send_mock = mocker.patch('app.celery.process_sms_client_response_tasks.check_and_queue_callback_task')
|
||||||
'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async'
|
|
||||||
)
|
|
||||||
callback_api = create_service_callback_api(service=sample_notification.service, url="https://original_url.com")
|
|
||||||
reference = str(sample_notification.id)
|
reference = str(sample_notification.id)
|
||||||
|
|
||||||
process_sms_client_response('3', reference, 'MMG')
|
process_sms_client_response('3', reference, 'MMG')
|
||||||
|
send_mock.assert_called_once_with(sample_notification)
|
||||||
encrypted_data = create_delivery_status_callback_data(sample_notification, callback_api)
|
|
||||||
send_mock.assert_called_once_with([reference, encrypted_data],
|
|
||||||
queue="service-callbacks")
|
|
||||||
|
|
||||||
|
|
||||||
def test_process_sms_updates_sent_by_with_client_name_if_not_in_noti(sample_notification):
|
def test_process_sms_updates_sent_by_with_client_name_if_not_in_noti(sample_notification):
|
||||||
|
|||||||
@@ -1,9 +1,8 @@
|
|||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from app.commands import local_dev_broadcast_permissions, replay_callbacks
|
from app.commands import local_dev_broadcast_permissions, replay_callbacks
|
||||||
from app.config import QueueNames
|
|
||||||
from app.dao.services_dao import dao_add_user_to_service
|
from app.dao.services_dao import dao_add_user_to_service
|
||||||
from tests.app.db import create_service_callback_api, create_user
|
from tests.app.db import create_user
|
||||||
|
|
||||||
|
|
||||||
def test_local_dev_broadcast_permissions(
|
def test_local_dev_broadcast_permissions(
|
||||||
@@ -33,10 +32,7 @@ def test_replay_callbacks(
|
|||||||
tmpdir,
|
tmpdir,
|
||||||
notify_api,
|
notify_api,
|
||||||
):
|
):
|
||||||
mock_apply = mocker.patch('app.commands.send_delivery_status_to_service.apply_async')
|
mock_task = mocker.patch('app.commands.check_and_queue_callback_task')
|
||||||
mock_update = mocker.patch('app.commands.create_delivery_status_callback_data')
|
|
||||||
mock_update.return_value = 'encrypted_status_update'
|
|
||||||
|
|
||||||
file_path = tmpdir + 'callback_ids.txt'
|
file_path = tmpdir + 'callback_ids.txt'
|
||||||
missing_notification_id = uuid.uuid4()
|
missing_notification_id = uuid.uuid4()
|
||||||
|
|
||||||
@@ -48,20 +44,6 @@ def test_replay_callbacks(
|
|||||||
replay_callbacks, ['-f', file_path]
|
replay_callbacks, ['-f', file_path]
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_apply.assert_not_called()
|
|
||||||
assert f'{missing_notification_id} was not found' in result.output
|
assert f'{missing_notification_id} was not found' in result.output
|
||||||
assert "Callback api was not found" in result.output
|
mock_task.assert_called_once_with(sample_notification)
|
||||||
|
|
||||||
# Now re-run with the callback API in place
|
|
||||||
create_service_callback_api(service=sample_service, bearer_token='foo')
|
|
||||||
|
|
||||||
result = notify_api.test_cli_runner().invoke(
|
|
||||||
replay_callbacks, ['-f', file_path]
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_apply.assert_called_once_with(
|
|
||||||
[str(sample_notification.id), 'encrypted_status_update'],
|
|
||||||
queue=QueueNames.CALLBACKS
|
|
||||||
)
|
|
||||||
|
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
|
|||||||
Reference in New Issue
Block a user