Skip to content

Commit

Permalink
Extract stdin prospector (#4158)
Browse files Browse the repository at this point in the history
This a first step in the direction of having a package for each prospector together with its harvester. Stdin prospector was tackled first as it has much less dependencies. Until the refactoring is completed there will be some duplicated code.
  • Loading branch information
ruflin authored and andrewkroh committed May 1, 2017
1 parent d47c55a commit 461cbc8
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 21 deletions.
11 changes: 11 additions & 0 deletions filebeat/channel/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package channel

import "github.com/elastic/beats/filebeat/input"

// Outleter is the outlet for a prospector
type Outleter interface {
SetSignal(signal <-chan struct{})
OnEventSignal(event *input.Data) bool
OnEvent(event *input.Data) bool
Copy() Outleter
}
3 changes: 1 addition & 2 deletions filebeat/channel/outlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync/atomic"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/prospector"
)

// Outlet struct is used to be passed to an object which needs an outlet
Expand Down Expand Up @@ -90,6 +89,6 @@ func (o *Outlet) OnEventSignal(event *input.Data) bool {
}
}

func (o *Outlet) Copy() prospector.Outlet {
func (o *Outlet) Copy() Outleter {
return NewOutlet(o.done, o.channel, o.wg)
}
5 changes: 3 additions & 2 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"sync"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
Expand All @@ -15,14 +16,14 @@ import (
type Crawler struct {
prospectors map[uint64]*prospector.Prospector
prospectorConfigs []*common.Config
out prospector.Outlet
out channel.Outleter
wg sync.WaitGroup
reloader *cfgfile.Reloader
once bool
beatDone chan struct{}
}

func New(out prospector.Outlet, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {
func New(out channel.Outleter, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {

return &Crawler{
out: out,
Expand Down
5 changes: 3 additions & 2 deletions filebeat/prospector/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package prospector

import (
"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -9,13 +10,13 @@ import (

// Factory is a factory for registrars
type Factory struct {
outlet Outlet
outlet channel.Outleter
registrar *registrar.Registrar
beatDone chan struct{}
}

// NewFactory instantiates a new Factory
func NewFactory(outlet Outlet, registrar *registrar.Registrar, beatDone chan struct{}) *Factory {
func NewFactory(outlet channel.Outleter, registrar *registrar.Registrar, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
Expand Down
16 changes: 5 additions & 11 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/filebeat/channel"
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector/stdin"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
Expand All @@ -26,7 +28,7 @@ type Prospector struct {
cfg *common.Config // Raw config
config prospectorConfig
prospectorer Prospectorer
outlet Outlet
outlet channel.Outleter
harvesterChan chan *input.Event
channelDone chan struct{}
runDone chan struct{}
Expand All @@ -46,16 +48,8 @@ type Prospectorer interface {
Run()
}

// Outlet is the outlet for a prospector
type Outlet interface {
SetSignal(signal <-chan struct{})
OnEventSignal(event *input.Data) bool
OnEvent(event *input.Data) bool
Copy() Outlet
}

// NewProspector instantiates a new prospector
func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (*Prospector, error) {
func NewProspector(cfg *common.Config, outlet channel.Outleter, beatDone chan struct{}) (*Prospector, error) {
prospector := &Prospector{
cfg: cfg,
config: defaultConfig,
Expand Down Expand Up @@ -97,7 +91,7 @@ func (p *Prospector) LoadStates(states []file.State) error {

switch p.config.InputType {
case cfg.StdinInputType:
prospectorer, err = NewStdin(p)
prospectorer, err = stdin.NewProspector(p.cfg, p.outlet)
case cfg.LogInputType:
prospectorer, err = NewLog(p)
default:
Expand Down
3 changes: 2 additions & 1 deletion filebeat/prospector/prospector_log_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common/match"

"github.com/elastic/beats/filebeat/channel"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -158,4 +159,4 @@ type TestOutlet struct{}
func (o TestOutlet) OnEvent(event *input.Data) bool { return true }
func (o TestOutlet) OnEventSignal(event *input.Data) bool { return true }
func (o TestOutlet) SetSignal(signal <-chan struct{}) {}
func (o TestOutlet) Copy() Outlet { return o }
func (o TestOutlet) Copy() channel.Outleter { return o }
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
package prospector
package stdin

import (
"fmt"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

// Stdin is a prospector for stdin
type Stdin struct {
harvester *harvester.Harvester
started bool
cfg *common.Config
outlet channel.Outleter
}

// NewStdin creates a new stdin prospector
// This prospector contains one harvester which is reading from stdin
func NewStdin(p *Prospector) (*Stdin, error) {
func NewProspector(cfg *common.Config, outlet channel.Outleter) (*Stdin, error) {

prospectorer := &Stdin{
started: false,
cfg: cfg,
outlet: outlet,
}

var err error

prospectorer.harvester, err = p.createHarvester(file.State{Source: "-"})
prospectorer.harvester, err = prospectorer.createHarvester(file.State{Source: "-"})
if err != nil {
return nil, fmt.Errorf("Error initializing stdin harvester: %v", err)
}
Expand All @@ -51,3 +57,18 @@ func (s *Stdin) Run() {
s.started = true
}
}

// createHarvester creates a new harvester instance from the given state
func (s *Stdin) createHarvester(state file.State) (*harvester.Harvester, error) {

// Each harvester gets its own copy of the outlet
outlet := s.outlet.Copy()
h, err := harvester.NewHarvester(
s.cfg,
state,
nil,
outlet,
)

return h, err
}

0 comments on commit 461cbc8

Please sign in to comment.