From b9ac89ce2e8e9391bb57f31d8ebcb488c475e209 Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 29 Dec 2015 17:54:24 +0100 Subject: [PATCH] Logstash client window size enhancement - Have window size converge towards the batch size requested to be published if the maximum batch size ever published is less than the maximum allowed window size. - Always shrink window size on error. If connection was lost due to logstash closing it by internal timeouts (circuit breaker) we want to decrease the window size on next try. - Add unit tests for logstash client window sizing behavior - Add changelog entry --- CHANGELOG.asciidoc | 1 + libbeat/outputs/logstash/client.go | 79 ++++++++++++++++++------- libbeat/outputs/logstash/client_test.go | 58 ++++++++++++++++++ 3 files changed, 115 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index d47796f8fea..e569f9ef65f 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -20,6 +20,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff] ==== Bugfixes *Affecting all Beats* +- Fix logstash window size of 1 not increasing. 
{pull}598[598] *Packetbeat* - Fix setting direction to out and use its value to decide when dropping events if ignore_outgoing is enabled {pull}557[557] diff --git a/libbeat/outputs/logstash/client.go b/libbeat/outputs/logstash/client.go index 38bfbaa3709..5a7f1aaac92 100644 --- a/libbeat/outputs/logstash/client.go +++ b/libbeat/outputs/logstash/client.go @@ -8,6 +8,7 @@ import ( "errors" "expvar" "io" + "math" "net" "time" @@ -71,6 +72,16 @@ func newLumberjackClient( } } +func (l *lumberjackClient) Connect(timeout time.Duration) error { + logp.Debug("logstash", "connect") + return l.TransportClient.Connect(timeout) +} + +func (l *lumberjackClient) Close() error { + logp.Debug("logstash", "close connection") + return l.TransportClient.Close() +} + func (l *lumberjackClient) PublishEvent(event common.MapStr) error { _, err := l.PublishEvents([]common.MapStr{event}) return err @@ -106,10 +117,12 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error) return 0, nil } - logp.Debug("logstash", "Try to publish %v events to logstash with window size %v", len(events), l.windowSize) + batchSize := len(events) + + logp.Debug("logstash", "Try to publish %v events to logstash with window size %v", batchSize, l.windowSize) // prepare message payload - if len(events) > l.windowSize { + if batchSize > l.windowSize { events = events[:l.windowSize] } count, payload, err := l.compressEvents(events) @@ -146,29 +159,13 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error) } } - // success: increase window size by factor 1.5 until max window size - // (window size grows exponentially) - // TODO: use duration until ACK to estimate an ok max window size value - if l.maxOkWindowSize < l.windowSize { - l.maxOkWindowSize = l.windowSize - - if l.windowSize < l.maxWindowSize { - l.windowSize = l.windowSize + l.windowSize/2 - if l.windowSize > l.maxWindowSize { - l.windowSize = l.maxWindowSize - } - } - } else if l.windowSize < 
l.maxOkWindowSize { - l.windowSize = l.windowSize + l.windowSize/2 - if l.windowSize > l.maxOkWindowSize { - l.windowSize = l.maxOkWindowSize - } - } - + l.tryGrowWindowSize(batchSize) return len(events), nil } func (l *lumberjackClient) onFail(n int, err error) (int, error) { + l.shrinkWindow() + // if timeout error, back off and ignore error nerr, ok := err.(net.Error) if !ok || !nerr.Timeout() { @@ -184,14 +181,50 @@ func (l *lumberjackClient) onFail(n int, err error) (int, error) { return n, err } - // timeout error. reduce window size and return 0 published events. Send + // timeout error. events. Send // mode might try to publish again with reduce window size or ask another // client to send events + return n, nil +} + +// Increase window size by factor 1.5 until max window size +// (window size grows exponentially) +// TODO: use duration until ACK to estimate an ok max window size value +func (l *lumberjackClient) tryGrowWindowSize(batchSize int) { + if l.windowSize <= batchSize { + if l.maxOkWindowSize < l.windowSize { + logp.Debug("logstash", "update max ok window size: %v < %v", l.maxOkWindowSize, l.windowSize) + l.maxOkWindowSize = l.windowSize + + newWindowSize := int(math.Ceil(1.5 * float64(l.windowSize))) + logp.Debug("logstash", "increase window size to: %v", newWindowSize) + + if l.windowSize <= batchSize && batchSize < newWindowSize { + logp.Debug("logstash", "set to batchSize: %v", batchSize) + newWindowSize = batchSize + } + if newWindowSize > l.maxWindowSize { + logp.Debug("logstash", "set to max window size: %v", l.maxWindowSize) + newWindowSize = l.maxWindowSize + } + l.windowSize = newWindowSize + } else if l.windowSize < l.maxOkWindowSize { + logp.Debug("logstash", "update current window size: %v", l.windowSize) + + l.windowSize = int(math.Ceil(1.5 * float64(l.windowSize))) + if l.windowSize > l.maxOkWindowSize { + logp.Debug("logstash", "set to max ok window size: %v", l.maxOkWindowSize) + l.windowSize = l.maxOkWindowSize + } + } + } +} + 
+func (l *lumberjackClient) shrinkWindow() { l.windowSize = l.windowSize / 2 if l.windowSize < minWindowSize { l.windowSize = minWindowSize } - return n, nil } func (l *lumberjackClient) compressEvents( diff --git a/libbeat/outputs/logstash/client_test.go b/libbeat/outputs/logstash/client_test.go index 8f548b2d306..3fd8ec3094c 100644 --- a/libbeat/outputs/logstash/client_test.go +++ b/libbeat/outputs/logstash/client_test.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/streambuf" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs/mode" "github.com/stretchr/testify/assert" @@ -392,3 +393,60 @@ func TestStructuredEvent(t *testing.T) { assert.Equal(t, true, msg.doc.get("struct.field2")) assert.Equal(t, 2.0, msg.doc.get("struct.field5.sub1")) } + +func enableLogging(selectors []string) { + logp.LogInit(logp.LOG_DEBUG, "", false, true, selectors) +} + +func TestGrowWindowSizeUpToBatchSizes(t *testing.T) { + batchSize := 114 + windowSize := 1024 + testGrowWindowSize(t, 10, 0, windowSize, batchSize, batchSize) +} + +func TestGrowWindowSizeUpToMax(t *testing.T) { + batchSize := 114 + windowSize := 64 + testGrowWindowSize(t, 10, 0, windowSize, batchSize, windowSize) +} + +func TestGrowWindowSizeOf1(t *testing.T) { + batchSize := 114 + windowSize := 1024 + testGrowWindowSize(t, 1, 0, windowSize, batchSize, batchSize) +} + +func TestGrowWindowSizeToMaxOKOnly(t *testing.T) { + batchSize := 114 + windowSize := 1024 + maxOK := 71 + testGrowWindowSize(t, 1, maxOK, windowSize, batchSize, maxOK) +} + +func testGrowWindowSize(t *testing.T, + initial, maxOK, windowSize, batchSize, expected int, +) { + enableLogging([]string{"logstash"}) + c := newLumberjackClient(nil, windowSize, 1*time.Second) + c.windowSize = initial + c.maxOkWindowSize = maxOK + for i := 0; i < 100; i++ { + c.tryGrowWindowSize(batchSize) + } + + assert.Equal(t, expected, c.windowSize) + assert.Equal(t, expected, 
c.maxOkWindowSize) +} + +func TestShrinkWindowSizeNeverZero(t *testing.T) { + enableLogging([]string{"logstash"}) + + windowSize := 124 + c := newLumberjackClient(nil, windowSize, 1*time.Second) + c.windowSize = windowSize + for i := 0; i < 100; i++ { + c.shrinkWindow() + } + + assert.Equal(t, 1, c.windowSize) +}