-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement backoff for Kafka output #17808
Implement backoff for Kafka output #17808
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great, thank you! Just a couple nits
libbeat/outputs/kafka/config.go
Outdated
@@ -37,6 +38,11 @@ import ( | |||
"github.com/elastic/beats/v7/libbeat/outputs/codec" | |||
) | |||
|
|||
type backoffConfig struct { | |||
Init time.Duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do these need annotations (config:"init"
etc)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think they need them but I think it's a good idea to be explicit. Added in a0c8093f42c7e36f793e1629f6d8b0372496f644.
libbeat/common/backoff/backoff.go
Outdated
// Backoff defines the interface for backoff strategies. | ||
type Backoff interface { | ||
Wait() bool | ||
Reset() | ||
WaitDuration() time.Duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this interface workaround for the new setting! Please comment the new function to clarify that Wait
and WaitDuration
should have the same effect on the target's internal state, i.e. that repeatedly calling <-time.After(b.WaitDuration())
is ~equivalent to repeatedly calling b.Wait()
, to be used when the wait needs to be handled manually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent suggestion! Added in bb766b512220bc4bf0c1a422d6bb311ea186e358.
libbeat/outputs/kafka/config.go
Outdated
d := b.WaitDuration() | ||
log.Infof("backing off for %v (init: %v, max: %v)", d, config.Backoff.Init, config.Backoff.Max) | ||
return d | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is BackoffFunc called 'globally' in sarama, or per broker, or per partition? Is the function guaranteed to be called by one go-routine only?
Is a single 'backoff' instance good enough, or do we actually need multiple separate ones?
This function and reset are called from different go-routines. AFAIK the backoff instance used is not threadsafe. Given that we call this function potentially from multiple workers in sarama, this implementation might not exactly give us exponential backoff. If a subset of workers is active, the Reset will continue to reset our state, while errors in more than one worker can skip
one or two backoff states.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, good points @urso. Thanks!
Let me look into the first set of questions around Sarama's implementation.
As for the thread safety, I suppose we could make our backoff instance thread safe so interleaved calls to WaitDuration
and Reset
are at least operating on a consistent internal state of the backoff instance. But before doing this I think it's useful to know how Sarama is using BackoffFunc
internally, so let me start with that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The signature for BackoffFunc is: BackoffFunc func(retries, maxRetries int) time.Duration
. This indicates that we should have some stateless implementation (sarama tracks state):
e.g.
maxBackoffRetries := int(math.Ceil(math.Log2(float64(backoff.Max) / float64(backoff.Init))))
BackoffFunc: func(retries, _ int) time.Duration {
jitter := ...
if retries >= maxBackoffRetries {
return backoff.Max + jitter
}
return time.Duration(uint64(backoff.Init) * uint64(1 << retries)) + jitter
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is BackoffFunc called 'globally' in sarama, or per broker, or per partition?
It appears to be per partition: https://github.com/Shopify/sarama/blob/58123455d1a70c7f438871597d9a1715462e5d1c/async_producer.go#L497
Is the function guaranteed to be called by one go-routine only?
No, since there could be multiple goroutines in flight for different partitions: https://github.com/Shopify/sarama/blob/58123455d1a70c7f438871597d9a1715462e5d1c/async_producer.go#L489
Given the above, it means the state of the backoff instance would be shared across multiple partition producers, which is not good. Ideally the state would be per partition producer.
Alternatively we could provide a stateless implementation (like the one you suggested). Of course, that means we can't directly reuse the implementations used by other outputs, i.e. EqualJitterBackoff
. But I don't see a way around that. I think we'd just have to live with a stateless implementation where the state (i.e. retries
) is managed externally and passed into the backoff function for the kafka output or a stateful implementation for the other outputs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@urso I implemented a stateless backoff function in 0f7bfae01cd266cf9288d1132c1dc7da0f3ad53b per your suggestion but I deliberately made the calculations in it look more like the ones in the EqualJitterBackoff
Wait()
method.
💔 Build FailedExpand to view the summary
Build stats
Test stats 🧪
Test errorsExpand to view the tests failures
Steps errorsExpand to view the steps failures
Log outputExpand to view the last 100 lines of log output
|
} | ||
|
||
return backoff | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checking the implementation for EqualJitterBackoff, we always apply jitter, even after hitting max.
maybe try this:
func(retries, _ int) time.Duration {
// compute 'base' duration for exponential backoff
var dur := cfg.Max
if retries < maxBackoffRetries {
dur = time.Duration((uint64(cfg.Init) * uint64(1<<retries)))
}
// apply about equaly distributed jitter in second half of the interval, such that the wait time falls into the interval [dur/2, dur]
limit := int64(dur / 2)
jitter := rand.Int63n(limit + 1)
return time.Duration(limit + jitter)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, you're right, 🤦. Will fix!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 5eb1cbd13cb02ae25978fa3270e6cf36207d382d.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add unit tests for the backoff function :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a unit test but please take a look. It doesn't make very strong assertions about what the backoff values after each call to the backoff function should be. I'm not sure if/how we can make even stronger assertions that the test is making right now.
if !assert.LessOrEqual(t, backoff.Milliseconds(), cfg.Max.Milliseconds()) { | ||
t.Logf("init: %v, max: %v, retries: %v", cfg.Init, cfg.Max, retries) | ||
return false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The range is much too big. It only checks that we are in between init/2 and max, but not the evolution of the values.
Maybe quick check only based testing is some overkill here, while not capturing all cases. How about creating simple tests with maxRetries=0, maxRetries=1, ...
? Based on this you can prepare an array of the expected exponential backoff values. The limit would be expected/2 <= current <= expected
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @urso. I tried to implement the stronger assertions testing the evolution of the backoff values in a8148fc08. Let me know what you think!
libbeat/outputs/kafka/config_test.go
Outdated
actualBackoff := backoffFn(retries, 50) | ||
|
||
require.GreaterOrEqual(t, float64(actualBackoff), expectedBackoff/2) | ||
require.LessOrEqual(t, float64(actualBackoff), expectedBackoff) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally I'd prefer not to use floats, but it seems to be ok. For error reporting, maybe just use t.Fatal like this?
if !(expected / 2 <= actual <= expected) {
t.Fatalf("backoff '%v' not in expected range [%v, %v] (retries: %v)", actual, expected/2, expected, retries)
}
libbeat/testing/seed.go
Outdated
SeedFlag = flag.Int64("seed", 0, "Randomization seed") | ||
) | ||
|
||
func SeedPRNG(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not use the libbeat/testing
package for unit test helpers. The testing package is used to provide helpers for the <beatname> test ...
CLI command. By importing testing
here all Beats will link in the testing package and will have all -test.X
CLI flags.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like it's not easy to create shared test utilities that can be used across packages. I think the next best thing might be to create an internal package so at least all code within libbeat
can access packages under it but any packages outside libbeat cannot. I did this in 34fb1f9. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works for me. We have some other test helpers (e.g. check that we don't have any leaky go-routines). Maybe create an issue to 'unify' testing helpers organization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done: #18239
CI failures are unrelated. Merging. |
* Implement backoff for Kafka output (#17808) * Expose wait duration from backoff strategies * Add backoff section to kafka output config * Use equal jitter backoff strategy with kafka producer * Adding CHANGELOG entry * Adding backoff options to reference config template * Update reference config files * Implementing new Backoff interface * Adding explanation of strategy choice to comment * Implementing new Backoff interface * Adding godoc * Resetting backoff * Updating test * Adding godoc for interface * Adding struct tags for config settings * Implementing stateless backoff function * Undoing changes to backoff strategies * Adding godoc * Fixing backoff duration calculation * WIP: Add unit test * Refactor seed functionality into common package * Fix test * Testing evolution of backoff value over retries * Fixing refactoring of PRNG seeding after rebase * Better failure message from test * Moving test utilities to internal package * Fixing calls * Adding config options to template * Fixing up CHANGELOG * Fixing up CHANGELOG
What does this PR do?
Implements the equal-jitter backoff strategy on the Kafka output.
Note that we cannot simply use
outputs.WithBackoff
on the Kafka output client because the latter'sPublish()
method never returns anerror
. The wrapperPublish()
method implemented by the backoff client depends on the underlying output client returning anerror
so it can block (i.e. backoff for) a certain duration before returning.So instead we use
sarama
'sProducer.Retry.BackoffFunc
while reusing the same backoff API that is used by other libbeat network outputs, viz. Elasticsearch, Logstash, and Redis.Why is it important?
It will allow users to configure backoff options (
backoff.init
andbackoff.max
) for the Kafka output similar to what they can do for the Elasticsearch, Logstash, and Redis outputs.Checklist
I have added tests that prove my fix is effective or that my feature worksAn automated test is quite cumbersome as it requires the orchestration of several processes. So for now I've listed out the manual test steps below.CHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.How to test this PR locally
To test this PR, one needs to cause the partition leader node to go away, which should trigger the backoff. One also needs to subsequently bring the partition leader node back online to ensure that the backoff duration has been reset.
Start a 3-node Kafka cluster. Follow the instructions through step 6 on https://kafka.apache.org/quickstart but skip steps 3, 4, and 5.
Configure
filebeat.yml
(could be any Beat, but Filebeat is easy for testing) like so:Start writing log lines to the test log file.
Start Filebeat.
Check that the
foobar
topic has been created in Kafka.This topic is expected to have only 1 partition, partition
0
. Note this partition's leader node ID.Now kill the Kafka node corresponding to the partition leader's node ID.
Re-run the command in step 5 and verify that that partition
0
has no leader.Check the Filebeat log. You should now see entries like this:
Re-start the node you killed in step 6.
Check the Filebeat log again. The "backing off for" entries should stop appearing in the log.
Repeat steps 6. through 10. Make sure that the backoff duration is reset to a low number (close to the
backoff.init
value) again.Stop all 6 processes (ZK, 3x Kafka nodes, log generator, FB) to end the test.
Related issues