Replace celery[sqs] with celery[redis]

This commit is contained in:
Ryan Ahearn
2022-09-28 16:27:37 -04:00
parent fd6007656f
commit e3ad01119d
13 changed files with 13 additions and 21 deletions

View File

@@ -9,7 +9,6 @@ env:
DEBUG: True
ANTIVIRUS_ENABLED: 0
NOTIFY_ENVIRONMENT: test
NOTIFICATION_QUEUE_PREFIX: local_dev_10x
STATSD_HOST: localhost
SES_STUB_URL: None
NOTIFY_APP_NAME: api

View File

@@ -13,7 +13,6 @@ env:
DEBUG: True
ANTIVIRUS_ENABLED: 0
NOTIFY_ENVIRONMENT: test
NOTIFICATION_QUEUE_PREFIX: local_dev_10x
STATSD_HOST: localhost
SES_STUB_URL: None
NOTIFY_APP_NAME: api

View File

@@ -49,7 +49,6 @@ NOTE: when you change .env in the future, you'll need to rebuild the devcontaine
Things to change:
- If you're not the first to deploy, only replace the aws creds, get these from team lead
- Replace `NOTIFICATION_QUEUE_PREFIX` with `local_dev_<your org>_`
- Replace `NOTIFY_EMAIL_DOMAIN` with the domain your emails will come from (i.e. the "origination email" in your SES project)
- Replace `SECRET_KEY` and `DANGEROUS_SALT` with high-entropy secret values
- Set up AWS SES and SNS as indicated in next section (AWS Setup), fill in missing AWS env vars

View File

@@ -449,6 +449,7 @@ def handle_exception(task, notification, notification_id, exc):
# Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
# SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not
# send to the retry queue.
# This probably (hopefully) is not an issue with Redis as the celery backing store
current_app.logger.exception('Retry' + retry_msg)
try:
task.retry(queue=QueueNames.RETRY, exc=exc)

View File

@@ -114,9 +114,6 @@ class Config(object):
FIRETEXT_API_KEY = os.environ.get("FIRETEXT_API_KEY", "placeholder")
FIRETEXT_INTERNATIONAL_API_KEY = os.environ.get("FIRETEXT_INTERNATIONAL_API_KEY", "placeholder")
# Prefix to identify queues in SQS
NOTIFICATION_QUEUE_PREFIX = os.environ.get('NOTIFICATION_QUEUE_PREFIX')
# Use notify.sandbox.10x sending domain unless overwritten by environment
NOTIFY_EMAIL_DOMAIN = 'notify.sandbox.10x.gsa.gov'
@@ -200,11 +197,9 @@ class Config(object):
DVLA_EMAIL_ADDRESSES = json.loads(os.environ.get('DVLA_EMAIL_ADDRESSES', '[]'))
CELERY = {
'broker_url': 'sqs://',
'broker_url': REDIS_URL,
'broker_transport_options': {
'region': AWS_REGION,
'visibility_timeout': 310,
'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX,
},
'timezone': 'Europe/London',
'imports': [
@@ -447,7 +442,6 @@ class Development(Config):
NOTIFY_EMAIL_DOMAIN = os.getenv('NOTIFY_EMAIL_DOMAIN', 'notify.sandbox.10x.gsa.gov')
SQLALCHEMY_DATABASE_URI = os.environ.get('SQLALCHEMY_DATABASE_URI', 'postgresql://postgres:chummy@db:5432/notification_api')
REDIS_URL = os.environ.get('REDIS_URL')
ANTIVIRUS_ENABLED = os.environ.get('ANTIVIRUS_ENABLED') == '1'
@@ -572,7 +566,6 @@ class Live(Config):
REDIS_ENABLED = os.environ.get('REDIS_ENABLED')
NOTIFY_LOG_PATH = os.environ.get('NOTIFY_LOG_PATH', 'application.log')
REDIS_URL = os.environ.get('REDIS_URL')
class CloudFoundryConfig(Config):

View File

@@ -10,6 +10,7 @@ letter_job = Blueprint("letter-job", __name__)
register_errors(letter_job)
# too many references will make SQS error (as the task can only be 256kb)
# Maybe doesn't matter anymore with Redis as the celery backing store
MAX_REFERENCES_PER_TASK = 5000

View File

@@ -236,6 +236,7 @@ def process_sms_or_email_notification(
# If SQS cannot put the task on the queue, it's probably because the notification body was too long and it
# went over SQS's 256kb message limit. If the body is very large, it may exceed the HTTP max content length;
# the exception we get here isn't handled correctly by botocore - we get a ResponseParserError instead.
# Hopefully this is no longer an issue with Redis as celery's backing store
current_app.logger.info(
f'Notification {notification_id} failed to save to high volume queue. Using normal flow instead'
)

View File

@@ -27,7 +27,6 @@ applications:
NOTIFY_ENVIRONMENT: live
API_HOST_NAME: https://notifications-api.app.cloud.gov
ADMIN_BASE_URL: https://notifications-admin.app.cloud.gov
NOTIFICATION_QUEUE_PREFIX: prototype_10x
STATSD_HOST: localhost
# Credentials variables

View File

@@ -4,7 +4,6 @@ env =
NOTIFY_ENVIRONMENT=test
MMG_API_KEY=mmg-secret-key
FIRETEXT_API_KEY=Firetext
NOTIFICATION_QUEUE_PREFIX=testing
REDIS_ENABLED=0
addopts = -p no:warnings
xfail_strict = true

View File

@@ -2,7 +2,7 @@
# with package version changes made in requirements.in
cffi==1.15.0
celery[sqs]==5.2.6
celery[redis]==5.2.7
Flask-Bcrypt==1.0.1
flask-marshmallow==0.14.0
Flask-Migrate==3.1.0

View File

@@ -39,7 +39,7 @@ cachetools==5.1.0
# via
# -r requirements.in
# notifications-utils
celery[sqs]==5.2.6
celery[redis]==5.2.7
# via -r requirements.in
certifi==2022.5.18.1
# via
@@ -219,7 +219,9 @@ pyyaml==5.4.1
# awscli
# notifications-utils
redis==4.3.1
# via flask-redis
# via
# celery
# flask-redis
requests==2.27.1
# via
# awscli-cwlogs

View File

@@ -4,7 +4,6 @@
DEBUG=True
ANTIVIRUS_ENABLED=0
NOTIFY_ENVIRONMENT=development
NOTIFICATION_QUEUE_PREFIX=local_dev_YOURNAME_
STATSD_HOST=localhost
SES_STUB_URL=None
NOTIFY_APP_NAME=api

View File

@@ -683,7 +683,7 @@ def test_should_persist_notification(
(SMS_TYPE, 'send-sms-tasks'),
(EMAIL_TYPE, 'send-email-tasks')
])
def test_should_delete_notification_and_return_error_if_sqs_fails(
def test_should_delete_notification_and_return_error_if_redis_fails(
client,
sample_email_template,
sample_template,
@@ -694,7 +694,7 @@ def test_should_delete_notification_and_return_error_if_sqs_fails(
):
mocked = mocker.patch(
'app.celery.provider_tasks.deliver_{}.apply_async'.format(template_type),
side_effect=Exception("failed to talk to SQS")
side_effect=Exception("failed to talk to redis")
)
mocker.patch('app.notifications.process_notifications.uuid.uuid4', return_value=fake_uuid)
@@ -719,7 +719,7 @@ def test_should_delete_notification_and_return_error_if_sqs_fails(
data=json.dumps(data),
headers=[('Content-Type', 'application/json'), ('Authorization', 'Bearer {}'.format(auth_header))]
)
assert str(e.value) == 'failed to talk to SQS'
assert str(e.value) == 'failed to talk to redis'
mocked.assert_called_once_with([fake_uuid], queue=queue_name)
assert not notifications_dao.get_notification_by_id(fake_uuid)