Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract stdin prospector #4158

Merged
merged 1 commit into from
May 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions filebeat/channel/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package channel

import "github.com/elastic/beats/filebeat/input"

// Outleter is the outlet for a prospector
type Outleter interface {
SetSignal(signal <-chan struct{})
OnEventSignal(event *input.Data) bool
OnEvent(event *input.Data) bool
Copy() Outleter
}
3 changes: 1 addition & 2 deletions filebeat/channel/outlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync/atomic"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/prospector"
)

// Outlet struct is used to be passed to an object which needs an outlet
Expand Down Expand Up @@ -90,6 +89,6 @@ func (o *Outlet) OnEventSignal(event *input.Data) bool {
}
}

func (o *Outlet) Copy() prospector.Outlet {
func (o *Outlet) Copy() Outleter {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[golint] reported by reviewdog 🐶
exported method Outlet.Copy should have comment or be unexported

return NewOutlet(o.done, o.channel, o.wg)
}
5 changes: 3 additions & 2 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"sync"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
Expand All @@ -15,14 +16,14 @@ import (
type Crawler struct {
prospectors map[uint64]*prospector.Prospector
prospectorConfigs []*common.Config
out prospector.Outlet
out channel.Outleter
wg sync.WaitGroup
reloader *cfgfile.Reloader
once bool
beatDone chan struct{}
}

func New(out prospector.Outlet, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {
func New(out channel.Outleter, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[golint] reported by reviewdog 🐶
exported function New should have comment or be unexported


return &Crawler{
out: out,
Expand Down
5 changes: 3 additions & 2 deletions filebeat/prospector/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package prospector

import (
"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -9,13 +10,13 @@ import (

// Factory is a factory for registrars
type Factory struct {
outlet Outlet
outlet channel.Outleter
registrar *registrar.Registrar
beatDone chan struct{}
}

// NewFactory instantiates a new Factory
func NewFactory(outlet Outlet, registrar *registrar.Registrar, beatDone chan struct{}) *Factory {
func NewFactory(outlet channel.Outleter, registrar *registrar.Registrar, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
Expand Down
16 changes: 5 additions & 11 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/filebeat/channel"
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector/stdin"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
Expand All @@ -26,7 +28,7 @@ type Prospector struct {
cfg *common.Config // Raw config
config prospectorConfig
prospectorer Prospectorer
outlet Outlet
outlet channel.Outleter
harvesterChan chan *input.Event
channelDone chan struct{}
runDone chan struct{}
Expand All @@ -46,16 +48,8 @@ type Prospectorer interface {
Run()
}

// Outlet is the outlet for a prospector
type Outlet interface {
SetSignal(signal <-chan struct{})
OnEventSignal(event *input.Data) bool
OnEvent(event *input.Data) bool
Copy() Outlet
}

// NewProspector instantiates a new prospector
func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (*Prospector, error) {
func NewProspector(cfg *common.Config, outlet channel.Outleter, beatDone chan struct{}) (*Prospector, error) {
prospector := &Prospector{
cfg: cfg,
config: defaultConfig,
Expand Down Expand Up @@ -97,7 +91,7 @@ func (p *Prospector) LoadStates(states []file.State) error {

switch p.config.InputType {
case cfg.StdinInputType:
prospectorer, err = NewStdin(p)
prospectorer, err = stdin.NewProspector(p.cfg, p.outlet)
case cfg.LogInputType:
prospectorer, err = NewLog(p)
default:
Expand Down
3 changes: 2 additions & 1 deletion filebeat/prospector/prospector_log_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common/match"

"github.com/elastic/beats/filebeat/channel"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -158,4 +159,4 @@ type TestOutlet struct{}
func (o TestOutlet) OnEvent(event *input.Data) bool { return true }
func (o TestOutlet) OnEventSignal(event *input.Data) bool { return true }
func (o TestOutlet) SetSignal(signal <-chan struct{}) {}
func (o TestOutlet) Copy() Outlet { return o }
func (o TestOutlet) Copy() channel.Outleter { return o }
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
package prospector
package stdin

import (
"fmt"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

// Stdin is a prospector for stdin
type Stdin struct {
harvester *harvester.Harvester
started bool
cfg *common.Config
outlet channel.Outleter
}

// NewStdin creates a new stdin prospector
// This prospector contains one harvester which is reading from stdin
func NewStdin(p *Prospector) (*Stdin, error) {
func NewProspector(cfg *common.Config, outlet channel.Outleter) (*Stdin, error) {

prospectorer := &Stdin{
started: false,
cfg: cfg,
outlet: outlet,
}

var err error

prospectorer.harvester, err = p.createHarvester(file.State{Source: "-"})
prospectorer.harvester, err = prospectorer.createHarvester(file.State{Source: "-"})
if err != nil {
return nil, fmt.Errorf("Error initializing stdin harvester: %v", err)
}
Expand All @@ -51,3 +57,18 @@ func (s *Stdin) Run() {
s.started = true
}
}

// createHarvester creates a new harvester instance from the given state
func (s *Stdin) createHarvester(state file.State) (*harvester.Harvester, error) {

// Each harvester gets its own copy of the outlet
outlet := s.outlet.Copy()
h, err := harvester.NewHarvester(
s.cfg,
state,
nil,
outlet,
)

return h, err
}