-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
filebeat/input/udp: add initial UDP metrics support #33870
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,18 +18,25 @@ | |
package udp | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/mitchellh/hashstructure" | ||
"github.com/rcrowley/go-metrics" | ||
|
||
"github.com/elastic/beats/v7/filebeat/channel" | ||
"github.com/elastic/beats/v7/filebeat/harvester" | ||
"github.com/elastic/beats/v7/filebeat/input" | ||
"github.com/elastic/beats/v7/filebeat/inputsource" | ||
"github.com/elastic/beats/v7/filebeat/inputsource/udp" | ||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon" | ||
conf "github.com/elastic/elastic-agent-libs/config" | ||
"github.com/elastic/elastic-agent-libs/logp" | ||
"github.com/elastic/elastic-agent-libs/mapstr" | ||
"github.com/elastic/elastic-agent-libs/monitoring" | ||
"github.com/elastic/elastic-agent-libs/monitoring/adapter" | ||
) | ||
|
||
func init() { | ||
|
@@ -45,14 +52,16 @@ type Input struct { | |
udp *udp.Server | ||
started bool | ||
outlet channel.Outleter | ||
|
||
metrics *inputMetrics | ||
} | ||
|
||
// NewInput creates a new udp input | ||
func NewInput( | ||
cfg *conf.C, | ||
outlet channel.Connector, | ||
context input.Context, | ||
) (input.Input, error) { | ||
func NewInput(cfg *conf.C, outlet channel.Connector, context input.Context) (input.Input, error) { | ||
id, err := configID(cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
out, err := outlet.Connect(cfg) | ||
if err != nil { | ||
return nil, err | ||
|
@@ -64,6 +73,7 @@ func NewInput( | |
} | ||
|
||
forwarder := harvester.NewForwarder(out) | ||
metrics := newInputMetrics(id, config.Host, uint64(config.ReadBuffer)) | ||
callback := func(data []byte, metadata inputsource.NetworkMetadata) { | ||
evt := beat.Event{ | ||
Timestamp: time.Now(), | ||
|
@@ -82,6 +92,10 @@ func NewInput( | |
} | ||
} | ||
_ = forwarder.Send(evt) | ||
|
||
// This must be called after forwarder.Send to measure | ||
// the processing time metric. | ||
metrics.log(data, evt.Timestamp) | ||
} | ||
|
||
udp := udp.New(&config.Config, callback) | ||
|
@@ -90,9 +104,31 @@ func NewInput( | |
outlet: out, | ||
udp: udp, | ||
started: false, | ||
metrics: metrics, | ||
}, nil | ||
} | ||
|
||
func configID(config *conf.C) (string, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't realize that this input is not converted to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks reasonably straight forward. The only inputs here that are v2 are filestream, journald, kafka, unix and winlog. The update would be better in another PR. |
||
var tmp struct { | ||
ID string `config:"id"` | ||
} | ||
if err := config.Unpack(&tmp); err != nil { | ||
return "", fmt.Errorf("error extracting ID: %w", err) | ||
} | ||
if tmp.ID != "" { | ||
return tmp.ID, nil | ||
} | ||
|
||
var h map[string]interface{} | ||
_ = config.Unpack(&h) | ||
id, err := hashstructure.Hash(h, nil) | ||
if err != nil { | ||
return "", fmt.Errorf("can not compute ID from configuration: %w", err) | ||
} | ||
|
||
return fmt.Sprintf("%16X", id), nil | ||
} | ||
|
||
// Run starts and start the UDP server and read events from the socket | ||
func (p *Input) Run() { | ||
p.Lock() | ||
|
@@ -116,10 +152,73 @@ func (p *Input) Stop() { | |
|
||
logp.Info("Stopping UDP input") | ||
p.udp.Stop() | ||
p.metrics.close() | ||
p.started = false | ||
} | ||
|
||
// Wait suspends the UDP input | ||
func (p *Input) Wait() { | ||
p.Stop() | ||
} | ||
|
||
// inputMetrics handles the input's metric reporting. | ||
type inputMetrics struct { | ||
unregister func() | ||
|
||
lastPacket time.Time | ||
|
||
device *monitoring.String // name of the device being monitored | ||
packets *monitoring.Uint // number of packets processed | ||
bytes *monitoring.Uint // number of bytes processed | ||
bufferLen *monitoring.Uint // configured read buffer length | ||
arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals | ||
processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication | ||
} | ||
|
||
// newInputMetrics returns an input metric for the UDP processor. If id is empty | ||
// a nil inputMetric is returned. | ||
func newInputMetrics(id, device string, buflen uint64) *inputMetrics { | ||
if id == "" { | ||
return nil | ||
} | ||
reg, unreg := inputmon.NewInputRegistry("udp", id+"::"+device, nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the case of Fleet where an |
||
out := &inputMetrics{ | ||
unregister: unreg, | ||
bufferLen: monitoring.NewUint(reg, "udp_read_buffer_length"), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Our monitoring package does not have the concept of a gauge so I have been suffixing new gauges with |
||
device: monitoring.NewString(reg, "device"), | ||
packets: monitoring.NewUint(reg, "udp_packets"), | ||
bytes: monitoring.NewUint(reg, "udp_bytes"), | ||
arrivalPeriod: metrics.NewUniformSample(1024), | ||
processingTime: metrics.NewUniformSample(1024), | ||
} | ||
_ = adapter.NewGoMetrics(reg, "udp_arrival_period", adapter.Accept). | ||
Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) | ||
_ = adapter.NewGoMetrics(reg, "udp_processing_time", adapter.Accept). | ||
Register("histogram", metrics.NewHistogram(out.processingTime)) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me propose some slightly more generic names. Before we ship this in the next release I plan to look across the inputs to see if there are opportunities for alignment on naming.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Is the first of these intended to be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep. That should have been received_bytes_total. (updated original comment) |
||
out.device.Set(device) | ||
out.bufferLen.Set(buflen) | ||
|
||
return out | ||
} | ||
|
||
// log logs metric for the given packet. | ||
func (m *inputMetrics) log(data []byte, timestamp time.Time) { | ||
if m == nil { | ||
return | ||
} | ||
m.processingTime.Update(time.Since(timestamp).Nanoseconds()) | ||
m.packets.Add(1) | ||
m.bytes.Add(uint64(len(data))) | ||
if !m.lastPacket.IsZero() { | ||
m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) | ||
} | ||
m.lastPacket = timestamp | ||
} | ||
|
||
func (m *inputMetrics) close() { | ||
if m != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ooops. Will send a fix. #33920 |
||
return | ||
} | ||
m.unregister() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably a good thing to factor out into inputmon.