From addd13255e78f079d50970ad2f735b1d89582409 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 23 Dec 2015 22:13:40 +0100 Subject: [PATCH] Publisher options refactoring - have separate sync and guaranteed options: - sync: just blocks until output plugin finished processing events to be published - guaranteed: is passed and handled in output plugin. If options is set, sending is retried until success. - guaranteed sending is handled by output plugin retrying infinitely until events have been ACKed. - Introduce publish Signal option - Update publisher options in filebeat/winlogbeat --- CHANGELOG.asciidoc | 1 + filebeat/beat/filebeat.go | 2 +- libbeat/outputs/console/console.go | 6 ++- libbeat/outputs/console/console_test.go | 4 +- .../elasticsearch/api_integration_test.go | 2 +- libbeat/outputs/elasticsearch/output.go | 8 +-- libbeat/outputs/elasticsearch/output_test.go | 10 ++-- libbeat/outputs/fileout/file.go | 10 ++-- libbeat/outputs/logstash/logstash.go | 8 +-- .../logstash/logstash_integration_test.go | 14 ++--- libbeat/outputs/logstash/logstash_test.go | 6 ++- libbeat/outputs/mode/balance.go | 48 +++++++++++------ libbeat/outputs/mode/failover.go | 11 ++-- libbeat/outputs/mode/mode.go | 4 +- libbeat/outputs/mode/mode_test.go | 6 ++- libbeat/outputs/mode/single.go | 11 ++-- libbeat/outputs/outputs.go | 15 +++--- libbeat/outputs/redis/redis.go | 31 ++++++++--- libbeat/publisher/client.go | 52 +++++++++++-------- libbeat/publisher/client_test.go | 7 +-- libbeat/publisher/common_test.go | 4 +- libbeat/publisher/output.go | 47 ++++------------- libbeat/publisher/output_test.go | 3 +- libbeat/publisher/publish.go | 4 +- libbeat/publisher/sync.go | 10 +++- winlogbeat/beat/winlogbeat.go | 2 +- 26 files changed, 185 insertions(+), 141 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index da966de79f8..ac658d47624 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -13,6 +13,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff] ==== Breaking changes *Affecting all Beats* +- Some publisher options refactoring in libbeat {pull}684[684] *Packetbeat* diff --git a/filebeat/beat/filebeat.go b/filebeat/beat/filebeat.go index 579a35778c0..4051daa2fa7 100644 --- a/filebeat/beat/filebeat.go +++ b/filebeat/beat/filebeat.go @@ -125,7 +125,7 @@ func Publish(beat *beat.Beat, fb *Filebeat) { pubEvents = append(pubEvents, event.ToMapStr()) } - beat.Events.PublishEvents(pubEvents, publisher.Sync) + beat.Events.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed) logp.Info("Events sent: %d", len(events)) diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 0f8179a1ce1..f1f23080ed9 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -3,7 +3,6 @@ package console import ( "encoding/json" "os" - "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -47,7 +46,7 @@ func writeBuffer(buf []byte) error { func (c *console) PublishEvent( s outputs.Signaler, - ts time.Time, + opts outputs.Options, event common.MapStr, ) error { var jsonEvent []byte @@ -74,6 +73,9 @@ func (c *console) PublishEvent( outputs.SignalCompleted(s) return nil fail: + if opts.Guaranteed { + logp.Critical("Unable to publish events to console: %v", err) + } outputs.SignalFailed(s, err) return err } diff --git a/libbeat/outputs/console/console_test.go b/libbeat/outputs/console/console_test.go index 4f2a58e8bdc..e029f6ba70d 100644 --- a/libbeat/outputs/console/console_test.go +++ b/libbeat/outputs/console/console_test.go @@ -5,9 +5,9 @@ import ( "io" "os" "testing" - "time" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs" "github.com/stretchr/testify/assert" ) @@ -47,7 +47,7 @@ func event(k, v string) common.MapStr { func run(c *console, events ...common.MapStr) (string, error) { return withStdout(func() { for _, event := range events { - c.PublishEvent(nil, time.Now(), event) + c.PublishEvent(nil, outputs.Options{}, event) } }) } diff --git a/libbeat/outputs/elasticsearch/api_integration_test.go b/libbeat/outputs/elasticsearch/api_integration_test.go index 1268cf3a20c..2d5472e4c64 100644 --- a/libbeat/outputs/elasticsearch/api_integration_test.go +++ b/libbeat/outputs/elasticsearch/api_integration_test.go @@ -32,7 +32,7 @@ func TestIndex(t *testing.T) { } _, resp, err := client.Index(index, "test", "1", params, body) if err != nil { - t.Errorf("Index() returns error: %s", err) + t.Fatalf("Index() returns error: %s", err) } if !resp.Created { t.Errorf("Index() fails: %s", resp) diff --git a/libbeat/outputs/elasticsearch/output.go b/libbeat/outputs/elasticsearch/output.go index 5a6f1d34935..8f889aaa762 100644 --- a/libbeat/outputs/elasticsearch/output.go +++ b/libbeat/outputs/elasticsearch/output.go @@ -180,16 +180,16 @@ func makeClientFactory( func (out *elasticsearchOutput) PublishEvent( signaler outputs.Signaler, - ts time.Time, + opts outputs.Options, event common.MapStr, ) error { - return out.mode.PublishEvent(signaler, event) + return out.mode.PublishEvent(signaler, opts, event) } func (out *elasticsearchOutput) BulkPublish( trans outputs.Signaler, - ts time.Time, + opts outputs.Options, events []common.MapStr, ) error { - return out.mode.PublishEvents(trans, events) + return out.mode.PublishEvents(trans, opts, events) } diff --git a/libbeat/outputs/elasticsearch/output_test.go b/libbeat/outputs/elasticsearch/output_test.go index b660dec683f..42a9da4be55 100644 --- a/libbeat/outputs/elasticsearch/output_test.go +++ b/libbeat/outputs/elasticsearch/output_test.go @@ -12,6 +12,8 @@ import ( "github.com/elastic/beats/libbeat/outputs" ) +var testOptions = outputs.Options{} + func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearchOutput { index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) @@ -117,7 +119,7 @@ func TestOneEvent(t *testing.T) { }, }) - err := output.PublishEvent(nil, ts, event) + err := output.PublishEvent(nil, testOptions, event) if err != nil { t.Errorf("Failed to publish the event: %s", err) } @@ -189,7 +191,7 @@ func TestEvents(t *testing.T) { }, }) - err := output.PublishEvent(nil, ts, event) + err := output.PublishEvent(nil, testOptions, event) if err != nil { t.Errorf("Failed to publish the event: %s", err) } @@ -199,7 +201,7 @@ func TestEvents(t *testing.T) { r["response"] = 0 event["redis"] = r - err = output.PublishEvent(nil, ts, event) + err = output.PublishEvent(nil, testOptions, event) if err != nil { t.Errorf("Failed to publish the event: %s", err) } @@ -259,7 +261,7 @@ func testBulkWithParams(t *testing.T, output elasticsearchOutput) { r["response"] = "value" + strconv.Itoa(i) event["redis"] = r - err := output.PublishEvent(nil, ts, event) + err := output.PublishEvent(nil, testOptions, event) if err != nil { t.Errorf("Failed to publish the event: %s", err) } diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index 10977b050f3..58eaeb33f06 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -2,7 +2,6 @@ package fileout import ( "encoding/json" - "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -74,7 +73,7 @@ func (out *fileOutput) init(config *outputs.MothershipConfig, topology_expire in func (out *fileOutput) PublishEvent( trans outputs.Signaler, - ts time.Time, + opts outputs.Options, event common.MapStr, ) error { jsonEvent, err := json.Marshal(event) @@ -88,9 +87,12 @@ func (out *fileOutput) PublishEvent( err = out.rotator.WriteLine(jsonEvent) if err != nil { - logp.Err("Error when writing line to file: %s", err) + if opts.Guaranteed { + logp.Critical("Unable to write events to file: %s", err) + } else { + logp.Err("Error when writing line to file: %s", err) + } } - outputs.Signal(trans, err) return err } diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index 5db95330c9c..ce3867d863a 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -150,24 +150,24 @@ func makeClientFactory( // send/receive overhead per event for other implementors too. func (lj *logstash) PublishEvent( signaler outputs.Signaler, - ts time.Time, + opts outputs.Options, event common.MapStr, ) error { lj.addMeta(event) - return lj.mode.PublishEvent(signaler, event) + return lj.mode.PublishEvent(signaler, opts, event) } // BulkPublish implements the BulkOutputer interface pushing a bulk of events // via lumberjack. func (lj *logstash) BulkPublish( trans outputs.Signaler, - ts time.Time, + opts outputs.Options, events []common.MapStr, ) error { for _, event := range events { lj.addMeta(event) } - return lj.mode.PublishEvents(trans, events) + return lj.mode.PublishEvents(trans, opts, events) } // addMeta adapts events to be compatible with logstash forwarer messages by renaming diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index 3cf27eaf62a..91e2f0af99a 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -276,7 +276,7 @@ func testSendMessageViaLogstash(t *testing.T, name string, tls bool) { "type": "log", "message": "hello world", } - ls.PublishEvent(nil, time.Now(), event) + ls.PublishEvent(nil, testOptions, event) // wait for logstash event flush + elasticsearch waitUntilTrue(5*time.Second, checkIndex(ls, 1)) @@ -313,7 +313,7 @@ func testSendMultipleViaLogstash(t *testing.T, name string, tls bool) { "type": "log", "message": fmt.Sprintf("hello world - %v", i), } - ls.PublishEvent(nil, time.Now(), event) + ls.PublishEvent(nil, testOptions, event) } // wait for logstash event flush + elasticsearch @@ -384,7 +384,7 @@ func testSendMultipleBatchesViaLogstash( for _, batch := range batches { sig := outputs.NewSyncSignal() - ls.BulkPublish(sig, time.Now(), batch) + ls.BulkPublish(sig, testOptions, batch) ok := sig.Wait() assert.Equal(t, true, ok) } @@ -432,10 +432,10 @@ func testLogstashElasticOutputPluginCompatibleMessage(t *testing.T, name string, "message": "hello world", } - es.PublishEvent(nil, ts, event) + es.PublishEvent(nil, testOptions, event) waitUntilTrue(timeout, checkIndex(es, 1)) - ls.PublishEvent(nil, ts, event) + ls.PublishEvent(nil, testOptions, event) waitUntilTrue(timeout, checkIndex(ls, 1)) // search value in logstash elasticsearch index @@ -487,10 +487,10 @@ func testLogstashElasticOutputPluginBulkCompatibleMessage(t *testing.T, name str "message": "hello world", }, } - es.BulkPublish(nil, ts, events) + es.BulkPublish(nil, testOptions, events) waitUntilTrue(timeout, checkIndex(es, 1)) - ls.BulkPublish(nil, ts, events) + ls.BulkPublish(nil, testOptions, events) waitUntilTrue(timeout, checkIndex(ls, 1)) // search value in logstash elasticsearch index diff --git a/libbeat/outputs/logstash/logstash_test.go b/libbeat/outputs/logstash/logstash_test.go index 8b9e6306570..1a80279b6dd 100644 --- a/libbeat/outputs/logstash/logstash_test.go +++ b/libbeat/outputs/logstash/logstash_test.go @@ -31,6 +31,8 @@ type mockLSServer struct { handshake func(net.Conn) } +var testOptions = outputs.Options{} + func newMockTLSServer(t *testing.T, to time.Duration, cert string) *mockLSServer { tcpListener, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -394,7 +396,7 @@ func testConnectionType( output := makeOutputer() signal := outputs.NewSyncSignal() - output.PublishEvent(signal, time.Now(), testEvent()) + output.PublishEvent(signal, testOptions, testEvent()) result.signal = signal.Wait() }() @@ -472,7 +474,7 @@ func TestLogstashInvalidTLS(t *testing.T) { output := newTestLumberjackOutput(t, "", &config) signal := outputs.NewSyncSignal() - output.PublishEvent(signal, time.Now(), testEvent()) + output.PublishEvent(signal, testOptions, testEvent()) result.signal = signal.Wait() }() diff --git a/libbeat/outputs/mode/balance.go b/libbeat/outputs/mode/balance.go index bc0fe0cc790..91d33c79cea 100644 --- a/libbeat/outputs/mode/balance.go +++ b/libbeat/outputs/mode/balance.go @@ -68,6 +68,13 @@ func NewLoadBalancerMode( maxAttempts int, waitRetry, timeout, maxWaitRetry time.Duration, ) (*LoadBalancerMode, error) { + + // maxAttempts signals infinite retry. Convert to -1, so attempts left and + // and infinite retry can be more easily distinguished by load balancer + if maxAttempts == 0 { + maxAttempts = -1 + } + m := &LoadBalancerMode{ timeout: timeout, maxWaitRetry: maxWaitRetry, @@ -94,28 +101,33 @@ func (m *LoadBalancerMode) Close() error { // PublishEvents forwards events to some load balancing worker. func (m *LoadBalancerMode) PublishEvents( signaler outputs.Signaler, + opts outputs.Options, events []common.MapStr, ) error { - return m.publishEventsMessage(eventsMessage{ - attemptsLeft: m.maxAttempts, - signaler: signaler, - events: events, - }) + return m.publishEventsMessage(opts, + eventsMessage{signaler: signaler, events: events}) } // PublishEvent forwards the event to some load balancing worker. func (m *LoadBalancerMode) PublishEvent( signaler outputs.Signaler, + opts outputs.Options, event common.MapStr, ) error { - return m.publishEventsMessage(eventsMessage{ - attemptsLeft: m.maxAttempts, - signaler: signaler, - event: event, - }) + return m.publishEventsMessage(opts, + eventsMessage{signaler: signaler, event: event}) } -func (m *LoadBalancerMode) publishEventsMessage(msg eventsMessage) error { +func (m *LoadBalancerMode) publishEventsMessage( + opts outputs.Options, + msg eventsMessage, +) error { + maxAttempts := m.maxAttempts + if opts.Guaranteed { + maxAttempts = -1 + } + msg.attemptsLeft = maxAttempts + if ok := m.forwardEvent(m.work, msg); !ok { dropping(msg) } @@ -170,7 +182,9 @@ func (m *LoadBalancerMode) start(clients []ProtocolClient) { func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) { if msg.event != nil { if err := client.PublishEvent(msg.event); err != nil { - msg.attemptsLeft-- + if msg.attemptsLeft > 0 { + msg.attemptsLeft-- + } m.onFail(msg, err) return } @@ -184,10 +198,12 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) { events, err = client.PublishEvents(events) if err != nil { - msg.attemptsLeft-- + if msg.attemptsLeft > 0 { + msg.attemptsLeft-- + } // reset attempt count if subset of messages has been processed - if len(events) < total { + if len(events) < total && msg.attemptsLeft >= 0 { msg.attemptsLeft = m.maxAttempts } @@ -198,7 +214,7 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) { return } - if m.maxAttempts > 0 && msg.attemptsLeft <= 0 { + if m.maxAttempts > 0 && msg.attemptsLeft == 0 { // no more attempts left => drop dropping(msg) return @@ -239,7 +255,7 @@ func (m *LoadBalancerMode) forwardEvent( ch chan eventsMessage, msg eventsMessage, ) bool { - if m.maxAttempts == 0 { + if msg.attemptsLeft < 0 { select { case ch <- msg: return true diff --git a/libbeat/outputs/mode/failover.go b/libbeat/outputs/mode/failover.go index b576c1696bc..bfa1bda3dcf 100644 --- a/libbeat/outputs/mode/failover.go +++ b/libbeat/outputs/mode/failover.go @@ -106,9 +106,10 @@ func (f *FailOverConnectionMode) connect(active int) error { // connection by random. func (f *FailOverConnectionMode) PublishEvents( signaler outputs.Signaler, + opts outputs.Options, events []common.MapStr, ) error { - return f.publish(signaler, func() (bool, bool) { + return f.publish(signaler, opts, func() (bool, bool) { // loop until all events have been send in case client supports partial sends for len(events) > 0 { var err error @@ -130,9 +131,10 @@ func (f *FailOverConnectionMode) PublishEvents( // PublishEvent forwards a single event. On failure PublishEvent tries to reconnect. func (f *FailOverConnectionMode) PublishEvent( signaler outputs.Signaler, + opts outputs.Options, event common.MapStr, ) error { - return f.publish(signaler, func() (bool, bool) { + return f.publish(signaler, opts, func() (bool, bool) { if err := f.conns[f.active].PublishEvent(event); err != nil { logp.Info("Error publishing events (retrying): %s", err) return false, false @@ -151,6 +153,7 @@ func (f *FailOverConnectionMode) PublishEvent( // to maxAttempts send attempts without any progress might be executed. func (f *FailOverConnectionMode) publish( signaler outputs.Signaler, + opts outputs.Options, send func() (ok bool, resetFail bool), ) error { fails := 0 @@ -159,7 +162,7 @@ func (f *FailOverConnectionMode) publish( // TODO: we want back off support here? Fail over normally will try another // connection. - for !f.closed && (f.maxAttempts == 0 || fails < f.maxAttempts) { + for !f.closed && (!opts.Guaranteed || f.maxAttempts == 0 || fails < f.maxAttempts) { ok := false resetFail := false @@ -181,7 +184,7 @@ func (f *FailOverConnectionMode) publish( if resetFail { fails = 0 } - if f.maxAttempts > 0 && fails == f.maxAttempts { + if !opts.Guaranteed && (f.maxAttempts > 0 && fails == f.maxAttempts) { // max number of attempts reached break } diff --git a/libbeat/outputs/mode/mode.go b/libbeat/outputs/mode/mode.go index 58dec13926f..85c9d6a8c72 100644 --- a/libbeat/outputs/mode/mode.go +++ b/libbeat/outputs/mode/mode.go @@ -28,10 +28,10 @@ type ConnectionMode interface { // PublishEvents will send all events (potentially asynchronous) to its // clients. - PublishEvents(trans outputs.Signaler, events []common.MapStr) error + PublishEvents(trans outputs.Signaler, opts outputs.Options, events []common.MapStr) error // PublishEvent will send an event to its clients. - PublishEvent(trans outputs.Signaler, event common.MapStr) error + PublishEvent(trans outputs.Signaler, opts outputs.Options, event common.MapStr) error } // ProtocolClient interface is a output plugin specific client implementation diff --git a/libbeat/outputs/mode/mode_test.go b/libbeat/outputs/mode/mode_test.go index 6d7a961601f..b6dcaa3dc41 100644 --- a/libbeat/outputs/mode/mode_test.go +++ b/libbeat/outputs/mode/mode_test.go @@ -126,6 +126,8 @@ var testEvent = common.MapStr{ "msg": "hello world", } +var testOpts = outputs.Options{} + func testMode( t *testing.T, mode ConnectionMode, @@ -155,14 +157,14 @@ func testMode( for _, pubEvents := range events { if pubEvents.single { for _, event := range pubEvents.events { - _ = mode.PublishEvent(signal, event) + _ = mode.PublishEvent(signal, testOpts, event) if expectedSignals[idx] { expectedEvents = append(expectedEvents, []common.MapStr{event}) } idx++ } } else { - _ = mode.PublishEvents(signal, pubEvents.events) + _ = mode.PublishEvents(signal, testOpts, pubEvents.events) if expectedSignals[idx] { expectedEvents = append(expectedEvents, pubEvents.events) } diff --git a/libbeat/outputs/mode/single.go b/libbeat/outputs/mode/single.go index dd3c7b11f50..a4530fdf482 100644 --- a/libbeat/outputs/mode/single.go +++ b/libbeat/outputs/mode/single.go @@ -63,9 +63,10 @@ func (s *SingleConnectionMode) Close() error { // unavailable. On failure PublishEvents tries to reconnect. func (s *SingleConnectionMode) PublishEvents( signaler outputs.Signaler, + opts outputs.Options, events []common.MapStr, ) error { - return s.publish(signaler, func() (bool, bool) { + return s.publish(signaler, opts, func() (bool, bool) { for len(events) > 0 { var err error @@ -86,9 +87,10 @@ func (s *SingleConnectionMode) PublishEvents( // PublishEvent forwards a single event. On failure PublishEvent tries to reconnect. func (s *SingleConnectionMode) PublishEvent( signaler outputs.Signaler, + opts outputs.Options, event common.MapStr, ) error { - return s.publish(signaler, func() (bool, bool) { + return s.publish(signaler, opts, func() (bool, bool) { if err := s.conn.PublishEvent(event); err != nil { logp.Info("Error publishing event (retrying): %s", err) return false, false @@ -107,13 +109,14 @@ func (s *SingleConnectionMode) PublishEvent( // to maxAttempts send attempts without any progress might be executed. func (s *SingleConnectionMode) publish( signaler outputs.Signaler, + opts outputs.Options, send func() (ok bool, resetFail bool), ) error { fails := 0 var backoffCount uint var err error - for !s.closed && (s.maxAttempts == 0 || fails < s.maxAttempts) { + for !s.closed && (!opts.Guaranteed || s.maxAttempts == 0 || fails < s.maxAttempts) { ok := false resetFail := false @@ -135,7 +138,7 @@ func (s *SingleConnectionMode) publish( if resetFail { fails = 0 } - if s.maxAttempts > 0 && fails == s.maxAttempts { + if !opts.Guaranteed && (s.maxAttempts > 0 && fails == s.maxAttempts) { // max number of attempts reached break } diff --git a/libbeat/outputs/outputs.go b/libbeat/outputs/outputs.go index f76d0ec2c8d..d73907ef765 100644 --- a/libbeat/outputs/outputs.go +++ b/libbeat/outputs/outputs.go @@ -1,8 +1,6 @@ package outputs import ( - "time" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) @@ -35,9 +33,14 @@ type MothershipConfig struct { Worker int } +type Options struct { + Guaranteed bool +} + type Outputer interface { // Publish event - PublishEvent(trans Signaler, ts time.Time, event common.MapStr) error + + PublishEvent(trans Signaler, opts Options, event common.MapStr) error } type TopologyOutputer interface { @@ -52,7 +55,7 @@ type TopologyOutputer interface { // Outputers still might loop on events or use more efficient bulk-apis if present. type BulkOutputer interface { Outputer - BulkPublish(trans Signaler, ts time.Time, event []common.MapStr) error + BulkPublish(trans Signaler, opts Options, event []common.MapStr) error } type OutputBuilder interface { @@ -129,12 +132,12 @@ func CastBulkOutputer(out Outputer) BulkOutputer { func (b *bulkOutputAdapter) BulkPublish( signal Signaler, - ts time.Time, + opts Options, events []common.MapStr, ) error { signal = NewSplitSignaler(signal, len(events)) for _, evt := range events { - err := b.PublishEvent(signal, ts, evt) + err := b.PublishEvent(signal, opts, evt) if err != nil { return err } diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index adcedc911ea..5758a73ea05 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -252,17 +252,36 @@ func (out *redisOutput) UpdateLocalTopologyMap(conn redis.Conn) { func (out *redisOutput) PublishEvent( signal outputs.Signaler, - ts time.Time, + opts outputs.Options, event common.MapStr, ) error { - return out.BulkPublish(signal, ts, []common.MapStr{event}) + return out.BulkPublish(signal, opts, []common.MapStr{event}) } func (out *redisOutput) BulkPublish( signal outputs.Signaler, - ts time.Time, + opts outputs.Options, events []common.MapStr, ) error { + if !opts.Guaranteed { + err := out.doBulkPublish(events) + outputs.Signal(signal, err) + return err + } + + for { + err := out.doBulkPublish(events) + if err == nil { + outputs.SignalCompleted(signal) + return nil + } + + // TODO: add backoff + time.Sleep(1) + } +} + +func (out *redisOutput) doBulkPublish(events []common.MapStr) error { if !out.connected { logp.Debug("output_redis", "Droping pkt ...") return errors.New("Not connected") @@ -278,12 +297,10 @@ func (out *redisOutput) BulkPublish( jsonEvent, err := json.Marshal(event) if err != nil { logp.Err("Fail to convert the event to JSON: %s", err) - outputs.SignalCompleted(signal) return err } _, err = out.Conn.Do(command, out.Index, string(jsonEvent)) - outputs.Signal(signal, err) out.onFail(err) return err } @@ -296,20 +313,18 @@ func (out *redisOutput) BulkPublish( } err = out.Conn.Send(command, out.Index, string(jsonEvent)) if err != nil { - outputs.SignalFailed(signal, err) out.onFail(err) return err } } if err := out.Conn.Flush(); err != nil { - outputs.Signal(signal, err) out.onFail(err) return err } _, err := out.Conn.Receive() - outputs.Signal(signal, err) out.onFail(err) return err + } func (out *redisOutput) onFail(err error) { diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go index ec07dcc3d27..0bfcb3d2c9e 100644 --- a/libbeat/publisher/client.go +++ b/libbeat/publisher/client.go @@ -4,6 +4,7 @@ import ( "expvar" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs" ) // Metrics that can retrieved through the expvar web interface. @@ -13,12 +14,12 @@ var ( // Client is used by beats to publish new events. type Client interface { - // PublishEvent publishes one event with given options. If Confirm option is set, + // PublishEvent publishes one event with given options. If Sync option is set, // PublishEvent will block until output plugins report success or failure state // being returned by this method. PublishEvent(event common.MapStr, opts ...ClientOption) bool - // PublishEvents publishes multiple events with given options. If Confirm + // PublishEvents publishes multiple events with given options. If Guaranteed // option is set, PublishEvent will block until output plugins report // success or failure state being returned by this method. PublishEvents(events []common.MapStr, opts ...ClientOption) bool @@ -34,46 +35,55 @@ type client struct { } // ClientOption allows API users to set additional options when publishing events. -type ClientOption func(option publishOptions) publishOptions +type ClientOption func(option context) context -// Confirm option will block the event publisher until event has been send and ACKed -// by output plugin or fail is reported. -func Confirm(o publishOptions) publishOptions { - o.confirm = true +// Guaranteed option will retry publishing the event, until send attempt have +// been ACKed by output plugin. +func Guaranteed(o context) context { + o.guaranteed = true return o } // Sync option will block the event publisher until an event has been ACKed by -// the output plugin. If output plugin signals failure, the client will retry -// until success is signaled. -func Sync(o publishOptions) publishOptions { - o.confirm = true +// the output plugin or failed. +func Sync(o context) context { o.sync = true return o } +func Signal(signaler outputs.Signaler) ClientOption { + return func(ctx context) context { + if ctx.signal == nil { + ctx.signal = signaler + } else { + ctx.signal = outputs.NewCompositeSignaler(ctx.signal, signaler) + } + return ctx + } +} + func (c *client) PublishEvent(event common.MapStr, opts ...ClientOption) bool { - options, client := c.getClient(opts) + ctx, client := c.getClient(opts) publishedEvents.Add(1) - return client.PublishEvent(context{publishOptions: options}, event) + return client.PublishEvent(ctx, event) } func (c *client) PublishEvents(events []common.MapStr, opts ...ClientOption) bool { - options, client := c.getClient(opts) + ctx, client := c.getClient(opts) publishedEvents.Add(int64(len(events))) - return client.PublishEvents(context{publishOptions: options}, events) + return client.PublishEvents(ctx, events) } -func (c *client) getClient(opts []ClientOption) (publishOptions, eventPublisher) { - var options publishOptions +func (c *client) getClient(opts []ClientOption) (context, eventPublisher) { + var ctx context for _, opt := range opts { - options = opt(options) + ctx = opt(ctx) } - if options.confirm { - return options, c.publisher.syncPublisher.client() + if ctx.sync { + return ctx, c.publisher.syncPublisher.client() } - return options, c.publisher.asyncPublisher.client() + return ctx, c.publisher.asyncPublisher.client() } // PublishEvent will publish the event on the channel. Options will be ignored. diff --git a/libbeat/publisher/client_test.go b/libbeat/publisher/client_test.go index 2990c892a57..9667925db97 100644 --- a/libbeat/publisher/client_test.go +++ b/libbeat/publisher/client_test.go @@ -18,8 +18,9 @@ func TestGetClient(t *testing.T) { }, } asyncClient := c.publisher.asyncPublisher.client() - confirmClient := c.publisher.syncPublisher.client() syncClient := c.publisher.syncPublisher.client() + guaranteedClient := asyncClient + guaranteedSyncClient := syncClient var testCases = []struct { in []ClientOption @@ -28,8 +29,8 @@ func TestGetClient(t *testing.T) { // Add new client options here: {[]ClientOption{}, asyncClient}, {[]ClientOption{Sync}, syncClient}, - {[]ClientOption{Confirm}, confirmClient}, - {[]ClientOption{Confirm, Sync}, syncClient}, + {[]ClientOption{Guaranteed}, guaranteedClient}, + {[]ClientOption{Guaranteed, Sync}, guaranteedSyncClient}, } for _, test := range testCases { diff --git a/libbeat/publisher/common_test.go b/libbeat/publisher/common_test.go index 76c6becc573..eeb0b8f0421 100644 --- a/libbeat/publisher/common_test.go +++ b/libbeat/publisher/common_test.go @@ -168,12 +168,12 @@ func (t *testPublisher) asyncPublishEvents(events []common.MapStr) bool { } func (t *testPublisher) syncPublishEvent(event common.MapStr) bool { - ctx := context{publishOptions: publishOptions{confirm: true}} + ctx := context{publishOptions: publishOptions{guaranteed: true}} return t.pub.syncPublisher.client().PublishEvent(ctx, event) } func (t *testPublisher) syncPublishEvents(events []common.MapStr) bool { - ctx := context{publishOptions: publishOptions{confirm: true}} + ctx := context{publishOptions: publishOptions{guaranteed: true}} return t.pub.syncPublisher.client().PublishEvents(ctx, events) } diff --git a/libbeat/publisher/output.go b/libbeat/publisher/output.go index 53d69bb23ed..1f2f6afacb0 100644 --- a/libbeat/publisher/output.go +++ b/libbeat/publisher/output.go @@ -1,7 +1,7 @@ package publisher import ( - "time" + "errors" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -15,6 +15,10 @@ type outputWorker struct { maxBulkSize int } +var ( + errSendFailed = errors.New("failed send attempt") +) + func newOutputWorker( config outputs.MothershipConfig, out outputs.Outputer, @@ -48,21 +52,7 @@ func (o *outputWorker) onMessage(m message) { func (o *outputWorker) onEvent(ctx *context, event common.MapStr) { debug("output worker: publish single event") - ts := time.Time(event["@timestamp"].(common.Time)).UTC() - - if !ctx.sync { - _ = o.out.PublishEvent(ctx.signal, ts, event) - return - } - - signal := outputs.NewSyncSignal() - for { - o.out.PublishEvent(signal, ts, event) - if signal.Wait() { - outputs.SignalCompleted(ctx.signal) - break - } - } + o.out.PublishEvent(ctx.signal, outputs.Options{ctx.guaranteed}, event) } func (o *outputWorker) onBulk(ctx *context, events []common.MapStr) { @@ -72,13 +62,8 @@ func (o *outputWorker) onBulk(ctx *context, events []common.MapStr) { return } - var sync *outputs.SyncSignal - if ctx.sync { - sync = outputs.NewSyncSignal() - } - if o.maxBulkSize < 0 || len(events) <= o.maxBulkSize { - o.sendBulk(sync, ctx, events) + o.sendBulk(ctx, events) return } @@ -90,29 +75,19 @@ func (o *outputWorker) onBulk(ctx *context, events []common.MapStr) { if sz > len(events) { sz = len(events) } - o.sendBulk(sync, ctx, events[:sz]) + o.sendBulk(ctx, events[:sz]) events = events[sz:] } } func (o *outputWorker) sendBulk( - sync *outputs.SyncSignal, ctx *context, events []common.MapStr, ) { debug("output worker: publish %v events", len(events)) - ts := time.Time(events[0]["@timestamp"].(common.Time)).UTC() - - if sync == nil { - err := o.out.BulkPublish(ctx.signal, ts, events) - if err != nil { - logp.Info("Error bulk publishing events: %s", err) - } - return - } - for done := false; !done; done = sync.Wait() { - o.out.BulkPublish(sync, ts, events) + err := o.out.BulkPublish(ctx.signal, outputs.Options{ctx.guaranteed}, events) + if err != nil { + logp.Info("Error bulk publishing events: %s", err) } - outputs.SignalCompleted(ctx.signal) } diff --git a/libbeat/publisher/output_test.go b/libbeat/publisher/output_test.go index 46b0f154cad..06e28114ad7 100644 --- a/libbeat/publisher/output_test.go +++ b/libbeat/publisher/output_test.go @@ -2,7 +2,6 @@ package publisher import ( "testing" - "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/outputs" @@ -18,7 +17,7 @@ var _ outputs.Outputer = &testOutputer{} // PublishEvent writes events to a channel then calls Completed on trans. // It always returns nil. -func (t *testOutputer) PublishEvent(trans outputs.Signaler, ts time.Time, +func (t *testOutputer) PublishEvent(trans outputs.Signaler, opts outputs.Options, event common.MapStr) error { t.events <- event outputs.SignalCompleted(trans) diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go index 35d69bcdebe..3afc2321b8e 100644 --- a/libbeat/publisher/publish.go +++ b/libbeat/publisher/publish.go @@ -37,8 +37,8 @@ type context struct { } type publishOptions struct { - confirm bool - sync bool + guaranteed bool + sync bool } type TransactionalEventPublisher interface { diff --git a/libbeat/publisher/sync.go b/libbeat/publisher/sync.go index d561ce34b33..099b89a92d0 100644 --- a/libbeat/publisher/sync.go +++ b/libbeat/publisher/sync.go @@ -42,7 +42,15 @@ func (c syncClient) PublishEvents(ctx context, events []common.MapStr) bool { func (p *syncPublisher) forward(m message) bool { sync := outputs.NewSyncSignal() + signal := m.context.signal m.context.signal = sync p.send(m) - return sync.Wait() + if sync.Wait() { + outputs.SignalCompleted(signal) + return true + } + if signal != nil { + signal.Failed() + } + return false } diff --git a/winlogbeat/beat/winlogbeat.go b/winlogbeat/beat/winlogbeat.go index 8dbebbde63b..f8de9c93829 100644 --- a/winlogbeat/beat/winlogbeat.go +++ b/winlogbeat/beat/winlogbeat.go @@ -252,7 +252,7 @@ loop: // Publish events. numEvents := int64(len(events)) - ok := eb.client.PublishEvents(events, publisher.Sync) + ok := eb.client.PublishEvents(events, publisher.Sync, publisher.Guaranteed) if ok { publishedEvents.Add("total", numEvents) publishedEvents.Add(api.Name(), numEvents)