Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publisher options cleanup #684

Merged
merged 1 commit into from
Jan 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
==== Breaking changes

*Affecting all Beats*
- Some publisher options refactoring in libbeat {pull}684[684]

*Packetbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func Publish(beat *beat.Beat, fb *Filebeat) {
pubEvents = append(pubEvents, event.ToMapStr())
}

beat.Events.PublishEvents(pubEvents, publisher.Sync)
beat.Events.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed)

logp.Info("Events sent: %d", len(events))

Expand Down
6 changes: 4 additions & 2 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package console
import (
"encoding/json"
"os"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -47,7 +46,7 @@ func writeBuffer(buf []byte) error {

func (c *console) PublishEvent(
s outputs.Signaler,
ts time.Time,
opts outputs.Options,
event common.MapStr,
) error {
var jsonEvent []byte
Expand All @@ -74,6 +73,9 @@ func (c *console) PublishEvent(
outputs.SignalCompleted(s)
return nil
fail:
if opts.Guaranteed {
logp.Critical("Unable to publish events to console: %v", err)
}
outputs.SignalFailed(s, err)
return err
}
4 changes: 2 additions & 2 deletions libbeat/outputs/console/console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"io"
"os"
"testing"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -47,7 +47,7 @@ func event(k, v string) common.MapStr {
func run(c *console, events ...common.MapStr) (string, error) {
return withStdout(func() {
for _, event := range events {
c.PublishEvent(nil, time.Now(), event)
c.PublishEvent(nil, outputs.Options{}, event)
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/api_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestIndex(t *testing.T) {
}
_, resp, err := client.Index(index, "test", "1", params, body)
if err != nil {
t.Errorf("Index() returns error: %s", err)
t.Fatalf("Index() returns error: %s", err)
}
if !resp.Created {
t.Errorf("Index() fails: %s", resp)
Expand Down
8 changes: 4 additions & 4 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,16 @@ func makeClientFactory(

func (out *elasticsearchOutput) PublishEvent(
signaler outputs.Signaler,
ts time.Time,
opts outputs.Options,
event common.MapStr,
) error {
return out.mode.PublishEvent(signaler, event)
return out.mode.PublishEvent(signaler, opts, event)
}

func (out *elasticsearchOutput) BulkPublish(
trans outputs.Signaler,
ts time.Time,
opts outputs.Options,
events []common.MapStr,
) error {
return out.mode.PublishEvents(trans, events)
return out.mode.PublishEvents(trans, opts, events)
}
10 changes: 6 additions & 4 deletions libbeat/outputs/elasticsearch/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/elastic/beats/libbeat/outputs"
)

var testOptions = outputs.Options{}

func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearchOutput {
index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())

Expand Down Expand Up @@ -117,7 +119,7 @@ func TestOneEvent(t *testing.T) {
},
})

err := output.PublishEvent(nil, ts, event)
err := output.PublishEvent(nil, testOptions, event)
if err != nil {
t.Errorf("Failed to publish the event: %s", err)
}
Expand Down Expand Up @@ -189,7 +191,7 @@ func TestEvents(t *testing.T) {
},
})

err := output.PublishEvent(nil, ts, event)
err := output.PublishEvent(nil, testOptions, event)
if err != nil {
t.Errorf("Failed to publish the event: %s", err)
}
Expand All @@ -199,7 +201,7 @@ func TestEvents(t *testing.T) {
r["response"] = 0
event["redis"] = r

err = output.PublishEvent(nil, ts, event)
err = output.PublishEvent(nil, testOptions, event)
if err != nil {
t.Errorf("Failed to publish the event: %s", err)
}
Expand Down Expand Up @@ -259,7 +261,7 @@ func testBulkWithParams(t *testing.T, output elasticsearchOutput) {
r["response"] = "value" + strconv.Itoa(i)
event["redis"] = r

err := output.PublishEvent(nil, ts, event)
err := output.PublishEvent(nil, testOptions, event)
if err != nil {
t.Errorf("Failed to publish the event: %s", err)
}
Expand Down
10 changes: 6 additions & 4 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package fileout

import (
"encoding/json"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -74,7 +73,7 @@ func (out *fileOutput) init(config *outputs.MothershipConfig, topology_expire in

func (out *fileOutput) PublishEvent(
trans outputs.Signaler,
ts time.Time,
opts outputs.Options,
event common.MapStr,
) error {
jsonEvent, err := json.Marshal(event)
Expand All @@ -88,9 +87,12 @@ func (out *fileOutput) PublishEvent(

err = out.rotator.WriteLine(jsonEvent)
if err != nil {
logp.Err("Error when writing line to file: %s", err)
if opts.Guaranteed {
logp.Critical("Unable to write events to file: %s", err)
} else {
logp.Err("Error when writing line to file: %s", err)
}
}

outputs.Signal(trans, err)
return err
}
8 changes: 4 additions & 4 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,24 +150,24 @@ func makeClientFactory(
// send/receive overhead per event for other implementors too.
func (lj *logstash) PublishEvent(
signaler outputs.Signaler,
ts time.Time,
opts outputs.Options,
event common.MapStr,
) error {
lj.addMeta(event)
return lj.mode.PublishEvent(signaler, event)
return lj.mode.PublishEvent(signaler, opts, event)
}

// BulkPublish implements the BulkOutputer interface pushing a bulk of events
// via lumberjack.
func (lj *logstash) BulkPublish(
trans outputs.Signaler,
ts time.Time,
opts outputs.Options,
events []common.MapStr,
) error {
for _, event := range events {
lj.addMeta(event)
}
return lj.mode.PublishEvents(trans, events)
return lj.mode.PublishEvents(trans, opts, events)
}

// addMeta adapts events to be compatible with logstash forwarer messages by renaming
Expand Down
14 changes: 7 additions & 7 deletions libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func testSendMessageViaLogstash(t *testing.T, name string, tls bool) {
"type": "log",
"message": "hello world",
}
ls.PublishEvent(nil, time.Now(), event)
ls.PublishEvent(nil, testOptions, event)

// wait for logstash event flush + elasticsearch
waitUntilTrue(5*time.Second, checkIndex(ls, 1))
Expand Down Expand Up @@ -313,7 +313,7 @@ func testSendMultipleViaLogstash(t *testing.T, name string, tls bool) {
"type": "log",
"message": fmt.Sprintf("hello world - %v", i),
}
ls.PublishEvent(nil, time.Now(), event)
ls.PublishEvent(nil, testOptions, event)
}

// wait for logstash event flush + elasticsearch
Expand Down Expand Up @@ -384,7 +384,7 @@ func testSendMultipleBatchesViaLogstash(

for _, batch := range batches {
sig := outputs.NewSyncSignal()
ls.BulkPublish(sig, time.Now(), batch)
ls.BulkPublish(sig, testOptions, batch)
ok := sig.Wait()
assert.Equal(t, true, ok)
}
Expand Down Expand Up @@ -432,10 +432,10 @@ func testLogstashElasticOutputPluginCompatibleMessage(t *testing.T, name string,
"message": "hello world",
}

es.PublishEvent(nil, ts, event)
es.PublishEvent(nil, testOptions, event)
waitUntilTrue(timeout, checkIndex(es, 1))

ls.PublishEvent(nil, ts, event)
ls.PublishEvent(nil, testOptions, event)
waitUntilTrue(timeout, checkIndex(ls, 1))

// search value in logstash elasticsearch index
Expand Down Expand Up @@ -487,10 +487,10 @@ func testLogstashElasticOutputPluginBulkCompatibleMessage(t *testing.T, name str
"message": "hello world",
},
}
es.BulkPublish(nil, ts, events)
es.BulkPublish(nil, testOptions, events)
waitUntilTrue(timeout, checkIndex(es, 1))

ls.BulkPublish(nil, ts, events)
ls.BulkPublish(nil, testOptions, events)
waitUntilTrue(timeout, checkIndex(ls, 1))

// search value in logstash elasticsearch index
Expand Down
6 changes: 4 additions & 2 deletions libbeat/outputs/logstash/logstash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type mockLSServer struct {
handshake func(net.Conn)
}

var testOptions = outputs.Options{}

func newMockTLSServer(t *testing.T, to time.Duration, cert string) *mockLSServer {
tcpListener, err := net.Listen("tcp", "localhost:0")
if err != nil {
Expand Down Expand Up @@ -394,7 +396,7 @@ func testConnectionType(
output := makeOutputer()

signal := outputs.NewSyncSignal()
output.PublishEvent(signal, time.Now(), testEvent())
output.PublishEvent(signal, testOptions, testEvent())
result.signal = signal.Wait()
}()

Expand Down Expand Up @@ -472,7 +474,7 @@ func TestLogstashInvalidTLS(t *testing.T) {
output := newTestLumberjackOutput(t, "", &config)

signal := outputs.NewSyncSignal()
output.PublishEvent(signal, time.Now(), testEvent())
output.PublishEvent(signal, testOptions, testEvent())
result.signal = signal.Wait()
}()

Expand Down
48 changes: 32 additions & 16 deletions libbeat/outputs/mode/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ func NewLoadBalancerMode(
maxAttempts int,
waitRetry, timeout, maxWaitRetry time.Duration,
) (*LoadBalancerMode, error) {

// maxAttempts signals infinite retry. Convert to -1, so attempts left and
// and infinite retry can be more easily distinguished by load balancer
if maxAttempts == 0 {
maxAttempts = -1
}

m := &LoadBalancerMode{
timeout: timeout,
maxWaitRetry: maxWaitRetry,
Expand All @@ -94,28 +101,33 @@ func (m *LoadBalancerMode) Close() error {
// PublishEvents forwards events to some load balancing worker.
func (m *LoadBalancerMode) PublishEvents(
signaler outputs.Signaler,
opts outputs.Options,
events []common.MapStr,
) error {
return m.publishEventsMessage(eventsMessage{
attemptsLeft: m.maxAttempts,
signaler: signaler,
events: events,
})
return m.publishEventsMessage(opts,
eventsMessage{signaler: signaler, events: events})
}

// PublishEvent forwards the event to some load balancing worker.
func (m *LoadBalancerMode) PublishEvent(
signaler outputs.Signaler,
opts outputs.Options,
event common.MapStr,
) error {
return m.publishEventsMessage(eventsMessage{
attemptsLeft: m.maxAttempts,
signaler: signaler,
event: event,
})
return m.publishEventsMessage(opts,
eventsMessage{signaler: signaler, event: event})
}

func (m *LoadBalancerMode) publishEventsMessage(msg eventsMessage) error {
func (m *LoadBalancerMode) publishEventsMessage(
opts outputs.Options,
msg eventsMessage,
) error {
maxAttempts := m.maxAttempts
if opts.Guaranteed {
maxAttempts = -1
}
msg.attemptsLeft = maxAttempts

if ok := m.forwardEvent(m.work, msg); !ok {
dropping(msg)
}
Expand Down Expand Up @@ -170,7 +182,9 @@ func (m *LoadBalancerMode) start(clients []ProtocolClient) {
func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
if msg.event != nil {
if err := client.PublishEvent(msg.event); err != nil {
msg.attemptsLeft--
if msg.attemptsLeft > 0 {
msg.attemptsLeft--
}
m.onFail(msg, err)
return
}
Expand All @@ -184,10 +198,12 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {

events, err = client.PublishEvents(events)
if err != nil {
msg.attemptsLeft--
if msg.attemptsLeft > 0 {
msg.attemptsLeft--
}

// reset attempt count if subset of messages has been processed
if len(events) < total {
if len(events) < total && msg.attemptsLeft >= 0 {
msg.attemptsLeft = m.maxAttempts
}

Expand All @@ -198,7 +214,7 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
return
}

if m.maxAttempts > 0 && msg.attemptsLeft <= 0 {
if m.maxAttempts > 0 && msg.attemptsLeft == 0 {
// no more attempts left => drop
dropping(msg)
return
Expand Down Expand Up @@ -239,7 +255,7 @@ func (m *LoadBalancerMode) forwardEvent(
ch chan eventsMessage,
msg eventsMessage,
) bool {
if m.maxAttempts == 0 {
if msg.attemptsLeft < 0 {
select {
case ch <- msg:
return true
Expand Down
Loading