mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-22 08:21:13 -05:00
reduce broadcast retry delay to 4 mins and drop prefetch.
### The facts * Celery grabs up to 10 tasks from an SQS queue by default * Each broadcast task takes a couple of seconds to execute, or double that if it has to go to the failover proxy * Broadcast tasks delay retry exponentially, up to 300 seconds. * Tasks are acknowledged when celery starts executing them. * If a task is not acknowledged before its visibility timeout of 310 seconds, sqs assumes the celery app has died, and puts it back on the queue. ### The situation A task stuck in a retry loop was reaching its visbility timeout, and as such SQS was duplicating it. We're unsure of the exact cause of reaching its visibility timeout, but there were two contributing factors: The celery prefetch and the delay of 300 seconds. Essentially, celery grabs the task, keeps an eye on it locally while waiting for the delay ETA to come round, then gives the task to a worker to do. However, that worker might already have up to ten tasks that it's grabbed from SQS. This means the worker only has 10 seconds to get through all those tasks and start working on the delayed task, before SQS moves the task back into available. (Note that the delay of 300 seconds is translated into a timestamp based on the time you called self.retry and put the task back on the queue. Whereas the visibility timeout starts ticking from the time that a celery worker picked up the task.) ### The fix #### Set the max retry delay for broadcast tasks to 240 seconds Setting the max delay to 240 seconds means that instead of a 10 second buffer before the visibility timeout is tripped, we've got a 70 second buffer. #### Set the prefetch limit to 1 for broadcast workers This means that each worker will have up to 1 currently executing task, and 1 task pending execution. If it has these, it won't grab any more off the queue, so they can sit there without their visibility timeout ticking up. Setting a prefetch limit to 1 will result in more queries to SQS and a lower throughput. This might be relevant in, eg, sending emails. But the broadcast worker is not hyper-time critical. https://docs.celeryproject.org/en/3.1/getting-started/brokers/sqs.html?highlight=acknowledge#caveats https://docs.celeryproject.org/en/3.1/userguide/optimizing.html?highlight=prefetch#reserve-one-task-at-a-time
This commit is contained in:
@@ -28,8 +28,8 @@ def get_retry_delay(retry_count):
|
|||||||
|
|
||||||
# 2 to the power of x. 1, 2, 4, 8, 16, 32, ...
|
# 2 to the power of x. 1, 2, 4, 8, 16, 32, ...
|
||||||
delay = 2**retry_count
|
delay = 2**retry_count
|
||||||
# never wait longer than 5 minutes
|
# never wait longer than 4 minutes
|
||||||
return min(delay, 300)
|
return min(delay, 240)
|
||||||
|
|
||||||
|
|
||||||
def check_provider_message_should_send(broadcast_event, provider):
|
def check_provider_message_should_send(broadcast_event, provider):
|
||||||
|
|||||||
@@ -64,7 +64,9 @@
|
|||||||
'notify-delivery-worker-letters': {'memory': '2G'},
|
'notify-delivery-worker-letters': {'memory': '2G'},
|
||||||
'notify-delivery-worker-retry-tasks': {},
|
'notify-delivery-worker-retry-tasks': {},
|
||||||
'notify-delivery-worker-internal': {},
|
'notify-delivery-worker-internal': {},
|
||||||
'notify-delivery-worker-broadcasts': {},
|
'notify-delivery-worker-broadcasts': {
|
||||||
|
'CELERYD_PREFETCH_MULTIPLIER': 1,
|
||||||
|
},
|
||||||
'notify-delivery-worker-receipts': {},
|
'notify-delivery-worker-receipts': {},
|
||||||
'notify-delivery-worker-service-callbacks': {'disk_quota': '2G'},
|
'notify-delivery-worker-service-callbacks': {'disk_quota': '2G'},
|
||||||
'notify-delivery-worker-save-api-notifications': {'disk_quota': '2G'},
|
'notify-delivery-worker-save-api-notifications': {'disk_quota': '2G'},
|
||||||
|
|||||||
@@ -505,7 +505,7 @@ def test_send_broadcast_provider_message_errors(mocker, sample_service, provider
|
|||||||
@pytest.mark.parametrize('num_retries, expected_countdown', [
|
@pytest.mark.parametrize('num_retries, expected_countdown', [
|
||||||
(0, 1),
|
(0, 1),
|
||||||
(5, 32),
|
(5, 32),
|
||||||
(20, 300),
|
(20, 240),
|
||||||
])
|
])
|
||||||
def test_send_broadcast_provider_message_delays_retry_exponentially(
|
def test_send_broadcast_provider_message_delays_retry_exponentially(
|
||||||
mocker,
|
mocker,
|
||||||
@@ -587,10 +587,10 @@ def test_trigger_link_tests_invokes_cbc_proxy_client(
|
|||||||
(0, 1),
|
(0, 1),
|
||||||
(1, 2),
|
(1, 2),
|
||||||
(2, 4),
|
(2, 4),
|
||||||
(8, 256),
|
(7, 128),
|
||||||
(9, 300),
|
(8, 240),
|
||||||
(10, 300),
|
(9, 240),
|
||||||
(1000, 300),
|
(1000, 240),
|
||||||
])
|
])
|
||||||
def test_get_retry_delay_has_capped_backoff(retry_count, expected_delay):
|
def test_get_retry_delay_has_capped_backoff(retry_count, expected_delay):
|
||||||
assert get_retry_delay(retry_count) == expected_delay
|
assert get_retry_delay(retry_count) == expected_delay
|
||||||
|
|||||||
Reference in New Issue
Block a user