Skip to content

Commit

Permalink
Publisher options refactoring
Browse files Browse the repository at this point in the history
- have separate sync and guaranteed options:
  - sync: just blocks until output plugin finished processing events
    to be published
  - guaranteed: is passed and handled in output plugin. If options is set,
    sending is retried until success.

- guaranteed sending is handled by output plugin retrying infinitely until
  events have been ACKed.

- Introduce publish Signal option

- Update publisher options in filebeat/winlogbeat
  • Loading branch information
urso committed Jan 12, 2016
1 parent 4b29e74 commit addd132
Show file tree
Hide file tree
Showing 26 changed files with 185 additions and 141 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
==== Breaking changes

*Affecting all Beats*
- Some publisher options refactoring in libbeat {pull}684[684]

*Packetbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func Publish(beat *beat.Beat, fb *Filebeat) {
pubEvents = append(pubEvents, event.ToMapStr())
}

beat.Events.PublishEvents(pubEvents, publisher.Sync)
beat.Events.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed)

logp.Info("Events sent: %d", len(events))

Expand Down
6 changes: 4 additions & 2 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package console
import (
"encoding/json"
"os"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -47,7 +46,7 @@ func writeBuffer(buf []byte) error {

func (c *console) PublishEvent(
s outputs.Signaler,
ts time.Time,
opts outputs.Options,
event common.MapStr,
) error {
var jsonEvent []byte
Expand All @@ -74,6 +73,9 @@ func (c *console) PublishEvent(
outputs.SignalCompleted(s)
return nil
fail:
if opts.Guaranteed {
logp.Critical("Unable to publish events to console: %v", err)
}
outputs.SignalFailed(s, err)
return err
}
4 changes: 2 additions & 2 deletions libbeat/outputs/console/console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"io"
"os"
"testing"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -47,7 +47,7 @@ func event(k, v string) common.MapStr {
func run(c *console, events ...common.MapStr) (string, error) {
return withStdout(func() {
for _, event := range events {
c.PublishEvent(nil, time.Now(), event)
c.PublishEvent(nil, outputs.Options{}, event)
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/api_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestIndex(t *testing.T) {
}
_, resp, err := client.Index(index, "test", "1", params, body)
if err != nil {
t.Errorf("Index() returns error: %s", err)
t.Fatalf("Index() returns error: %s", err)
}
if !resp.Created {
t.Errorf("Index() fails: %s", resp)
Expand Down
8 changes: 4 additions & 4 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,16 @@ func makeClientFactory(

func (out *elasticsearchOutput) PublishEvent(
signaler outputs.Signaler,
ts time.Time,
opts outputs.Options,
event common.MapStr,
) error {
return out.mode.PublishEvent(signaler, event)
return out.mode.PublishEvent(signaler, opts, event)
}

func (out *elasticsearchOutput) BulkPublish(
trans outputs.Signaler,
ts time.Time,
opts outputs.Options,
events []common.MapStr,
) error {
return out.mode.PublishEvents(trans, events)
return out.mode.PublishEvents(trans, opts, events)
}
10 changes: 6 additions & 4 deletions libbeat/outputs/elasticsearch/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/elastic/beats/libbeat/outputs"
)

var testOptions = outputs.Options{}

func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearchOutput {
index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())

Expand Down Expand Up @@ -117,7 +119,7 @@ func TestOneEvent(t *testing.T) {
},
})

err := output.PublishEvent(nil, ts, event)
err := output.PublishEvent(nil, testOptions, event)
if err != nil {
t.Errorf("Failed to publish the event: %s", err)
}
Expand Down Expand Up @@ -189,7 +191,7 @@ func TestEvents(t *testing.T) {
},
})

err := output.PublishEvent(nil, ts, event)
err := output.PublishEvent(nil, testOptions, event)
if err != nil {
t.Errorf("Failed to publish the event: %s", err)
}
Expand All @@ -199,7 +201,7 @@ func TestEvents(t *testing.T) {
r["response"] = 0
event["redis"] = r

err = output.PublishEvent(nil, ts, event)
err = output.PublishEvent(nil, testOptions, event)
if err != nil {
t.Errorf("Failed to publish the event: %s", err)
}
Expand Down Expand Up @@ -259,7 +261,7 @@ func testBulkWithParams(t *testing.T, output elasticsearchOutput) {
r["response"] = "value" + strconv.Itoa(i)
event["redis"] = r

err := output.PublishEvent(nil, ts, event)
err := output.PublishEvent(nil, testOptions, event)
if err != nil {
t.Errorf("Failed to publish the event: %s", err)
}
Expand Down
10 changes: 6 additions & 4 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package fileout

import (
"encoding/json"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -74,7 +73,7 @@ func (out *fileOutput) init(config *outputs.MothershipConfig, topology_expire in

func (out *fileOutput) PublishEvent(
trans outputs.Signaler,
ts time.Time,
opts outputs.Options,
event common.MapStr,
) error {
jsonEvent, err := json.Marshal(event)
Expand All @@ -88,9 +87,12 @@ func (out *fileOutput) PublishEvent(

err = out.rotator.WriteLine(jsonEvent)
if err != nil {
logp.Err("Error when writing line to file: %s", err)
if opts.Guaranteed {
logp.Critical("Unable to write events to file: %s", err)
} else {
logp.Err("Error when writing line to file: %s", err)
}
}

outputs.Signal(trans, err)
return err
}
8 changes: 4 additions & 4 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,24 +150,24 @@ func makeClientFactory(
// send/receive overhead per event for other implementors too.
func (lj *logstash) PublishEvent(
signaler outputs.Signaler,
ts time.Time,
opts outputs.Options,
event common.MapStr,
) error {
lj.addMeta(event)
return lj.mode.PublishEvent(signaler, event)
return lj.mode.PublishEvent(signaler, opts, event)
}

// BulkPublish implements the BulkOutputer interface pushing a bulk of events
// via lumberjack.
func (lj *logstash) BulkPublish(
trans outputs.Signaler,
ts time.Time,
opts outputs.Options,
events []common.MapStr,
) error {
for _, event := range events {
lj.addMeta(event)
}
return lj.mode.PublishEvents(trans, events)
return lj.mode.PublishEvents(trans, opts, events)
}

// addMeta adapts events to be compatible with logstash forwarer messages by renaming
Expand Down
14 changes: 7 additions & 7 deletions libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func testSendMessageViaLogstash(t *testing.T, name string, tls bool) {
"type": "log",
"message": "hello world",
}
ls.PublishEvent(nil, time.Now(), event)
ls.PublishEvent(nil, testOptions, event)

// wait for logstash event flush + elasticsearch
waitUntilTrue(5*time.Second, checkIndex(ls, 1))
Expand Down Expand Up @@ -313,7 +313,7 @@ func testSendMultipleViaLogstash(t *testing.T, name string, tls bool) {
"type": "log",
"message": fmt.Sprintf("hello world - %v", i),
}
ls.PublishEvent(nil, time.Now(), event)
ls.PublishEvent(nil, testOptions, event)
}

// wait for logstash event flush + elasticsearch
Expand Down Expand Up @@ -384,7 +384,7 @@ func testSendMultipleBatchesViaLogstash(

for _, batch := range batches {
sig := outputs.NewSyncSignal()
ls.BulkPublish(sig, time.Now(), batch)
ls.BulkPublish(sig, testOptions, batch)
ok := sig.Wait()
assert.Equal(t, true, ok)
}
Expand Down Expand Up @@ -432,10 +432,10 @@ func testLogstashElasticOutputPluginCompatibleMessage(t *testing.T, name string,
"message": "hello world",
}

es.PublishEvent(nil, ts, event)
es.PublishEvent(nil, testOptions, event)
waitUntilTrue(timeout, checkIndex(es, 1))

ls.PublishEvent(nil, ts, event)
ls.PublishEvent(nil, testOptions, event)
waitUntilTrue(timeout, checkIndex(ls, 1))

// search value in logstash elasticsearch index
Expand Down Expand Up @@ -487,10 +487,10 @@ func testLogstashElasticOutputPluginBulkCompatibleMessage(t *testing.T, name str
"message": "hello world",
},
}
es.BulkPublish(nil, ts, events)
es.BulkPublish(nil, testOptions, events)
waitUntilTrue(timeout, checkIndex(es, 1))

ls.BulkPublish(nil, ts, events)
ls.BulkPublish(nil, testOptions, events)
waitUntilTrue(timeout, checkIndex(ls, 1))

// search value in logstash elasticsearch index
Expand Down
6 changes: 4 additions & 2 deletions libbeat/outputs/logstash/logstash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type mockLSServer struct {
handshake func(net.Conn)
}

var testOptions = outputs.Options{}

func newMockTLSServer(t *testing.T, to time.Duration, cert string) *mockLSServer {
tcpListener, err := net.Listen("tcp", "localhost:0")
if err != nil {
Expand Down Expand Up @@ -394,7 +396,7 @@ func testConnectionType(
output := makeOutputer()

signal := outputs.NewSyncSignal()
output.PublishEvent(signal, time.Now(), testEvent())
output.PublishEvent(signal, testOptions, testEvent())
result.signal = signal.Wait()
}()

Expand Down Expand Up @@ -472,7 +474,7 @@ func TestLogstashInvalidTLS(t *testing.T) {
output := newTestLumberjackOutput(t, "", &config)

signal := outputs.NewSyncSignal()
output.PublishEvent(signal, time.Now(), testEvent())
output.PublishEvent(signal, testOptions, testEvent())
result.signal = signal.Wait()
}()

Expand Down
48 changes: 32 additions & 16 deletions libbeat/outputs/mode/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ func NewLoadBalancerMode(
maxAttempts int,
waitRetry, timeout, maxWaitRetry time.Duration,
) (*LoadBalancerMode, error) {

// maxAttempts signals infinite retry. Convert to -1, so attempts left and
// and infinite retry can be more easily distinguished by load balancer
if maxAttempts == 0 {
maxAttempts = -1
}

m := &LoadBalancerMode{
timeout: timeout,
maxWaitRetry: maxWaitRetry,
Expand All @@ -94,28 +101,33 @@ func (m *LoadBalancerMode) Close() error {
// PublishEvents forwards events to some load balancing worker.
func (m *LoadBalancerMode) PublishEvents(
signaler outputs.Signaler,
opts outputs.Options,
events []common.MapStr,
) error {
return m.publishEventsMessage(eventsMessage{
attemptsLeft: m.maxAttempts,
signaler: signaler,
events: events,
})
return m.publishEventsMessage(opts,
eventsMessage{signaler: signaler, events: events})
}

// PublishEvent forwards the event to some load balancing worker.
func (m *LoadBalancerMode) PublishEvent(
signaler outputs.Signaler,
opts outputs.Options,
event common.MapStr,
) error {
return m.publishEventsMessage(eventsMessage{
attemptsLeft: m.maxAttempts,
signaler: signaler,
event: event,
})
return m.publishEventsMessage(opts,
eventsMessage{signaler: signaler, event: event})
}

func (m *LoadBalancerMode) publishEventsMessage(msg eventsMessage) error {
func (m *LoadBalancerMode) publishEventsMessage(
opts outputs.Options,
msg eventsMessage,
) error {
maxAttempts := m.maxAttempts
if opts.Guaranteed {
maxAttempts = -1
}
msg.attemptsLeft = maxAttempts

if ok := m.forwardEvent(m.work, msg); !ok {
dropping(msg)
}
Expand Down Expand Up @@ -170,7 +182,9 @@ func (m *LoadBalancerMode) start(clients []ProtocolClient) {
func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
if msg.event != nil {
if err := client.PublishEvent(msg.event); err != nil {
msg.attemptsLeft--
if msg.attemptsLeft > 0 {
msg.attemptsLeft--
}
m.onFail(msg, err)
return
}
Expand All @@ -184,10 +198,12 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {

events, err = client.PublishEvents(events)
if err != nil {
msg.attemptsLeft--
if msg.attemptsLeft > 0 {
msg.attemptsLeft--
}

// reset attempt count if subset of messages has been processed
if len(events) < total {
if len(events) < total && msg.attemptsLeft >= 0 {
msg.attemptsLeft = m.maxAttempts
}

Expand All @@ -198,7 +214,7 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
return
}

if m.maxAttempts > 0 && msg.attemptsLeft <= 0 {
if m.maxAttempts > 0 && msg.attemptsLeft == 0 {
// no more attempts left => drop
dropping(msg)
return
Expand Down Expand Up @@ -239,7 +255,7 @@ func (m *LoadBalancerMode) forwardEvent(
ch chan eventsMessage,
msg eventsMessage,
) bool {
if m.maxAttempts == 0 {
if msg.attemptsLeft < 0 {
select {
case ch <- msg:
return true
Expand Down
Loading

0 comments on commit addd132

Please sign in to comment.