Implement backoff for Kafka output #17808

Merged
merged 27 commits into from
May 5, 2020


@ycombinator ycombinator commented Apr 17, 2020

What does this PR do?

Implements the equal-jitter backoff strategy on the Kafka output.

Note that we cannot simply use outputs.WithBackoff on the Kafka output client because the latter's Publish() method never returns an error. The wrapper Publish() method implemented by the backoff client depends on the underlying output client returning an error so it can block (i.e. back off) for a certain duration before returning.

So instead we use sarama's Producer.Retry.BackoffFunc while reusing the same backoff API that is used by other libbeat network outputs, viz. Elasticsearch, Logstash, and Redis.
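The limitation described above can be illustrated with a minimal sketch. All names here (`publisher`, `backoffClient`, `failingClient`, `asyncClient`) are hypothetical simplifications for illustration and are not libbeat's actual types; the sleep is injected so the example does not really wait.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// publisher is a hypothetical, simplified output client interface.
type publisher interface {
	Publish(batch []string) error
}

// backoffClient wraps a publisher and waits after every failed Publish.
// The wait only happens when the wrapped client reports an error.
type backoffClient struct {
	client publisher
	wait   time.Duration
	sleep  func(time.Duration) // injected so the example does not really sleep
}

func (b *backoffClient) Publish(batch []string) error {
	err := b.client.Publish(batch)
	if err != nil {
		b.sleep(b.wait)
	}
	return err
}

// failingClient reports its failures synchronously.
type failingClient struct{}

func (failingClient) Publish([]string) error { return errors.New("broker unavailable") }

// asyncClient hands the batch off and always returns nil, the way an
// asynchronous producer would; failures surface elsewhere.
type asyncClient struct{}

func (asyncClient) Publish([]string) error { return nil }

// countWaits publishes n batches through the wrapper and counts the waits.
func countWaits(p publisher, n int) int {
	waits := 0
	c := &backoffClient{client: p, wait: time.Second, sleep: func(time.Duration) { waits++ }}
	for i := 0; i < n; i++ {
		_ = c.Publish([]string{"event"})
	}
	return waits
}

func main() {
	fmt.Println("waits with synchronous errors:", countWaits(failingClient{}, 3))
	fmt.Println("waits with async client:", countWaits(asyncClient{}, 3))
}
```

With a client that never returns an error, the wrapper never gets a chance to wait, which is why the backoff has to be driven from inside the producer library instead.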

Why is it important?

It will allow users to configure backoff options (backoff.init and backoff.max) for the Kafka output similar to what they can do for the Elasticsearch, Logstash, and Redis outputs.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. An automated test is quite cumbersome as it requires the orchestration of several processes. So for now I've listed out the manual test steps below.
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

How to test this PR locally

To test this PR, one needs to cause the partition leader node to go away, which should trigger the backoff. One also needs to subsequently bring the partition leader node back online to ensure that the backoff duration has been reset.

  1. Start a 3-node Kafka cluster. Follow the instructions through step 6 on https://kafka.apache.org/quickstart but skip steps 3, 4, and 5.

  2. Configure filebeat.test.yml (could be any Beat, but Filebeat is easy for testing) like so:

    filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - /tmp/logs/*.log
    
    output.kafka:
      enabled: true
      hosts: [ "localhost:9092", "localhost:9093", "localhost:9094" ]
      version: 2.1
      topic: foobar
      backoff:
        init: 3s
        max: 30s
    
  3. Start writing log lines to the test log file.

    mkdir /tmp/logs
    while true; do echo "$RANDOM new log entry" >> /tmp/logs/foo.log; sleep 0.1; done
    
  4. Start Filebeat.

    ./filebeat -c filebeat.test.yml -e
    
  5. Check that the foobar topic has been created in Kafka.

    bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic foobar
    

    This topic is expected to have only 1 partition, partition 0. Note this partition's leader node ID.

  6. Now kill the Kafka node corresponding to the partition leader's node ID.

  7. Re-run the command in step 5 and verify that partition 0 has no leader.

  8. Check the Filebeat log. You should now see entries like this:

    2020-04-22T07:06:35.852-0700    INFO    [kafka] kafka/config.go:288     backing off for 4.715506403s (init: 3s, max: 30s)
    2020-04-22T07:06:42.094-0700    INFO    [kafka] kafka/config.go:288     backing off for 7.428653822s (init: 3s, max: 30s)
    2020-04-22T07:06:51.053-0700    INFO    [kafka] kafka/config.go:288     backing off for 14.693773032s (init: 3s, max: 30s)
    2020-04-22T07:07:07.272-0700    INFO    [kafka] kafka/config.go:288     backing off for 16.638039354s (init: 3s, max: 30s)

  9. Re-start the node you killed in step 6.

  10. Check the Filebeat log again. The "backing off for" entries should stop appearing in the log.

  11. Repeat steps 6 through 10. Make sure that the backoff duration is reset to a low number (close to the backoff.init value) again.

  12. Stop all 6 processes (ZK, 3x Kafka nodes, log generator, FB) to end the test.

Related issues

@ycombinator ycombinator added the enhancement, in progress, libbeat, needs_backport, :Outputs, v8.0.0, and v7.8.0 labels Apr 17, 2020
@ycombinator ycombinator requested review from faec and urso April 22, 2020 14:34
@ycombinator ycombinator removed the in progress label Apr 22, 2020
@ycombinator ycombinator marked this pull request as ready for review April 22, 2020 14:35
@ycombinator ycombinator requested review from a team as code owners April 22, 2020 14:35
@faec faec left a comment

This looks great, thank you! Just a couple nits

@@ -37,6 +38,11 @@ import (
"github.com/elastic/beats/v7/libbeat/outputs/codec"
)

type backoffConfig struct {
Init time.Duration

Contributor

do these need annotations (config:"init" etc)?

Contributor Author

I don't think they need them but I think it's a good idea to be explicit. Added in a0c8093f42c7e36f793e1629f6d8b0372496f644.

// Backoff defines the interface for backoff strategies.
type Backoff interface {
Wait() bool
Reset()
WaitDuration() time.Duration

Contributor

I like this interface workaround for the new setting! Please comment the new function to clarify that Wait and WaitDuration should have the same effect on the target's internal state, i.e. that repeatedly calling <-time.After(b.WaitDuration()) is ~equivalent to repeatedly calling b.Wait(), to be used when the wait needs to be handled manually.

Contributor Author

@ycombinator ycombinator Apr 27, 2020

Excellent suggestion! Added in bb766b512220bc4bf0c1a422d6bb311ea186e358.

d := b.WaitDuration()
log.Infof("backing off for %v (init: %v, max: %v)", d, config.Backoff.Init, config.Backoff.Max)
return d
}

Is BackoffFunc called 'globally' in sarama, or per broker, or per partition? Is the function guaranteed to be called by one go-routine only?

Is a single 'backoff' instance good enough, or do we actually need multiple separate ones?

This function and reset are called from different go-routines. AFAIK the backoff instance used is not threadsafe. Given that we call this function potentially from multiple workers in sarama, this implementation might not exactly give us exponential backoff. If a subset of workers is active, the Reset will continue to reset our state, while errors in more than one worker can skip one or two backoff states.

Contributor Author

Hmm, good points @urso. Thanks!

Let me look into the first set of questions around Sarama's implementation.

As for the thread safety, I suppose we could make our backoff instance thread safe so interleaved calls to WaitDuration and Reset are at least operating on a consistent internal state of the backoff instance. But before doing this I think it's useful to know how Sarama is using BackoffFunc internally, so let me start with that.


The signature for BackoffFunc is: BackoffFunc func(retries, maxRetries int) time.Duration. This indicates that we should have some stateless implementation (sarama tracks state):

e.g.

maxBackoffRetries := int(math.Ceil(math.Log2(float64(backoff.Max) / float64(backoff.Init))))

BackoffFunc: func(retries, _ int) time.Duration {
  jitter := ...
  if retries >= maxBackoffRetries {
    return backoff.Max + jitter
  }
  return time.Duration(uint64(backoff.Init) * uint64(1 << retries)) + jitter
}

Contributor Author

@ycombinator ycombinator Apr 28, 2020

Is BackoffFunc called 'globally' in sarama, or per broker, or per partition?

It appears to be per partition: https://github.com/Shopify/sarama/blob/58123455d1a70c7f438871597d9a1715462e5d1c/async_producer.go#L497

Is the function guaranteed to be called by one go-routine only?

No, since there could be multiple goroutines in flight for different partitions: https://github.com/Shopify/sarama/blob/58123455d1a70c7f438871597d9a1715462e5d1c/async_producer.go#L489

Given the above, it means the state of the backoff instance would be shared across multiple partition producers, which is not good. Ideally the state would be per partition producer.

Alternatively we could provide a stateless implementation (like the one you suggested). Of course, that means we can't directly reuse the implementations used by other outputs, i.e. EqualJitterBackoff. But I don't see a way around that. I think we'd just have to live with a stateless implementation where the state (i.e. retries) is managed externally and passed into the backoff function for the kafka output or a stateful implementation for the other outputs.

Contributor Author

@urso I implemented a stateless backoff function in 0f7bfae01cd266cf9288d1132c1dc7da0f3ad53b per your suggestion but I deliberately made the calculations in it look more like the ones in the EqualJitterBackoff Wait() method.

Collaborator

elasticmachine commented Apr 28, 2020

💔 Build Failed


Test stats 🧪

Test Results
Failed 1
Passed 7720
Skipped 1212
Total 8933

Test errors


  • Name: Build and Test / Metricbeat OSS Unit tests / test_process – test_system.Test

    • Status: FAILED
    • Age: 1
    • Duration: 0.942
    • Error Details: False is not true : fd not found in any process events

Steps errors


  • Name: Mage build unitTest

    • Description: mage build unitTest

    • Result: FAILURE

    • Duration: 9 min 2 sec

    • Start Time: 2020-05-05T13:05:32.466+0000

  • Name: Report to Codecov

    • Description:

          curl -sSLo codecov https://codecov.io/bash
          for i in auditbeat filebeat heartbeat libbeat metricbeat packetbeat winlogbeat journalbeat
          do
            FILE="${i}/build/coverage/full.cov"
            if [ -f "${FILE}" ]; then
              bash codecov -f "${FILE}"
            fi
          done

    • Result: FAILURE

    • Duration: 2 min 22 sec

    • Start Time: 2020-05-05T13:25:11.844+0000


}

return backoff
}

checking the implementation for EqualJitterBackoff, we always apply jitter, even after hitting max.

maybe try this:

func(retries, _ int) time.Duration {
  // compute 'base' duration for exponential backoff
  dur := cfg.Max
  if retries < maxBackoffRetries {
    dur = time.Duration(uint64(cfg.Init) * uint64(1<<retries))
  }

  // apply roughly equally distributed jitter in the second half of the interval,
  // such that the wait time falls into the interval [dur/2, dur]
  limit := int64(dur / 2)
  jitter := rand.Int63n(limit + 1)
  return time.Duration(limit + jitter)
}

Contributor Author

Ah yes, you're right, 🤦. Will fix!

Contributor Author

Fixed in 5eb1cbd13cb02ae25978fa3270e6cf36207d382d.


Let's add unit tests for the backoff function :)

Contributor Author

I added a unit test but please take a look. It doesn't make very strong assertions about what the backoff values after each call to the backoff function should be. I'm not sure if/how we can make even stronger assertions than the test is making right now.

if !assert.LessOrEqual(t, backoff.Milliseconds(), cfg.Max.Milliseconds()) {
t.Logf("init: %v, max: %v, retries: %v", cfg.Init, cfg.Max, retries)
return false
}

The range is much too big. It only checks that we are in between init/2 and max, but not the evolution of the values.

Maybe purely quick-check-based testing is overkill here, while still not capturing all cases. How about creating simple tests with maxRetries=0, maxRetries=1, ...? Based on this you can prepare an array of the expected exponential backoff values. The check would then be expected/2 <= current <= expected.

Contributor Author

Thanks @urso. I tried to implement the stronger assertions testing the evolution of the backoff values in a8148fc08. Let me know what you think!

actualBackoff := backoffFn(retries, 50)

require.GreaterOrEqual(t, float64(actualBackoff), expectedBackoff/2)
require.LessOrEqual(t, float64(actualBackoff), expectedBackoff)

Normally I'd prefer not to use floats, but it seems to be ok. For error reporting, maybe just use t.Fatal like this?

if !(expected/2 <= actual && actual <= expected) {
  t.Fatalf("backoff '%v' not in expected range [%v, %v] (retries: %v)", actual, expected/2, expected, retries)
}

SeedFlag = flag.Int64("seed", 0, "Randomization seed")
)

func SeedPRNG(t *testing.T) {

Do not use the libbeat/testing package for unit test helpers. The testing package is used to provide helpers for the <beatname> test ... CLI command. By importing testing here all Beats will link in the testing package and will have all -test.X CLI flags.

Contributor Author

Looks like it's not easy to create shared test utilities that can be used across packages. I think the next best thing might be to create an internal package so at least all code within libbeat can access packages under it but any packages outside libbeat cannot. I did this in 34fb1f9. WDYT?


Works for me. We have some other test helpers (e.g. check that we don't have any leaky go-routines). Maybe create an issue to 'unify' testing helpers organization.

Contributor Author

@ycombinator ycombinator May 5, 2020

Done: #18239

@ycombinator
Contributor Author

CI failures are unrelated. Merging.

@ycombinator ycombinator merged commit f6883ab into elastic:master May 5, 2020
@ycombinator ycombinator deleted the lb-kafka-output-backoff branch May 5, 2020 16:46
ycombinator added a commit that referenced this pull request May 5, 2020
* Implement backoff for Kafka output (#17808)

* Expose wait duration from backoff strategies

* Add backoff section to kafka output config

* Use equal jitter backoff strategy with kafka producer

* Adding CHANGELOG entry

* Adding backoff options to reference config template

* Update reference config files

* Implementing new Backoff interface

* Adding explanation of strategy choice to comment

* Implementing new Backoff interface

* Adding godoc

* Resetting backoff

* Updating test

* Adding godoc for interface

* Adding struct tags for config settings

* Implementing stateless backoff function

* Undoing changes to backoff strategies

* Adding godoc

* Fixing backoff duration calculation

* WIP: Add unit test

* Refactor seed functionality into common package

* Fix test

* Testing evolution of backoff value over retries

* Fixing refactoring of PRNG seeding after rebase

* Better failure message from test

* Moving test utilities to internal package

* Fixing calls

* Adding config options to template

* Fixing up CHANGELOG

* Fixing up CHANGELOG
Labels
enhancement, libbeat, needs_backport, :Outputs, v7.8.0, v8.0.0
Development

Successfully merging this pull request may close these issues.

Implement backoff for Kafka output
5 participants