From 2e95ecb9fa643a5742695fb1af15148c9d51a9e6 Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 7 Jul 2020 22:13:54 +0200 Subject: [PATCH 1/5] Convert cloudfoundry input to v2 --- x-pack/filebeat/input/cloudfoundry/input.go | 58 ++++++------ .../cloudfoundry/input_integration_test.go | 70 +++++--------- x-pack/filebeat/input/cloudfoundry/v1.go | 91 +++++++----------- x-pack/filebeat/input/cloudfoundry/v2.go | 92 ++++++------------- .../filebeat/input/default-inputs/inputs.go | 2 + .../common/cloudfoundry/rlplistener.go | 4 + 6 files changed, 120 insertions(+), 197 deletions(-) diff --git a/x-pack/filebeat/input/cloudfoundry/input.go b/x-pack/filebeat/input/cloudfoundry/input.go index fc38d85fc34..036a61b9d1e 100644 --- a/x-pack/filebeat/input/cloudfoundry/input.go +++ b/x-pack/filebeat/input/cloudfoundry/input.go @@ -6,49 +6,51 @@ package cloudfoundry import ( "fmt" + "time" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/cfgwarn" - "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" ) -func init() { - err := input.Register("cloudfoundry", NewInput) - if err != nil { - panic(err) - } +type cloudfoundryEvent interface { + Timestamp() time.Time + ToFields() common.MapStr } -// NewInput creates a new udp input -func NewInput( - cfg *common.Config, - outlet channel.Connector, - context input.Context, -) (input.Input, error) { - cfgwarn.Beta("The cloudfoundry input is beta") - - log := logp.NewLogger("cloudfoundry") - - out, err := outlet.Connect(cfg) - if err != nil { - return nil, err +func Plugin() v2.Plugin { + return v2.Plugin{ + Name: "cloudfoundry", + Stability: feature.Beta, + Deprecated: false, + Info: "collect logs from cloudfoundry loggregator", + Manager: stateless.NewInputManager(configure), } +} - var conf cloudfoundry.Config - if err = cfg.Unpack(&conf); err != nil { +func configure(cfg *common.Config) (stateless.Input, error) { + config := cloudfoundry.Config{} + if err := cfg.Unpack(&config); err != nil { return nil, err } - switch conf.Version { + switch config.Version { case cloudfoundry.ConsumerVersionV1: - return newInputV1(log, conf, out, context) + return configureV1(config) case cloudfoundry.ConsumerVersionV2: - return newInputV2(log, conf, out, context) + return configureV2(config) default: - return nil, fmt.Errorf("not supported consumer version: %s", conf.Version) + return nil, fmt.Errorf("not supported consumer version: %s", config.Version) + } +} + +func createEvent(evt cloudfoundryEvent) beat.Event { + return beat.Event{ + Timestamp: evt.Timestamp(), + Fields: evt.ToFields(), } } diff --git a/x-pack/filebeat/input/cloudfoundry/input_integration_test.go b/x-pack/filebeat/input/cloudfoundry/input_integration_test.go index 736acf778fd..700e5888c2f 100644 --- a/x-pack/filebeat/input/cloudfoundry/input_integration_test.go +++ b/x-pack/filebeat/input/cloudfoundry/input_integration_test.go @@ -11,16 +11,17 @@ import ( "context" "crypto/tls" "net/http" + "sync" "testing" "time" "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" - "github.com/elastic/beats/v7/libbeat/beat" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/elastic/beats/v7/libbeat/tests/resources" cftest "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry/test" ) @@ -37,69 +38,46 @@ func TestInput(t *testing.T) { } func testInput(t *testing.T, version string) { + defer resources.NewGoroutinesChecker().Check(t) + config := common.MustNewConfigFrom(cftest.GetConfigFromEnv(t)) config.SetString("version", -1, version) + input, err := Plugin().Manager.Create(config) + require.NoError(t, err) + + var wg sync.WaitGroup + defer wg.Wait() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Ensure that there is something happening in the firehose apiAddress, err := config.String("api_address", -1) require.NoError(t, err) - - // Ensure that there is something happening in the firehose go makeApiRequests(t, ctx, apiAddress) - events := make(chan beat.Event) - connector := channel.ConnectorFunc(func(*common.Config, beat.ClientConfig) (channel.Outleter, error) { - return newOutleter(events), nil - }) - - inputCtx := input.Context{Done: make(chan struct{})} + client := pubtest.NewChanClient(0) - input, err := NewInput(config, connector, inputCtx) - require.NoError(t, err) + wg.Add(1) + go func() { + defer wg.Done() - go input.Run() - defer input.Stop() + inputCtx := v2.Context{ + Logger: logp.NewLogger("test"), + Cancelation: ctx, + } + input.Run(inputCtx, pubtest.ConstClient(client)) + }() select { - case e := <-events: + case e := <-client.Channel: t.Logf("Event received: %+v", e) case <-time.After(10 * time.Second): t.Fatal("timeout waiting for events") } } -type outleter struct { - events chan<- beat.Event - done chan struct{} -} - -func newOutleter(events chan<- beat.Event) *outleter { - return &outleter{ - events: events, - done: make(chan struct{}), - } -} - -func (o *outleter) Close() error { - close(o.done) - return nil -} - -func (o *outleter) Done() <-chan struct{} { - return o.done -} - -func (o *outleter) OnEvent(e beat.Event) bool { - select { - case o.events <- e: - return true - default: - return false - } -} - func makeApiRequests(t *testing.T, ctx context.Context, address string) { client := &http.Client{ Transport: &http.Transport{ diff --git a/x-pack/filebeat/input/cloudfoundry/v1.go b/x-pack/filebeat/input/cloudfoundry/v1.go index c1248545369..d35b67a4096 100644 --- a/x-pack/filebeat/input/cloudfoundry/v1.go +++ b/x-pack/filebeat/input/cloudfoundry/v1.go @@ -5,83 +5,56 @@ package cloudfoundry import ( - "sync" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/harvester" - "github.com/elastic/beats/v7/filebeat/input" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/logp" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" + "github.com/elastic/go-concert/ctxtool" ) -// InputV1 defines a udp input to receive event on a specific host:port. -type InputV1 struct { - sync.Mutex - consumer *cloudfoundry.DopplerConsumer - started bool - log *logp.Logger - outlet channel.Outleter +// inputV1 defines a udp input to receive event on a specific host:port. +type inputV1 struct { + config cloudfoundry.Config +} + +func configureV1(config cloudfoundry.Config) (*inputV1, error) { + return &inputV1{config: config}, nil } -func newInputV1(log *logp.Logger, conf cloudfoundry.Config, out channel.Outleter, context input.Context) (*InputV1, error) { - hub := cloudfoundry.NewHub(&conf, "filebeat", log) - forwarder := harvester.NewForwarder(out) +func (i *inputV1) Name() string { return "cloudfoundry-v1" } + +func (i *inputV1) Test(_ v2.TestContext) error { + // XXX: try to connect, but don't consume + return nil +} + +func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error { + log := ctx.Logger + hub := cloudfoundry.NewHub(&i.config, "filebeat", log) + + log.Info("Starting cloudfoundry input") + defer log.Info("Stopped cloudfoundry input") callbacks := cloudfoundry.DopplerCallbacks{ Log: func(evt cloudfoundry.Event) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) + publisher.Publish(createEvent(evt)) }, Error: func(evt cloudfoundry.EventError) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) + publisher.Publish(createEvent(&evt)) }, } consumer, err := hub.DopplerConsumer(callbacks) if err != nil { - return nil, errors.Wrapf(err, "initializing doppler consumer") + return errors.Wrapf(err, "initializing doppler consumer") } - return &InputV1{ - outlet: out, - consumer: consumer, - started: false, - log: log, - }, nil -} - -// Run starts the consumer of cloudfoundry events -func (p *InputV1) Run() { - p.Lock() - defer p.Unlock() - if !p.started { - p.log.Info("starting cloudfoundry input") - p.consumer.Run() - p.started = true - } -} - -// Stop stops cloudfoundry doppler consumer -func (p *InputV1) Stop() { - defer p.outlet.Close() - p.Lock() - defer p.Unlock() - - p.log.Info("stopping cloudfoundry input") - p.consumer.Stop() - p.started = false -} + _, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() { + consumer.Stop() + }) + defer cancel() -// Wait waits for the input to finalize, and stops it -func (p *InputV1) Wait() { - p.Stop() - p.consumer.Wait() + consumer.Run() + return nil } diff --git a/x-pack/filebeat/input/cloudfoundry/v2.go b/x-pack/filebeat/input/cloudfoundry/v2.go index 0d21d361f9c..251f4f7f8a8 100644 --- a/x-pack/filebeat/input/cloudfoundry/v2.go +++ b/x-pack/filebeat/input/cloudfoundry/v2.go @@ -5,86 +5,50 @@ package cloudfoundry import ( - "context" - "sync" - - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/harvester" - "github.com/elastic/beats/v7/filebeat/input" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/logp" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" + "github.com/elastic/go-concert/ctxtool" ) -// InputV2 defines a Cloudfoundry input that uses the consumer V2 API -type InputV2 struct { - sync.Mutex - listener *cloudfoundry.RlpListener - started bool - log *logp.Logger - outlet channel.Outleter +// inputV2 defines a Cloudfoundry input that uses the consumer V2 API +type inputV2 struct { + config cloudfoundry.Config +} + +func configureV2(config cloudfoundry.Config) (*inputV2, error) { + return &inputV2{config: config}, nil +} + +func (i *inputV2) Name() string { return "cloudfoundry-v2" } + +func (i *inputV2) Test(_ v2.TestContext) error { + // XXX: try to connect, but don't consume + return nil } -func newInputV2(log *logp.Logger, conf cloudfoundry.Config, out channel.Outleter, context input.Context) (*InputV2, error) { - hub := cloudfoundry.NewHub(&conf, "filebeat", log) - forwarder := harvester.NewForwarder(out) +func (i *inputV2) Run(ctx v2.Context, publisher stateless.Publisher) error { + log := ctx.Logger + hub := cloudfoundry.NewHub(&i.config, "filebeat", log) + callbacks := cloudfoundry.RlpListenerCallbacks{ HttpAccess: func(evt *cloudfoundry.EventHttpAccess) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) + publisher.Publish(createEvent(evt)) }, Log: func(evt *cloudfoundry.EventLog) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) + publisher.Publish(createEvent(evt)) }, Error: func(evt *cloudfoundry.EventError) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) + publisher.Publish(createEvent(evt)) }, } listener, err := hub.RlpListener(callbacks) if err != nil { - return nil, err + return err } - return &InputV2{ - outlet: out, - listener: listener, - started: false, - log: log, - }, nil -} - -// Run starts the listener of cloudfoundry events -func (p *InputV2) Run() { - p.Lock() - defer p.Unlock() - - if !p.started { - p.log.Info("starting cloudfoundry input") - p.listener.Start(context.TODO()) - p.started = true - } -} - -// Stop stops cloudfoundry listener -func (p *InputV2) Stop() { - defer p.outlet.Close() - p.Lock() - defer p.Unlock() - - p.log.Info("stopping cloudfoundry input") - p.listener.Stop() - p.started = false -} -// Wait waits for the input to finalize, and stops it -func (p *InputV2) Wait() { - p.Stop() + listener.Start(ctxtool.FromCanceller(ctx.Cancelation)) + listener.Wait() + return nil } diff --git a/x-pack/filebeat/input/default-inputs/inputs.go b/x-pack/filebeat/input/default-inputs/inputs.go index 0af51f2c18a..afac3c2e61c 100644 --- a/x-pack/filebeat/input/default-inputs/inputs.go +++ b/x-pack/filebeat/input/default-inputs/inputs.go @@ -10,6 +10,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" ) @@ -23,5 +24,6 @@ func Init(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { return []v2.Plugin{ o365audit.Plugin(log, store), + cloudfoundry.Plugin(), } } diff --git a/x-pack/libbeat/common/cloudfoundry/rlplistener.go b/x-pack/libbeat/common/cloudfoundry/rlplistener.go index f542f8eac7e..2448ddd669d 100644 --- a/x-pack/libbeat/common/cloudfoundry/rlplistener.go +++ b/x-pack/libbeat/common/cloudfoundry/rlplistener.go @@ -110,6 +110,10 @@ func (c *RlpListener) Stop() { c.wg.Wait() } +func (c *RlpListener) Wait() { + c.wg.Wait() +} + // getSelectors returns the server side selectors based on the callbacks defined on the listener. func (c *RlpListener) getSelectors() []*loggregator_v2.Selector { selectors := make([]*loggregator_v2.Selector, 0) From 926a3bdfb0cc647bc7e5e9962be49b9c397f2f5e Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 7 Jul 2020 22:37:09 +0200 Subject: [PATCH 2/5] remove cloudfoundry from include list --- x-pack/filebeat/include/list.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index ae739d07510..7d20d33952d 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -10,7 +10,6 @@ import ( // Import packages that need to register themselves. _ "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" - _ "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/googlepubsub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" From dac45383c9b2e1e12fef63525a8dffeee073720e Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 8 Jul 2020 17:17:45 +0200 Subject: [PATCH 3/5] review --- .../input/cloudfoundry/input_integration_test.go | 3 --- x-pack/filebeat/input/cloudfoundry/v1.go | 13 ++++++++----- x-pack/libbeat/common/cloudfoundry/rlplistener.go | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/x-pack/filebeat/input/cloudfoundry/input_integration_test.go b/x-pack/filebeat/input/cloudfoundry/input_integration_test.go index 700e5888c2f..5320dcc7386 100644 --- a/x-pack/filebeat/input/cloudfoundry/input_integration_test.go +++ b/x-pack/filebeat/input/cloudfoundry/input_integration_test.go @@ -21,7 +21,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" - "github.com/elastic/beats/v7/libbeat/tests/resources" cftest "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry/test" ) @@ -38,8 +37,6 @@ func TestInput(t *testing.T) { } func testInput(t *testing.T, version string) { - defer resources.NewGoroutinesChecker().Check(t) - config := common.MustNewConfigFrom(cftest.GetConfigFromEnv(t)) config.SetString("version", -1, version) diff --git a/x-pack/filebeat/input/cloudfoundry/v1.go b/x-pack/filebeat/input/cloudfoundry/v1.go index d35b67a4096..ffc5ccd4a75 100644 --- a/x-pack/filebeat/input/cloudfoundry/v1.go +++ b/x-pack/filebeat/input/cloudfoundry/v1.go @@ -24,9 +24,10 @@ func configureV1(config cloudfoundry.Config) (*inputV1, error) { func (i *inputV1) Name() string { return "cloudfoundry-v1" } -func (i *inputV1) Test(_ v2.TestContext) error { - // XXX: try to connect, but don't consume - return nil +func (i *inputV1) Test(ctx v2.TestContext) error { + hub := cloudfoundry.NewHub(&i.config, "filebeat", ctx.Logger) + _, err := hub.Client() + return err } func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error { @@ -50,11 +51,13 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error { return errors.Wrapf(err, "initializing doppler consumer") } - _, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() { - consumer.Stop() + stopCtx, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() { + // wait stops the consumer and waits for all internal go-routines to be stopped. + consumer.Wait() }) defer cancel() consumer.Run() + <-stopCtx.Done() return nil } diff --git a/x-pack/libbeat/common/cloudfoundry/rlplistener.go b/x-pack/libbeat/common/cloudfoundry/rlplistener.go index 2448ddd669d..e80db747c8e 100644 --- a/x-pack/libbeat/common/cloudfoundry/rlplistener.go +++ b/x-pack/libbeat/common/cloudfoundry/rlplistener.go @@ -66,8 +66,8 @@ func (c *RlpListener) Start(ctx context.Context) { } es := rlpClient.Stream(ctx, l) + c.wg.Add(1) go func() { - c.wg.Add(1) defer c.wg.Done() for { select { From 118ed70041c70a0b11ec183f977ed287ac079d6b Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 9 Jul 2020 13:13:30 +0200 Subject: [PATCH 4/5] Do not block the client on shutdown --- .../cloudfoundry/input_integration_test.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/cloudfoundry/input_integration_test.go b/x-pack/filebeat/input/cloudfoundry/input_integration_test.go index 5320dcc7386..81d2991e5aa 100644 --- a/x-pack/filebeat/input/cloudfoundry/input_integration_test.go +++ b/x-pack/filebeat/input/cloudfoundry/input_integration_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" @@ -54,7 +55,19 @@ func testInput(t *testing.T, version string) { require.NoError(t, err) go makeApiRequests(t, ctx, apiAddress) - client := pubtest.NewChanClient(0) + ch := make(chan beat.Event) + client := &pubtest.FakeClient{ + PublishFunc: func(evt beat.Event) { + if ctx.Err() != nil { + return + } + + select { + case ch <- evt: + case <-ctx.Done(): + } + }, + } wg.Add(1) go func() { @@ -68,7 +81,7 @@ func testInput(t *testing.T, version string) { }() select { - case e := <-client.Channel: + case e := <-ch: t.Logf("Event received: %+v", e) case <-time.After(10 * time.Second): t.Fatal("timeout waiting for events") From a6e93f80b4c1ef9f89f8cd8e2f1cec52668feb0c Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 9 Jul 2020 14:10:01 +0200 Subject: [PATCH 5/5] Add Test for v2 input as well --- x-pack/filebeat/input/cloudfoundry/v2.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/cloudfoundry/v2.go b/x-pack/filebeat/input/cloudfoundry/v2.go index 251f4f7f8a8..a8257e29a76 100644 --- a/x-pack/filebeat/input/cloudfoundry/v2.go +++ b/x-pack/filebeat/input/cloudfoundry/v2.go @@ -22,9 +22,10 @@ func configureV2(config cloudfoundry.Config) (*inputV2, error) { func (i *inputV2) Name() string { return "cloudfoundry-v2" } -func (i *inputV2) Test(_ v2.TestContext) error { - // XXX: try to connect, but don't consume - return nil +func (i *inputV2) Test(ctx v2.TestContext) error { + hub := cloudfoundry.NewHub(&i.config, "filebeat", ctx.Logger) + _, err := hub.Client() + return err } func (i *inputV2) Run(ctx v2.Context, publisher stateless.Publisher) error {