diff --git a/app/celery/celery.py b/app/celery/celery.py index 16c50da23..98ca6812f 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -22,6 +22,7 @@ def make_task(app): class NotifyTask(Task): abstract = True start = None + typing = False def on_success(self, retval, task_id, args, kwargs): elapsed_time = time.monotonic() - self.start @@ -81,11 +82,11 @@ class NotifyCelery(Celery): def init_app(self, app): super().__init__( app.import_name, - broker=app.config['BROKER_URL'], + broker=app.config['CELERY']['broker_url'], task_cls=make_task(app), ) - self.conf.update(app.config) + self.conf.update(app.config['CELERY']) self._app = app def send_task(self, name, args=None, kwargs=None, **other_kwargs): diff --git a/app/celery/letters_pdf_tasks.py b/app/celery/letters_pdf_tasks.py index 3d4b335aa..8616c9fe5 100644 --- a/app/celery/letters_pdf_tasks.py +++ b/app/celery/letters_pdf_tasks.py @@ -85,12 +85,12 @@ def get_pdf_for_templated_letter(self, notification_id): args=(encrypted_data,), queue=QueueNames.SANITISE_LETTERS ) - except Exception: + except Exception as e: try: current_app.logger.exception( f"RETRY: calling create-letter-pdf task for notification {notification_id} failed" ) - self.retry(queue=QueueNames.RETRY) + self.retry(exc=e, queue=QueueNames.RETRY) except self.MaxRetriesExceededError: message = f"RETRY FAILED: Max retries reached. " \ f"The task create-letter-pdf failed for notification id {notification_id}. " \ diff --git a/app/config.py b/app/config.py index 0d29a02d1..fa62f2b98 100644 --- a/app/config.py +++ b/app/config.py @@ -187,156 +187,160 @@ class Config(object): # we only need real email in Live environment (production) DVLA_EMAIL_ADDRESSES = json.loads(os.environ.get('DVLA_EMAIL_ADDRESSES', '[]')) - BROKER_URL = 'sqs://' - BROKER_TRANSPORT_OPTIONS = { - 'region': AWS_REGION, - 'polling_interval': 1, # 1 second - 'visibility_timeout': 310, - 'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX + CELERY = { + 'broker_url': 'sqs://', + 'broker_transport_options': { + 'region': AWS_REGION, + 'visibility_timeout': 310, + 'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX, + }, + 'timezone': 'Europe/London', + 'imports': [ + 'app.celery.tasks', + 'app.celery.scheduled_tasks', + 'app.celery.reporting_tasks', + 'app.celery.nightly_tasks', + ], + # this is overriden by the -Q command, but locally, we should read from all queues + 'task_queues': [ + Queue(queue, Exchange('default'), routing_key=queue) for queue in QueueNames.all_queues() + ], + 'beat_schedule': { + # app/celery/scheduled_tasks.py + 'run-scheduled-jobs': { + 'task': 'run-scheduled-jobs', + 'schedule': crontab(minute='0,15,30,45'), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-verify-codes': { + 'task': 'delete-verify-codes', + 'schedule': timedelta(minutes=63), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-invitations': { + 'task': 'delete-invitations', + 'schedule': timedelta(minutes=66), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'switch-current-sms-provider-on-slow-delivery': { + 'task': 'switch-current-sms-provider-on-slow-delivery', + 'schedule': crontab(), # Every minute + 'options': {'queue': QueueNames.PERIODIC} + }, + 'check-job-status': { + 'task': 'check-job-status', + 'schedule': crontab(), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'tend-providers-back-to-middle': { + 'task': 'tend-providers-back-to-middle', + 'schedule': crontab(minute='*/5'), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'check-for-missing-rows-in-completed-jobs': { + 'task': 'check-for-missing-rows-in-completed-jobs', + 'schedule': crontab(minute='*/10'), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'replay-created-notifications': { + 'task': 'replay-created-notifications', + 'schedule': crontab(minute='0, 15, 30, 45'), + 'options': {'queue': QueueNames.PERIODIC} + }, + # app/celery/nightly_tasks.py + 'timeout-sending-notifications': { + 'task': 'timeout-sending-notifications', + 'schedule': crontab(hour=0, minute=5), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'create-nightly-billing': { + 'task': 'create-nightly-billing', + 'schedule': crontab(hour=0, minute=15), + 'options': {'queue': QueueNames.REPORTING} + }, + 'create-nightly-notification-status': { + 'task': 'create-nightly-notification-status', + 'schedule': crontab(hour=0, minute=30), # after 'timeout-sending-notifications' + 'options': {'queue': QueueNames.REPORTING} + }, + 'delete-notifications-older-than-retention': { + 'task': 'delete-notifications-older-than-retention', + 'schedule': crontab(hour=3, minute=0), # after 'create-nightly-notification-status' + 'options': {'queue': QueueNames.PERIODIC} + }, + 'delete-inbound-sms': { + 'task': 'delete-inbound-sms', + 'schedule': crontab(hour=1, minute=40), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'save-daily-notification-processing-time': { + 'task': 'save-daily-notification-processing-time', + 'schedule': crontab(hour=2, minute=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'remove_sms_email_jobs': { + 'task': 'remove_sms_email_jobs', + 'schedule': crontab(hour=4, minute=0), + 'options': {'queue': QueueNames.PERIODIC}, + }, + 'remove_letter_jobs': { + 'task': 'remove_letter_jobs', + 'schedule': crontab(hour=4, minute=20), + # since we mark jobs as archived + 'options': {'queue': QueueNames.PERIODIC}, + }, + 'check-if-letters-still-in-created': { + 'task': 'check-if-letters-still-in-created', + 'schedule': crontab(day_of_week='mon-fri', hour=7, minute=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'check-if-letters-still-pending-virus-check': { + 'task': 'check-if-letters-still-pending-virus-check', + 'schedule': crontab(day_of_week='mon-fri', hour='9,15', minute=0), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers': { + 'task': 'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers', + 'schedule': crontab(day_of_week='mon-fri', hour=10, minute=30), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'raise-alert-if-letter-notifications-still-sending': { + 'task': 'raise-alert-if-letter-notifications-still-sending', + 'schedule': crontab(hour=15, minute=30), + 'options': {'queue': QueueNames.PERIODIC} + }, + # The collate-letter-pdf does assume it is called in an hour that BST does not make a + # difference to the truncate date which translates to the filename to process + 'collate-letter-pdfs-to-be-sent': { + 'task': 'collate-letter-pdfs-to-be-sent', + 'schedule': crontab(hour=17, minute=50), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'raise-alert-if-no-letter-ack-file': { + 'task': 'raise-alert-if-no-letter-ack-file', + 'schedule': crontab(hour=23, minute=00), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'trigger-link-tests': { + 'task': 'trigger-link-tests', + 'schedule': timedelta(minutes=15), + 'options': {'queue': QueueNames.PERIODIC} + }, + 'auto-expire-broadcast-messages': { + 'task': 'auto-expire-broadcast-messages', + 'schedule': timedelta(minutes=5), + 'options': {'queue': QueueNames.PERIODIC} + }, + } } - CELERY_ENABLE_UTC = True - CELERY_TIMEZONE = 'Europe/London' - CELERY_ACCEPT_CONTENT = ['json'] - CELERY_TASK_SERIALIZER = 'json' - # on reporting worker, restart workers after each task is executed to help prevent memory leaks - CELERYD_MAX_TASKS_PER_CHILD = os.getenv('CELERYD_MAX_TASKS_PER_CHILD') + # we can set celeryd_prefetch_multiplier to be 1 for celery apps which handle only long running tasks if os.getenv('CELERYD_PREFETCH_MULTIPLIER'): - CELERYD_PREFETCH_MULTIPLIER = os.getenv('CELERYD_PREFETCH_MULTIPLIER') - CELERY_IMPORTS = ( - 'app.celery.tasks', - 'app.celery.scheduled_tasks', - 'app.celery.reporting_tasks', - 'app.celery.nightly_tasks', - ) - CELERYBEAT_SCHEDULE = { - # app/celery/scheduled_tasks.py - 'run-scheduled-jobs': { - 'task': 'run-scheduled-jobs', - 'schedule': crontab(minute='0,15,30,45'), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-verify-codes': { - 'task': 'delete-verify-codes', - 'schedule': timedelta(minutes=63), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-invitations': { - 'task': 'delete-invitations', - 'schedule': timedelta(minutes=66), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'switch-current-sms-provider-on-slow-delivery': { - 'task': 'switch-current-sms-provider-on-slow-delivery', - 'schedule': crontab(), # Every minute - 'options': {'queue': QueueNames.PERIODIC} - }, - 'check-job-status': { - 'task': 'check-job-status', - 'schedule': crontab(), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'tend-providers-back-to-middle': { - 'task': 'tend-providers-back-to-middle', - 'schedule': crontab(minute='*/5'), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'check-for-missing-rows-in-completed-jobs': { - 'task': 'check-for-missing-rows-in-completed-jobs', - 'schedule': crontab(minute='*/10'), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'replay-created-notifications': { - 'task': 'replay-created-notifications', - 'schedule': crontab(minute='0, 15, 30, 45'), - 'options': {'queue': QueueNames.PERIODIC} - }, - # app/celery/nightly_tasks.py - 'timeout-sending-notifications': { - 'task': 'timeout-sending-notifications', - 'schedule': crontab(hour=0, minute=5), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'create-nightly-billing': { - 'task': 'create-nightly-billing', - 'schedule': crontab(hour=0, minute=15), - 'options': {'queue': QueueNames.REPORTING} - }, - 'create-nightly-notification-status': { - 'task': 'create-nightly-notification-status', - 'schedule': crontab(hour=0, minute=30), # after 'timeout-sending-notifications' - 'options': {'queue': QueueNames.REPORTING} - }, - 'delete-notifications-older-than-retention': { - 'task': 'delete-notifications-older-than-retention', - 'schedule': crontab(hour=3, minute=0), # after 'create-nightly-notification-status' - 'options': {'queue': QueueNames.PERIODIC} - }, - 'delete-inbound-sms': { - 'task': 'delete-inbound-sms', - 'schedule': crontab(hour=1, minute=40), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'save-daily-notification-processing-time': { - 'task': 'save-daily-notification-processing-time', - 'schedule': crontab(hour=2, minute=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'remove_sms_email_jobs': { - 'task': 'remove_sms_email_jobs', - 'schedule': crontab(hour=4, minute=0), - 'options': {'queue': QueueNames.PERIODIC}, - }, - 'remove_letter_jobs': { - 'task': 'remove_letter_jobs', - 'schedule': crontab(hour=4, minute=20), - # since we mark jobs as archived - 'options': {'queue': QueueNames.PERIODIC}, - }, - 'check-if-letters-still-in-created': { - 'task': 'check-if-letters-still-in-created', - 'schedule': crontab(day_of_week='mon-fri', hour=7, minute=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'check-if-letters-still-pending-virus-check': { - 'task': 'check-if-letters-still-pending-virus-check', - 'schedule': crontab(day_of_week='mon-fri', hour='9,15', minute=0), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers': { - 'task': 'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers', - 'schedule': crontab(day_of_week='mon-fri', hour=10, minute=30), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'raise-alert-if-letter-notifications-still-sending': { - 'task': 'raise-alert-if-letter-notifications-still-sending', - 'schedule': crontab(hour=15, minute=30), - 'options': {'queue': QueueNames.PERIODIC} - }, - # The collate-letter-pdf does assume it is called in an hour that BST does not make a - # difference to the truncate date which translates to the filename to process - 'collate-letter-pdfs-to-be-sent': { - 'task': 'collate-letter-pdfs-to-be-sent', - 'schedule': crontab(hour=17, minute=50), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'raise-alert-if-no-letter-ack-file': { - 'task': 'raise-alert-if-no-letter-ack-file', - 'schedule': crontab(hour=23, minute=00), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'trigger-link-tests': { - 'task': 'trigger-link-tests', - 'schedule': timedelta(minutes=15), - 'options': {'queue': QueueNames.PERIODIC} - }, - 'auto-expire-broadcast-messages': { - 'task': 'auto-expire-broadcast-messages', - 'schedule': timedelta(minutes=5), - 'options': {'queue': QueueNames.PERIODIC} - }, - } - CELERY_QUEUES = [] + CELERY['worker_prefetch_multiplier'] = os.getenv('CELERYD_PREFETCH_MULTIPLIER') + + # on reporting worker, restart workers after each task is executed to help prevent memory leaks + if os.getenv('CELERYD_MAX_TASKS_PER_CHILD'): + CELERY['worker_max_tasks_per_child'] = int(os.getenv('CELERYD_MAX_TASKS_PER_CHILD')) FROM_NUMBER = 'development' @@ -419,7 +423,6 @@ class Development(Config): NOTIFY_ENVIRONMENT = 'development' NOTIFY_LOG_PATH = 'application.log' - NOTIFICATION_QUEUE_PREFIX = 'development' NOTIFY_EMAIL_DOMAIN = "notify.tools" SQLALCHEMY_DATABASE_URI = 'postgresql://localhost/notification_api' @@ -427,11 +430,6 @@ class Development(Config): ANTIVIRUS_ENABLED = os.getenv('ANTIVIRUS_ENABLED') == '1' - for queue in QueueNames.all_queues(): - Config.CELERY_QUEUES.append( - Queue(queue, Exchange('default'), routing_key=queue) - ) - API_HOST_NAME = "http://localhost:6011" API_RATE_LIMIT_ENABLED = True DVLA_EMAIL_ADDRESSES = ['success@simulator.amazonses.com'] @@ -465,15 +463,13 @@ class Test(Development): # this is overriden in jenkins and on cloudfoundry SQLALCHEMY_DATABASE_URI = os.getenv('SQLALCHEMY_DATABASE_URI', 'postgresql://localhost/test_notification_api') - BROKER_URL = 'you-forgot-to-mock-celery-in-your-tests://' + CELERY = { + **Config.CELERY, + 'broker_url': 'you-forgot-to-mock-celery-in-your-tests://' + } ANTIVIRUS_ENABLED = True - for queue in QueueNames.all_queues(): - Config.CELERY_QUEUES.append( - Queue(queue, Exchange('default'), routing_key=queue) - ) - API_RATE_LIMIT_ENABLED = True API_HOST_NAME = "http://localhost:6011" diff --git a/app/v2/notifications/post_notifications.py b/app/v2/notifications/post_notifications.py index 40cee1678..01cf21a56 100644 --- a/app/v2/notifications/post_notifications.py +++ b/app/v2/notifications/post_notifications.py @@ -3,7 +3,7 @@ import functools import uuid from datetime import datetime -from boto.exception import SQSError +import botocore from flask import abort, current_app, jsonify, request from gds_metrics import Histogram from notifications_utils.recipients import try_validate_and_format_phone_number @@ -232,7 +232,7 @@ def process_sms_or_email_notification( reply_to_text=reply_to_text ) return resp - except SQSError: + except botocore.exceptions.ClientError: # if SQS cannot put the task on the queue, it's probably because the notification body was too long and it # went over SQS's 256kb message limit. If so, we current_app.logger.info( diff --git a/requirements-app.txt b/requirements-app.txt index 32efa05d5..45e5a6197 100644 --- a/requirements-app.txt +++ b/requirements-app.txt @@ -2,7 +2,7 @@ # with package version changes made in requirements-app.txt cffi==1.14.5 -celery[sqs]==3.1.26.post2 # pyup: <4 +celery[sqs]==5.1.2 docopt==0.6.2 Flask-Bcrypt==0.7.1 flask-marshmallow==0.14.0 diff --git a/requirements.txt b/requirements.txt index 7514b7f7e..e14f0d527 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ # with package version changes made in requirements-app.txt cffi==1.14.5 -celery[sqs]==3.1.26.post2 # pyup: <4 +celery[sqs]==5.1.2 docopt==0.6.2 Flask-Bcrypt==0.7.1 flask-marshmallow==0.14.0 @@ -46,20 +46,22 @@ gds-metrics==0.2.4 ## The following requirements were added by pip freeze: alembic==1.7.4 -amqp==1.4.9 -anyjson==0.3.3 +amqp==5.0.6 attrs==21.2.0 awscli==1.21.4 bcrypt==3.2.0 -billiard==3.3.0.23 +billiard==3.6.4.0 bleach==4.1.0 blinker==1.4 -boto==2.49.0 boto3==1.19.4 botocore==1.22.4 +cached-property==1.5.2 certifi==2021.10.8 charset-normalizer==2.0.7 -click==8.0.3 +click==7.1.2 +click-didyoumean==0.3.0 +click-plugins==1.1.1 +click-repl==0.2.0 colorama==0.4.3 dataclasses==0.8 dnspython==1.16.0 @@ -73,15 +75,17 @@ importlib-metadata==4.8.1 importlib-resources==5.3.0 Jinja2==3.0.2 jmespath==0.10.0 -kombu==3.0.37 +kombu==5.1.0 Mako==1.1.5 MarkupSafe==2.0.1 mistune==0.8.4 orderedset==2.0.3 packaging==21.0 phonenumbers==8.12.36 +prompt-toolkit==3.0.21 pyasn1==0.4.8 pycparser==2.20 +pycurl==7.43.0.5 pyparsing==3.0.1 PyPDF2==1.26.0 pyrsistent==0.18.0 @@ -100,5 +104,7 @@ soupsieve==2.2.1 statsd==3.3.0 typing-extensions==3.10.0.2 urllib3==1.26.7 +vine==5.0.0 +wcwidth==0.2.5 webencodings==0.5.1 zipp==3.6.0 diff --git a/scripts/paas_app_wrapper.sh b/scripts/paas_app_wrapper.sh index 7bb89d759..ce192555c 100755 --- a/scripts/paas_app_wrapper.sh +++ b/scripts/paas_app_wrapper.sh @@ -5,15 +5,15 @@ case $NOTIFY_APP_NAME in exec scripts/run_app_paas.sh gunicorn -c /home/vcap/app/gunicorn_config.py application ;; delivery-worker-retry-tasks) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q retry-tasks 2> /dev/null ;; delivery-worker-letters) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q create-letters-pdf-tasks,letter-tasks 2> /dev/null ;; delivery-worker-jobs) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q database-tasks,job-tasks 2> /dev/null ;; delivery-worker-research) @@ -21,7 +21,7 @@ case $NOTIFY_APP_NAME in -Q research-mode-tasks 2> /dev/null ;; delivery-worker-sender) - exec scripts/run_multi_worker_app_paas.sh celery multi start 3 -c 10 -A run_celery.notify_celery --loglevel=INFO \ + exec scripts/run_multi_worker_app_paas.sh celery multi start 3 -c 4 -A run_celery.notify_celery --loglevel=INFO \ --logfile=/dev/null --pidfile=/tmp/celery%N.pid -Q send-sms-tasks,send-email-tasks ;; delivery-worker-periodic) @@ -33,12 +33,12 @@ case $NOTIFY_APP_NAME in -Q reporting-tasks 2> /dev/null ;; delivery-worker-priority) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=5 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q priority-tasks 2> /dev/null ;; # Only consume the notify-internal-tasks queue on this app so that Notify messages are processed as a priority delivery-worker-internal) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q notify-internal-tasks 2> /dev/null ;; delivery-worker-broadcasts) @@ -46,15 +46,15 @@ case $NOTIFY_APP_NAME in -Q broadcast-tasks 2> /dev/null ;; delivery-worker-receipts) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q ses-callbacks,sms-callbacks 2> /dev/null ;; delivery-worker-service-callbacks) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q service-callbacks,service-callbacks-retry 2> /dev/null ;; delivery-worker-save-api-notifications) - exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=11 \ + exec scripts/run_app_paas.sh celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 \ -Q save-api-email-tasks,save-api-sms-tasks 2> /dev/null ;; delivery-celery-beat) diff --git a/tests/app/v2/notifications/test_post_notifications.py b/tests/app/v2/notifications/test_post_notifications.py index dde524dea..7d500d84a 100644 --- a/tests/app/v2/notifications/test_post_notifications.py +++ b/tests/app/v2/notifications/test_post_notifications.py @@ -2,8 +2,8 @@ import uuid from unittest import mock from unittest.mock import call +import botocore import pytest -from boto.exception import SQSError from flask import current_app, json from app.dao import templates_dao @@ -1074,7 +1074,7 @@ def test_post_notifications_saves_email_or_sms_normally_if_saving_to_queue_fails ): save_task = mocker.patch( f"app.celery.tasks.save_api_{notification_type}.apply_async", - side_effect=SQSError({'some': 'json'}, 'some opname') + side_effect=botocore.exceptions.ClientError({'some': 'json'}, 'some opname') ) mock_send_task = mocker.patch(f'app.celery.provider_tasks.deliver_{notification_type}.apply_async')