Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logstash client window size enhancement #598

Merged
merged 1 commit into from
Dec 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
==== Bugfixes

*Affecting all Beats*
- Fix logstash window size of 1 not increasing. {pull}598[598]

*Packetbeat*
- Fix setting direction to out and use its value to decide when dropping events if ignore_outgoing is enabled {pull}557[557]
Expand Down
79 changes: 56 additions & 23 deletions libbeat/outputs/logstash/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"expvar"
"io"
"math"
"net"
"time"

Expand Down Expand Up @@ -71,6 +72,16 @@ func newLumberjackClient(
}
}

func (l *lumberjackClient) Connect(timeout time.Duration) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to follow how this change is related to the other changes? Is this Method used? Same for close.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connect and Close are used by output/modes module. Just adding some debug here to better see how window size changes and reconnects/errors happen (as these are related). We re-connect on failure + we adapt window sizes on error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

logp.Debug("logstash", "connect")
return l.TransportClient.Connect(timeout)
}

func (l *lumberjackClient) Close() error {
logp.Debug("logstash", "close connection")
return l.TransportClient.Close()
}

func (l *lumberjackClient) PublishEvent(event common.MapStr) error {
_, err := l.PublishEvents([]common.MapStr{event})
return err
Expand Down Expand Up @@ -106,10 +117,12 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error)
return 0, nil
}

logp.Debug("logstash", "Try to publish %v events to logstash with window size %v", len(events), l.windowSize)
batchSize := len(events)

logp.Debug("logstash", "Try to publish %v events to logstash with window size %v", batchSize, l.windowSize)

// prepare message payload
if len(events) > l.windowSize {
if batchSize > l.windowSize {
events = events[:l.windowSize]
}
count, payload, err := l.compressEvents(events)
Expand Down Expand Up @@ -146,29 +159,13 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error)
}
}

// success: increase window size by factor 1.5 until max window size
// (window size grows exponentially)
// TODO: use duration until ACK to estimate an ok max window size value
if l.maxOkWindowSize < l.windowSize {
l.maxOkWindowSize = l.windowSize

if l.windowSize < l.maxWindowSize {
l.windowSize = l.windowSize + l.windowSize/2
if l.windowSize > l.maxWindowSize {
l.windowSize = l.maxWindowSize
}
}
} else if l.windowSize < l.maxOkWindowSize {
l.windowSize = l.windowSize + l.windowSize/2
if l.windowSize > l.maxOkWindowSize {
l.windowSize = l.maxOkWindowSize
}
}

l.tryGrowWindowSize(batchSize)
return len(events), nil
}

func (l *lumberjackClient) onFail(n int, err error) (int, error) {
l.shrinkWindow()

// if timeout error, back off and ignore error
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
Expand All @@ -184,14 +181,50 @@ func (l *lumberjackClient) onFail(n int, err error) (int, error) {
return n, err
}

// timeout error. reduce window size and return 0 published events. Send
// timeout error. events. Send
// mode might try to publish again with reduce window size or ask another
// client to send events
return n, nil
}

// Increase window size by factor 1.5 until max window size
// (window size grows exponentially)
// TODO: use duration until ACK to estimate an ok max window size value
func (l *lumberjackClient) tryGrowWindowSize(batchSize int) {
if l.windowSize <= batchSize {
if l.maxOkWindowSize < l.windowSize {
logp.Debug("logstash", "update max ok window size: %v < %v", l.maxOkWindowSize, l.windowSize)
l.maxOkWindowSize = l.windowSize

newWindowSize := int(math.Ceil(1.5 * float64(l.windowSize)))
logp.Debug("logstash", "increase window size to: %v", newWindowSize)

if l.windowSize <= batchSize && batchSize < newWindowSize {
logp.Debug("logstash", "set to batchSize: %v", batchSize)
newWindowSize = batchSize
}
if newWindowSize > l.maxWindowSize {
logp.Debug("logstash", "set to max window size: %v", l.maxWindowSize)
newWindowSize = l.maxWindowSize
}
l.windowSize = newWindowSize
} else if l.windowSize < l.maxOkWindowSize {
logp.Debug("logstash", "update current window size: %v", l.windowSize)

l.windowSize = int(math.Ceil(1.5 * float64(l.windowSize)))
if l.windowSize > l.maxOkWindowSize {
logp.Debug("logstash", "set to max ok window size: %v", l.maxOkWindowSize)
l.windowSize = l.maxOkWindowSize
}
}
}
}

func (l *lumberjackClient) shrinkWindow() {
l.windowSize = l.windowSize / 2
if l.windowSize < minWindowSize {
l.windowSize = minWindowSize
}
return n, nil
}

func (l *lumberjackClient) compressEvents(
Expand Down
58 changes: 58 additions & 0 deletions libbeat/outputs/logstash/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/streambuf"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/mode"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -392,3 +393,60 @@ func TestStructuredEvent(t *testing.T) {
assert.Equal(t, true, msg.doc.get("struct.field2"))
assert.Equal(t, 2.0, msg.doc.get("struct.field5.sub1"))
}

func enableLogging(selectors []string) {
logp.LogInit(logp.LOG_DEBUG, "", false, true, selectors)
}

func TestGrowWindowSizeUpToBatchSizes(t *testing.T) {
batchSize := 114
windowSize := 1024
testGrowWindowSize(t, 10, 0, windowSize, batchSize, batchSize)
}

func TestGrowWindowSizeUpToMax(t *testing.T) {
batchSize := 114
windowSize := 64
testGrowWindowSize(t, 10, 0, windowSize, batchSize, windowSize)
}

func TestGrowWindowSizeOf1(t *testing.T) {
batchSize := 114
windowSize := 1024
testGrowWindowSize(t, 1, 0, windowSize, batchSize, batchSize)
}

func TestGrowWindowSizeToMaxOKOnly(t *testing.T) {
batchSize := 114
windowSize := 1024
maxOK := 71
testGrowWindowSize(t, 1, maxOK, windowSize, batchSize, maxOK)
}

func testGrowWindowSize(t *testing.T,
initial, maxOK, windowSize, batchSize, expected int,
) {
enableLogging([]string{"logstash"})
c := newLumberjackClient(nil, windowSize, 1*time.Second)
c.windowSize = initial
c.maxOkWindowSize = maxOK
for i := 0; i < 100; i++ {
c.tryGrowWindowSize(batchSize)
}

assert.Equal(t, expected, c.windowSize)
assert.Equal(t, expected, c.maxOkWindowSize)
}

func TestShrinkWindowSizeNeverZero(t *testing.T) {
enableLogging([]string{"logstash"})

windowSize := 124
c := newLumberjackClient(nil, windowSize, 1*time.Second)
c.windowSize = windowSize
for i := 0; i < 100; i++ {
c.shrinkWindow()
}

assert.Equal(t, 1, c.windowSize)
}