diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index ae739d07510..7d20d33952d 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -10,7 +10,6 @@ import ( // Import packages that need to register themselves. _ "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" - _ "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/googlepubsub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" diff --git a/x-pack/filebeat/input/cloudfoundry/input.go b/x-pack/filebeat/input/cloudfoundry/input.go index fc38d85fc34..036a61b9d1e 100644 --- a/x-pack/filebeat/input/cloudfoundry/input.go +++ b/x-pack/filebeat/input/cloudfoundry/input.go @@ -6,49 +6,51 @@ package cloudfoundry import ( "fmt" + "time" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/cfgwarn" - "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" ) -func init() { - err := input.Register("cloudfoundry", NewInput) - if err != nil { - panic(err) - } +type cloudfoundryEvent interface { + Timestamp() time.Time + ToFields() common.MapStr } -// NewInput creates a new udp input -func NewInput( - cfg *common.Config, - outlet channel.Connector, - context input.Context, -) (input.Input, error) { - cfgwarn.Beta("The cloudfoundry input is beta") - - log := logp.NewLogger("cloudfoundry") - - out, err := outlet.Connect(cfg) - if err != nil { - return nil, err +func Plugin() v2.Plugin { + return v2.Plugin{ + Name: "cloudfoundry", + Stability: feature.Beta, + Deprecated: false, + Info: "collect logs from cloudfoundry loggregator", + Manager: stateless.NewInputManager(configure), } +} - var conf cloudfoundry.Config - if err = cfg.Unpack(&conf); err != nil { +func configure(cfg *common.Config) (stateless.Input, error) { + config := cloudfoundry.Config{} + if err := cfg.Unpack(&config); err != nil { return nil, err } - switch conf.Version { + switch config.Version { case cloudfoundry.ConsumerVersionV1: - return newInputV1(log, conf, out, context) + return configureV1(config) case cloudfoundry.ConsumerVersionV2: - return newInputV2(log, conf, out, context) + return configureV2(config) default: - return nil, fmt.Errorf("not supported consumer version: %s", conf.Version) + return nil, fmt.Errorf("not supported consumer version: %s", config.Version) + } +} + +func createEvent(evt cloudfoundryEvent) beat.Event { + return beat.Event{ + Timestamp: evt.Timestamp(), + Fields: evt.ToFields(), } } diff --git a/x-pack/filebeat/input/cloudfoundry/input_integration_test.go b/x-pack/filebeat/input/cloudfoundry/input_integration_test.go index 736acf778fd..81d2991e5aa 100644 --- a/x-pack/filebeat/input/cloudfoundry/input_integration_test.go +++ b/x-pack/filebeat/input/cloudfoundry/input_integration_test.go @@ -11,16 +11,17 @@ import ( "context" "crypto/tls" "net/http" + "sync" "testing" "time" "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" cftest "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry/test" ) @@ -40,66 +41,53 @@ func testInput(t *testing.T, version string) { config := common.MustNewConfigFrom(cftest.GetConfigFromEnv(t)) config.SetString("version", -1, version) + input, err := Plugin().Manager.Create(config) + require.NoError(t, err) + + var wg sync.WaitGroup + defer wg.Wait() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Ensure that there is something happening in the firehose apiAddress, err := config.String("api_address", -1) require.NoError(t, err) - - // Ensure that there is something happening in the firehose go makeApiRequests(t, ctx, apiAddress) - events := make(chan beat.Event) - connector := channel.ConnectorFunc(func(*common.Config, beat.ClientConfig) (channel.Outleter, error) { - return newOutleter(events), nil - }) - - inputCtx := input.Context{Done: make(chan struct{})} + ch := make(chan beat.Event) + client := &pubtest.FakeClient{ + PublishFunc: func(evt beat.Event) { + if ctx.Err() != nil { + return + } + + select { + case ch <- evt: + case <-ctx.Done(): + } + }, + } - input, err := NewInput(config, connector, inputCtx) - require.NoError(t, err) + wg.Add(1) + go func() { + defer wg.Done() - go input.Run() - defer input.Stop() + inputCtx := v2.Context{ + Logger: logp.NewLogger("test"), + Cancelation: ctx, + } + input.Run(inputCtx, pubtest.ConstClient(client)) + }() select { - case e := <-events: + case e := <-ch: t.Logf("Event received: %+v", e) case <-time.After(10 * time.Second): t.Fatal("timeout waiting for events") } } -type outleter struct { - events chan<- beat.Event - done chan struct{} -} - -func newOutleter(events chan<- beat.Event) *outleter { - return &outleter{ - events: events, - done: make(chan struct{}), - } -} - -func (o *outleter) Close() error { - close(o.done) - return nil -} - -func (o *outleter) Done() <-chan struct{} { - return o.done -} - -func (o *outleter) OnEvent(e beat.Event) bool { - select { - case o.events <- e: - return true - default: - return false - } -} - func makeApiRequests(t *testing.T, ctx context.Context, address string) { client := &http.Client{ Transport: &http.Transport{ diff --git a/x-pack/filebeat/input/cloudfoundry/v1.go b/x-pack/filebeat/input/cloudfoundry/v1.go index c1248545369..ffc5ccd4a75 100644 --- a/x-pack/filebeat/input/cloudfoundry/v1.go +++ b/x-pack/filebeat/input/cloudfoundry/v1.go @@ -5,83 +5,59 @@ package cloudfoundry import ( - "sync" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/harvester" - "github.com/elastic/beats/v7/filebeat/input" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/logp" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" + "github.com/elastic/go-concert/ctxtool" ) -// InputV1 defines a udp input to receive event on a specific host:port. -type InputV1 struct { - sync.Mutex - consumer *cloudfoundry.DopplerConsumer - started bool - log *logp.Logger - outlet channel.Outleter +// inputV1 defines a udp input to receive event on a specific host:port. +type inputV1 struct { + config cloudfoundry.Config +} + +func configureV1(config cloudfoundry.Config) (*inputV1, error) { + return &inputV1{config: config}, nil } -func newInputV1(log *logp.Logger, conf cloudfoundry.Config, out channel.Outleter, context input.Context) (*InputV1, error) { - hub := cloudfoundry.NewHub(&conf, "filebeat", log) - forwarder := harvester.NewForwarder(out) +func (i *inputV1) Name() string { return "cloudfoundry-v1" } + +func (i *inputV1) Test(ctx v2.TestContext) error { + hub := cloudfoundry.NewHub(&i.config, "filebeat", ctx.Logger) + _, err := hub.Client() + return err +} + +func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error { + log := ctx.Logger + hub := cloudfoundry.NewHub(&i.config, "filebeat", log) + + log.Info("Starting cloudfoundry input") + defer log.Info("Stopped cloudfoundry input") callbacks := cloudfoundry.DopplerCallbacks{ Log: func(evt cloudfoundry.Event) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) + publisher.Publish(createEvent(evt)) }, Error: func(evt cloudfoundry.EventError) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) + publisher.Publish(createEvent(&evt)) }, } consumer, err := hub.DopplerConsumer(callbacks) if err != nil { - return nil, errors.Wrapf(err, "initializing doppler consumer") + return errors.Wrapf(err, "initializing doppler consumer") } - return &InputV1{ - outlet: out, - consumer: consumer, - started: false, - log: log, - }, nil -} - -// Run starts the consumer of cloudfoundry events -func (p *InputV1) Run() { - p.Lock() - defer p.Unlock() - if !p.started { - p.log.Info("starting cloudfoundry input") - p.consumer.Run() - p.started = true - } -} - -// Stop stops cloudfoundry doppler consumer -func (p *InputV1) Stop() { - defer p.outlet.Close() - p.Lock() - defer p.Unlock() - - p.log.Info("stopping cloudfoundry input") - p.consumer.Stop() - p.started = false -} + stopCtx, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() { + // wait stops the consumer and waits for all internal go-routines to be stopped. + consumer.Wait() + }) + defer cancel() -// Wait waits for the input to finalize, and stops it -func (p *InputV1) Wait() { - p.Stop() - p.consumer.Wait() + consumer.Run() + <-stopCtx.Done() + return nil } diff --git a/x-pack/filebeat/input/cloudfoundry/v2.go b/x-pack/filebeat/input/cloudfoundry/v2.go index 0d21d361f9c..a8257e29a76 100644 --- a/x-pack/filebeat/input/cloudfoundry/v2.go +++ b/x-pack/filebeat/input/cloudfoundry/v2.go @@ -5,86 +5,51 @@ package cloudfoundry import ( - "context" - "sync" - - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/harvester" - "github.com/elastic/beats/v7/filebeat/input" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/logp" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" + "github.com/elastic/go-concert/ctxtool" ) -// InputV2 defines a Cloudfoundry input that uses the consumer V2 API -type InputV2 struct { - sync.Mutex - listener *cloudfoundry.RlpListener - started bool - log *logp.Logger - outlet channel.Outleter +// inputV2 defines a Cloudfoundry input that uses the consumer V2 API +type inputV2 struct { + config cloudfoundry.Config +} + +func configureV2(config cloudfoundry.Config) (*inputV2, error) { + return &inputV2{config: config}, nil +} + +func (i *inputV2) Name() string { return "cloudfoundry-v2" } + +func (i *inputV2) Test(ctx v2.TestContext) error { + hub := cloudfoundry.NewHub(&i.config, "filebeat", ctx.Logger) + _, err := hub.Client() + return err } -func newInputV2(log *logp.Logger, conf cloudfoundry.Config, out channel.Outleter, context input.Context) (*InputV2, error) { - hub := cloudfoundry.NewHub(&conf, "filebeat", log) - forwarder := harvester.NewForwarder(out) +func (i *inputV2) Run(ctx v2.Context, publisher stateless.Publisher) error { + log := ctx.Logger + hub := cloudfoundry.NewHub(&i.config, "filebeat", log) + callbacks := cloudfoundry.RlpListenerCallbacks{ HttpAccess: func(evt *cloudfoundry.EventHttpAccess) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) + publisher.Publish(createEvent(evt)) }, Log: func(evt *cloudfoundry.EventLog) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) + publisher.Publish(createEvent(evt)) }, Error: func(evt *cloudfoundry.EventError) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) + publisher.Publish(createEvent(evt)) }, } listener, err := hub.RlpListener(callbacks) if err != nil { - return nil, err + return err } - return &InputV2{ - outlet: out, - listener: listener, - started: false, - log: log, - }, nil -} - -// Run starts the listener of cloudfoundry events -func (p *InputV2) Run() { - p.Lock() - defer p.Unlock() - - if !p.started { - p.log.Info("starting cloudfoundry input") - p.listener.Start(context.TODO()) - p.started = true - } -} - -// Stop stops cloudfoundry listener -func (p *InputV2) Stop() { - defer p.outlet.Close() - p.Lock() - defer p.Unlock() - - p.log.Info("stopping cloudfoundry input") - p.listener.Stop() - p.started = false -} -// Wait waits for the input to finalize, and stops it -func (p *InputV2) Wait() { - p.Stop() + listener.Start(ctxtool.FromCanceller(ctx.Cancelation)) + listener.Wait() + return nil } diff --git a/x-pack/filebeat/input/default-inputs/inputs.go b/x-pack/filebeat/input/default-inputs/inputs.go index 0af51f2c18a..afac3c2e61c 100644 --- a/x-pack/filebeat/input/default-inputs/inputs.go +++ b/x-pack/filebeat/input/default-inputs/inputs.go @@ -10,6 +10,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" ) @@ -23,5 +24,6 @@ func Init(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { return []v2.Plugin{ o365audit.Plugin(log, store), + cloudfoundry.Plugin(), } } diff --git a/x-pack/libbeat/common/cloudfoundry/rlplistener.go b/x-pack/libbeat/common/cloudfoundry/rlplistener.go index f542f8eac7e..e80db747c8e 100644 --- a/x-pack/libbeat/common/cloudfoundry/rlplistener.go +++ b/x-pack/libbeat/common/cloudfoundry/rlplistener.go @@ -66,8 +66,8 @@ func (c *RlpListener) Start(ctx context.Context) { } es := rlpClient.Stream(ctx, l) + c.wg.Add(1) go func() { - c.wg.Add(1) defer c.wg.Done() for { select { @@ -110,6 +110,10 @@ func (c *RlpListener) Stop() { c.wg.Wait() } +func (c *RlpListener) Wait() { + c.wg.Wait() +} + // getSelectors returns the server side selectors based on the callbacks defined on the listener. func (c *RlpListener) getSelectors() []*loggregator_v2.Selector { selectors := make([]*loggregator_v2.Selector, 0)