Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Jul 8, 2020
1 parent 926a3bd commit dac4538
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
3 changes: 0 additions & 3 deletions x-pack/filebeat/input/cloudfoundry/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing"
"github.com/elastic/beats/v7/libbeat/tests/resources"
cftest "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry/test"
)

Expand All @@ -38,8 +37,6 @@ func TestInput(t *testing.T) {
}

func testInput(t *testing.T, version string) {
defer resources.NewGoroutinesChecker().Check(t)

config := common.MustNewConfigFrom(cftest.GetConfigFromEnv(t))
config.SetString("version", -1, version)

Expand Down
13 changes: 8 additions & 5 deletions x-pack/filebeat/input/cloudfoundry/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ func configureV1(config cloudfoundry.Config) (*inputV1, error) {

func (i *inputV1) Name() string { return "cloudfoundry-v1" }

func (i *inputV1) Test(_ v2.TestContext) error {
// XXX: try to connect, but don't consume
return nil
func (i *inputV1) Test(ctx v2.TestContext) error {
hub := cloudfoundry.NewHub(&i.config, "filebeat", ctx.Logger)
_, err := hub.Client()
return err
}

func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error {
Expand All @@ -50,11 +51,13 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error {
return errors.Wrapf(err, "initializing doppler consumer")
}

_, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() {
consumer.Stop()
stopCtx, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() {
// wait stops the consumer and waits for all internal go-routines to be stopped.
consumer.Wait()
})
defer cancel()

consumer.Run()
<-stopCtx.Done()
return nil
}
2 changes: 1 addition & 1 deletion x-pack/libbeat/common/cloudfoundry/rlplistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (c *RlpListener) Start(ctx context.Context) {
}
es := rlpClient.Stream(ctx, l)

c.wg.Add(1)
go func() {
c.wg.Add(1)
defer c.wg.Done()
for {
select {
Expand Down

0 comments on commit dac4538

Please sign in to comment.