diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 10fd613c9c4..4da37db60a0 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -152,3 +152,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error { func (c *publishClient) Test(d testing.Driver) { c.es.Test(d) } + +func (c *publishClient) String() string { + return "publish(" + c.es.String() + ")" +} diff --git a/libbeat/outputs/backoff.go b/libbeat/outputs/backoff.go index 085b1b6c842..1484bc42c00 100644 --- a/libbeat/outputs/backoff.go +++ b/libbeat/outputs/backoff.go @@ -77,3 +77,7 @@ func (b *backoffClient) Test(d testing.Driver) { c.Test(d) } + +func (b *backoffClient) String() string { + return "backoff(" + b.client.String() + ")" +} diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index ab02e92aca7..7958b8da6bd 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -160,3 +160,7 @@ func (c *console) writeBuffer(buf []byte) error { } return nil } + +func (c *console) String() string { + return "console" +} diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 5e822f9df17..2bd1c06ab37 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -664,6 +664,10 @@ func (client *Client) Test(d testing.Driver) { }) } +func (client *Client) String() string { + return "elasticsearch(" + client.Connection.URL + ")" +} + // Connect connects the client. func (conn *Connection) Connect() error { var err error diff --git a/libbeat/outputs/failover.go b/libbeat/outputs/failover.go index 6f24e3a2eb3..99d379a3943 100644 --- a/libbeat/outputs/failover.go +++ b/libbeat/outputs/failover.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math/rand" + "strings" "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/libbeat/testing" @@ -109,3 +110,13 @@ func (f *failoverClient) Test(d testing.Driver) { }) } } + +func (f *failoverClient) String() string { + names := make([]string, len(f.clients)) + + for i, client := range f.clients { + names[i] = client.String() + } + + return "failover(" + strings.Join(names, ",") + ")" +} diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index 049f6af5a2d..75ea3092463 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -35,6 +35,7 @@ func init() { } type fileOutput struct { + filePath string beat beat.Info observer outputs.Observer rotator *file.Rotator @@ -74,6 +75,8 @@ func (out *fileOutput) init(beat beat.Info, c config) error { path = filepath.Join(c.Path, out.beat.Beat) } + out.filePath = path + var err error out.rotator, err = file.NewFileRotator( path, @@ -149,3 +152,7 @@ func (out *fileOutput) Publish( return nil } + +func (out *fileOutput) String() string { + return "file(" + out.filePath + ")" +} diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index c20b4bd3f86..cae9c262eb5 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -20,6 +20,7 @@ package kafka import ( "errors" "fmt" + "strings" "sync" "sync/atomic" @@ -141,6 +142,10 @@ func (c *client) Publish(batch publisher.Batch) error { return nil } +func (c *client) String() string { + return "kafka(" + strings.Join(c.hosts, ",") + ")" +} + func (c *client) getEventMessage(data *publisher.Event) (*message, error) { event := &data.Content msg := &message{partition: -1, data: *data} diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 2d080a50dd1..44950853616 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -169,6 +169,10 @@ func (c *asyncClient) Publish(batch publisher.Batch) error { return nil } +func (c *asyncClient) String() string { + return "async(" + c.Client.String() + ")" +} + func (c *asyncClient) publishWindowed( ref *msgRef, events []publisher.Event, diff --git a/libbeat/outputs/outputs.go b/libbeat/outputs/outputs.go index 2f771049537..aff2bf14574 100644 --- a/libbeat/outputs/outputs.go +++ b/libbeat/outputs/outputs.go @@ -35,6 +35,8 @@ type Client interface { // the publisher pipeline. The publisher pipeline (if configured by the output // factory) will take care of retrying/dropping events. Publish(publisher.Batch) error + + String() string } // NetworkClient defines the required client capabilities for network based diff --git a/libbeat/outputs/redis/backoff.go b/libbeat/outputs/redis/backoff.go index b3470725a92..7c63574d92f 100644 --- a/libbeat/outputs/redis/backoff.go +++ b/libbeat/outputs/redis/backoff.go @@ -112,3 +112,7 @@ func (b *backoffClient) resetFail() { b.reason = failNone b.backoff.Reset() } + +func (b *backoffClient) String() string { + return b.client.String() +} diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index df3e14b7b44..fc395070e27 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -152,6 +152,10 @@ func (c *client) Publish(batch publisher.Batch) error { return err } +func (c *client) String() string { + return "redis(" + c.Client.String() + ")" +} + func (c *client) makePublish( conn redis.Conn, ) (publishFn, error) { diff --git a/libbeat/outputs/transport/client.go b/libbeat/outputs/transport/client.go index d337d5f4d82..8313b592e10 100644 --- a/libbeat/outputs/transport/client.go +++ b/libbeat/outputs/transport/client.go @@ -247,3 +247,7 @@ func (c *Client) Test(d testing.Driver) { d.Fatal("talk to server", err) }) } + +func (c *Client) String() string { + return c.network + "://" + c.host +} diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 4a7b59c9f06..60ed3519ae0 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -77,21 +77,33 @@ func (w *netClientWorker) Close() error { func (w *netClientWorker) run() { for !w.closed.Load() { + reconnectAttempts := 0 + // start initial connect loop from first batch, but return // batch to pipeline for other outputs to catch up while we're trying to connect for batch := range w.qu { batch.Cancelled() if w.closed.Load() { + logp.Info("Closed connection to %v", w.client) return } + if reconnectAttempts > 0 { + logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) + } else { + logp.Info("Connecting to %v", w.client) + } + err := w.client.Connect() if err != nil { - logp.Err("Failed to connect: %v", err) + logp.Err("Failed to connect to %v: %v", w.client, err) + reconnectAttempts++ continue } + logp.Info("Connection to %v established", w.client) + reconnectAttempts = 0 break } diff --git a/libbeat/publisher/pipeline/stress/out.go b/libbeat/publisher/pipeline/stress/out.go index 6ca2b09fdd6..5bc72ed33cd 100644 --- a/libbeat/publisher/pipeline/stress/out.go +++ b/libbeat/publisher/pipeline/stress/out.go @@ -106,3 +106,7 @@ func (t *testOutput) Publish(batch publisher.Batch) error { return nil } + +func (t *testOutput) String() string { + return "test" +}