-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
filebeat/input/udp: convert to v2 input #33930
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,63 +18,90 @@ | |
package udp | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"net" | ||
"time" | ||
|
||
"github.com/mitchellh/hashstructure" | ||
"github.com/dustin/go-humanize" | ||
"github.com/rcrowley/go-metrics" | ||
|
||
"github.com/elastic/beats/v7/filebeat/channel" | ||
"github.com/elastic/beats/v7/filebeat/harvester" | ||
"github.com/elastic/beats/v7/filebeat/input" | ||
input "github.com/elastic/beats/v7/filebeat/input/v2" | ||
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" | ||
"github.com/elastic/beats/v7/filebeat/inputsource" | ||
"github.com/elastic/beats/v7/filebeat/inputsource/udp" | ||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/feature" | ||
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon" | ||
conf "github.com/elastic/elastic-agent-libs/config" | ||
"github.com/elastic/elastic-agent-libs/logp" | ||
"github.com/elastic/elastic-agent-libs/mapstr" | ||
"github.com/elastic/elastic-agent-libs/monitoring" | ||
"github.com/elastic/elastic-agent-libs/monitoring/adapter" | ||
"github.com/elastic/go-concert/ctxtool" | ||
) | ||
|
||
func init() { | ||
err := input.Register("udp", NewInput) | ||
if err != nil { | ||
panic(err) | ||
func Plugin() input.Plugin { | ||
return input.Plugin{ | ||
Name: "udp", | ||
Stability: feature.Stable, | ||
Deprecated: false, | ||
Info: "udp packet server", | ||
Manager: stateless.NewInputManager(configure), | ||
} | ||
} | ||
|
||
// Input defines a udp input to receive event on a specific host:port. | ||
type Input struct { | ||
sync.Mutex | ||
udp *udp.Server | ||
started bool | ||
outlet channel.Outleter | ||
func configure(cfg *conf.C) (stateless.Input, error) { | ||
config := defaultConfig() | ||
if err := cfg.Unpack(&config); err != nil { | ||
return nil, err | ||
} | ||
|
||
metrics *inputMetrics | ||
return newServer(config) | ||
} | ||
|
||
// NewInput creates a new udp input | ||
func NewInput(cfg *conf.C, outlet channel.Connector, context input.Context) (input.Input, error) { | ||
id, err := configID(cfg) | ||
if err != nil { | ||
return nil, err | ||
func defaultConfig() config { | ||
return config{ | ||
Config: udp.Config{ | ||
MaxMessageSize: 10 * humanize.KiByte, | ||
// TODO: What should be default port? | ||
Host: "localhost:8080", | ||
// TODO: What should be the default timeout? | ||
Timeout: time.Minute * 5, | ||
efd6 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}, | ||
} | ||
out, err := outlet.Connect(cfg) | ||
} | ||
|
||
type server struct { | ||
udp.Server | ||
config | ||
} | ||
|
||
type config struct { | ||
udp.Config `config:",inline"` | ||
} | ||
|
||
func newServer(config config) (*server, error) { | ||
return &server{config: config}, nil | ||
} | ||
|
||
func (s *server) Name() string { return "udp" } | ||
|
||
func (s *server) Test(_ input.TestContext) error { | ||
l, err := net.Listen("udp", s.config.Config.Host) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
return l.Close() | ||
} | ||
|
||
config := defaultConfig | ||
if err = cfg.Unpack(&config); err != nil { | ||
return nil, err | ||
} | ||
func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { | ||
log := ctx.Logger.Named("udp").With("host", s.config.Config.Host) | ||
efd6 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
log.Info("starting udp socket input") | ||
defer log.Info("udp input stopped") | ||
|
||
metrics := newInputMetrics(ctx.ID, s.config.Host, uint64(s.config.ReadBuffer)) | ||
defer metrics.close() | ||
|
||
forwarder := harvester.NewForwarder(out) | ||
metrics := newInputMetrics(id, config.Host, uint64(config.ReadBuffer)) | ||
callback := func(data []byte, metadata inputsource.NetworkMetadata) { | ||
server := udp.New(&s.config.Config, func(data []byte, metadata inputsource.NetworkMetadata) { | ||
evt := beat.Event{ | ||
Timestamp: time.Now(), | ||
Meta: mapstr.M{ | ||
|
@@ -91,74 +118,22 @@ func NewInput(cfg *conf.C, outlet channel.Connector, context input.Context) (inp | |
}, | ||
} | ||
} | ||
_ = forwarder.Send(evt) | ||
|
||
publisher.Publish(evt) | ||
|
||
// This must be called after forwarder.Send to measure | ||
// the processing time metric. | ||
metrics.log(data, evt.Timestamp) | ||
} | ||
|
||
udp := udp.New(&config.Config, callback) | ||
|
||
return &Input{ | ||
outlet: out, | ||
udp: udp, | ||
started: false, | ||
metrics: metrics, | ||
}, nil | ||
} | ||
|
||
func configID(config *conf.C) (string, error) { | ||
var tmp struct { | ||
ID string `config:"id"` | ||
} | ||
if err := config.Unpack(&tmp); err != nil { | ||
return "", fmt.Errorf("error extracting ID: %w", err) | ||
} | ||
if tmp.ID != "" { | ||
return tmp.ID, nil | ||
} | ||
|
||
var h map[string]interface{} | ||
_ = config.Unpack(&h) | ||
id, err := hashstructure.Hash(h, nil) | ||
if err != nil { | ||
return "", fmt.Errorf("can not compute ID from configuration: %w", err) | ||
} | ||
}) | ||
|
||
return fmt.Sprintf("%16X", id), nil | ||
} | ||
log.Debugf("udp input '%v' initialized", ctx.ID) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to include the ID in the message since the v2 framework includes the ID in the logger's context.
efd6 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Run starts and start the UDP server and read events from the socket | ||
func (p *Input) Run() { | ||
p.Lock() | ||
defer p.Unlock() | ||
|
||
if !p.started { | ||
logp.Info("Starting UDP input") | ||
err := p.udp.Start() | ||
if err != nil { | ||
logp.Err("Error running harvester: %v", err) | ||
} | ||
p.started = true | ||
err := server.Run(ctxtool.FromCanceller(ctx.Cancelation)) | ||
// Ignore error from 'Run' in case shutdown was signaled. | ||
if ctxerr := ctx.Cancelation.Err(); ctxerr != nil { | ||
err = ctxerr | ||
} | ||
} | ||
|
||
// Stop stops the UDP input | ||
func (p *Input) Stop() { | ||
defer p.outlet.Close() | ||
p.Lock() | ||
defer p.Unlock() | ||
|
||
logp.Info("Stopping UDP input") | ||
p.udp.Stop() | ||
p.metrics.close() | ||
p.started = false | ||
} | ||
|
||
// Wait suspends the UDP input | ||
func (p *Input) Wait() { | ||
p.Stop() | ||
return err | ||
} | ||
|
||
// inputMetrics handles the input's metric reporting. | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove the TODOs. I think these have been "decided" given how long they have been the defaults.