From 5482ecf45047ce52b831c31b23431c6cd0bd6c3c Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Thu, 9 Jul 2020 12:53:18 +0200 Subject: [PATCH] Move unix socket to v2 input API (#19716) (cherry picked from commit 2320b3f8b22cac996fe995e61f45cfc3b58fd282) --- filebeat/include/list.go | 1 - filebeat/input/default-inputs/inputs.go | 5 +- filebeat/input/default-inputs/inputs_other.go | 2 +- filebeat/input/unix/config.go | 22 ++- filebeat/input/unix/input.go | 141 ++++++++---------- 5 files changed, 74 insertions(+), 97 deletions(-) diff --git a/filebeat/include/list.go b/filebeat/include/list.go index 7cc66b3894bd..519d0e715819 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -31,7 +31,6 @@ import ( _ "github.com/elastic/beats/v7/filebeat/input/syslog" _ "github.com/elastic/beats/v7/filebeat/input/tcp" _ "github.com/elastic/beats/v7/filebeat/input/udp" - _ "github.com/elastic/beats/v7/filebeat/input/unix" _ "github.com/elastic/beats/v7/filebeat/module/apache" _ "github.com/elastic/beats/v7/filebeat/module/auditd" _ "github.com/elastic/beats/v7/filebeat/module/elasticsearch" diff --git a/filebeat/input/default-inputs/inputs.go b/filebeat/input/default-inputs/inputs.go index 1cfce53a3eb8..52338a0af983 100644 --- a/filebeat/input/default-inputs/inputs.go +++ b/filebeat/input/default-inputs/inputs.go @@ -19,6 +19,7 @@ package inputs import ( "github.com/elastic/beats/v7/filebeat/beater" + "github.com/elastic/beats/v7/filebeat/input/unix" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/logp" @@ -32,5 +33,7 @@ func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.P } func genericInputs() []v2.Plugin { - return []v2.Plugin{} + return []v2.Plugin{ + unix.Plugin(), + } } diff --git a/filebeat/input/default-inputs/inputs_other.go b/filebeat/input/default-inputs/inputs_other.go index ccd6ce417520..13f4abeb7c81 100644 --- a/filebeat/input/default-inputs/inputs_other.go +++ b/filebeat/input/default-inputs/inputs_other.go @@ -28,5 +28,5 @@ import ( type osComponents interface{} func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin { - return nil + return []v2.Plugin{} } diff --git a/filebeat/input/unix/config.go b/filebeat/input/unix/config.go index 5f65173bc46e..4d4400cb9740 100644 --- a/filebeat/input/unix/config.go +++ b/filebeat/input/unix/config.go @@ -22,24 +22,20 @@ import ( "github.com/dustin/go-humanize" - "github.com/elastic/beats/v7/filebeat/harvester" "github.com/elastic/beats/v7/filebeat/inputsource/unix" ) type config struct { - unix.Config `config:",inline"` - harvester.ForwarderConfig `config:",inline"` - + unix.Config `config:",inline"` LineDelimiter string `config:"line_delimiter" validate:"nonzero"` } -var defaultConfig = config{ - ForwarderConfig: harvester.ForwarderConfig{ - Type: "unix", - }, - Config: unix.Config{ - Timeout: time.Minute * 5, - MaxMessageSize: 20 * humanize.MiByte, - }, - LineDelimiter: "\n", +func defaultConfig() config { + return config{ + Config: unix.Config{ + Timeout: time.Minute * 5, + MaxMessageSize: 20 * humanize.MiByte, + }, + LineDelimiter: "\n", + } } diff --git a/filebeat/input/unix/input.go b/filebeat/input/unix/input.go index 3c827c7cd92c..3f5be8c8b87b 100644 --- a/filebeat/input/unix/input.go +++ b/filebeat/input/unix/input.go @@ -18,118 +18,97 @@ package unix import ( + "bufio" "fmt" - "sync" + "net" "time" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/harvester" - "github.com/elastic/beats/v7/filebeat/input" + input "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/filebeat/inputsource" netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common" "github.com/elastic/beats/v7/filebeat/inputsource/unix" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/cfgwarn" - "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/go-concert/ctxtool" ) -func init() { - err := input.Register("unix", NewInput) - if err != nil { - panic(err) - } -} - -// Input for Unix socket connection -type Input struct { - mutex sync.Mutex - server *unix.Server - started bool - outlet channel.Outleter - config *config - log *logp.Logger +type server struct { + config + splitFunc bufio.SplitFunc } -// NewInput creates a new Unix socket input -func NewInput( - cfg *common.Config, - connector channel.Connector, - context input.Context, -) (input.Input, error) { - cfgwarn.Beta("Unix socket support is beta.") - - out, err := connector.Connect(cfg) - if err != nil { - return nil, err +func Plugin() input.Plugin { + return input.Plugin{ + Name: "unix", + Stability: feature.Beta, + Deprecated: false, + Info: "unix socket server", + Manager: stateless.NewInputManager(configure), } +} - forwarder := harvester.NewForwarder(out) - - config := defaultConfig - err = cfg.Unpack(&config) - if err != nil { +func configure(cfg *common.Config) (stateless.Input, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { return nil, err } - cb := func(data []byte, metadata inputsource.NetworkMetadata) { - forwarder.Send(beat.Event{ - Timestamp: time.Now(), - Fields: common.MapStr{ - "message": string(data), - }, - }) - } + return newServer(config) +} +func newServer(config config) (*server, error) { splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter)) if splitFunc == nil { return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter) } - logger := logp.NewLogger("input.unix").With("path", config.Config.Path) - factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logger, unix.MetadataCallback, cb, splitFunc) + return &server{config: config, splitFunc: splitFunc}, nil +} + +func (s *server) Name() string { return "unix" } - server, err := unix.New(&config.Config, factory) +func (s *server) Test(_ input.TestContext) error { + l, err := net.Listen("unix", s.config.Path) if err != nil { - return nil, err + return err } - - return &Input{ - server: server, - started: false, - outlet: out, - config: &config, - log: logger, - }, nil + return l.Close() } -// Run start a Unix socket input -func (p *Input) Run() { - p.mutex.Lock() - defer p.mutex.Unlock() - - if !p.started { - p.log.Info("Starting Unix socket input") - err := p.server.Start() - if err != nil { - p.log.Errorw("Error starting the Unix socket server", "error", err) - } - p.started = true +func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { + log := ctx.Logger.Named("input.unix").With("path", s.config.Config.Path) + + log.Info("Starting Unix socket input") + defer log.Info("Unix socket input stopped") + + cb := func(data []byte, metadata inputsource.NetworkMetadata) { + event := createEvent(data, metadata) + publisher.Publish(event) + } + factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, log, unix.MetadataCallback, cb, s.splitFunc) + server, err := unix.New(&s.config.Config, factory) + if err != nil { + return err } -} -// Stop stops Unix socket server -func (p *Input) Stop() { - defer p.outlet.Close() - p.mutex.Lock() - defer p.mutex.Unlock() + log.Debugf("TCP Input '%v' initialized", ctx.ID) - p.log.Info("Stopping Unix socket input") - p.server.Stop() - p.started = false + err = server.Run(ctxtool.FromCanceller(ctx.Cancelation)) + + // ignore error from 'Run' in case shutdown was signaled. + if ctxerr := ctx.Cancelation.Err(); ctxerr != nil { + err = ctxerr + } + return err } -// Wait stop the current server -func (p *Input) Wait() { - p.Stop() +func createEvent(raw []byte, metadata inputsource.NetworkMetadata) beat.Event { + return beat.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "message": string(raw), + }, + } }