From 6cd27d6d5405c9ae3161c3288c2ccb2eae5f2a98 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 30 Nov 2022 10:49:50 +1030 Subject: [PATCH 1/2] filebeat/input/udp: add initial UDP metrics support This adds metrics for UDP packet count and total bytes, and histograms for time required to process UDP packets prior to acking from a publication and time between UDP packet arrivals. --- CHANGELOG.next.asciidoc | 1 + filebeat/input/udp/input.go | 109 ++++++++++++++++++- filebeat/inputsource/common/dgram/handler.go | 11 +- 3 files changed, 108 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9154a8dd4d6f..c2f1d1dbe39e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -171,6 +171,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Add support for http+unix and http+npipe schemes in httpjson input. {issue}33571[33571] {pull}33610[33610] - Add support for http+unix and http+npipe schemes in cel input. {issue}33571[33571] {pull}33712[33712] - Add `decode_duration`, `move_fields` processors. {pull}31301[31301] +- Add metrics for UDP packet processing. {pull}33870[33870] *Auditbeat* diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index 1f6129f6c23d..d46743463be0 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -18,18 +18,25 @@ package udp import ( + "fmt" "sync" "time" + "github.com/mitchellh/hashstructure" + "github.com/rcrowley/go-metrics" + "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/harvester" "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/filebeat/inputsource/udp" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" ) func init() { @@ -45,14 +52,16 @@ type Input struct { udp *udp.Server started bool outlet channel.Outleter + + metrics *inputMetrics } // NewInput creates a new udp input -func NewInput( - cfg *conf.C, - outlet channel.Connector, - context input.Context, -) (input.Input, error) { +func NewInput(cfg *conf.C, outlet channel.Connector, context input.Context) (input.Input, error) { + id, err := configID(cfg) + if err != nil { + return nil, err + } out, err := outlet.Connect(cfg) if err != nil { return nil, err @@ -64,6 +73,7 @@ func NewInput( } forwarder := harvester.NewForwarder(out) + metrics := newInputMetrics(id, config.Host, uint64(config.ReadBuffer)) callback := func(data []byte, metadata inputsource.NetworkMetadata) { evt := beat.Event{ Timestamp: time.Now(), @@ -82,6 +92,10 @@ func NewInput( } } _ = forwarder.Send(evt) + + // This must be called after forwarder.Send to measure + // the processing time metric. + metrics.log(data, evt.Timestamp) } udp := udp.New(&config.Config, callback) @@ -90,9 +104,31 @@ func NewInput( outlet: out, udp: udp, started: false, + metrics: metrics, }, nil } +func configID(config *conf.C) (string, error) { + var tmp struct { + ID string `config:"id"` + } + if err := config.Unpack(&tmp); err != nil { + return "", fmt.Errorf("error extracting ID: %w", err) + } + if tmp.ID != "" { + return tmp.ID, nil + } + + var h map[string]interface{} + _ = config.Unpack(&h) + id, err := hashstructure.Hash(h, nil) + if err != nil { + return "", fmt.Errorf("can not compute ID from configuration: %w", err) + } + + return fmt.Sprintf("%16X", id), nil +} + // Run starts and start the UDP server and read events from the socket func (p *Input) Run() { p.Lock() @@ -116,6 +152,7 @@ func (p *Input) Stop() { logp.Info("Stopping UDP input") p.udp.Stop() + p.metrics.close() p.started = false } @@ -123,3 +160,65 @@ func (p *Input) Stop() { func (p *Input) Wait() { p.Stop() } + +// inputMetrics handles the input's metric reporting. +type inputMetrics struct { + unregister func() + + lastPacket time.Time + + device *monitoring.String // name of the device being monitored + packets *monitoring.Uint // number of packets processed + bytes *monitoring.Uint // number of bytes processed + bufferLen *monitoring.Uint // configured read buffer length + arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals + processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication +} + +// newInputMetrics returns an input metric for the UDP processor. If id is empty +// a nil inputMetric is returned. +func newInputMetrics(id, device string, buflen uint64) *inputMetrics { + if id == "" { + return nil + } + reg, unreg := inputmon.NewInputRegistry("udp", id+"::"+device, nil) + out := &inputMetrics{ + unregister: unreg, + bufferLen: monitoring.NewUint(reg, "udp_read_buffer_length"), + device: monitoring.NewString(reg, "device"), + packets: monitoring.NewUint(reg, "udp_packets"), + bytes: monitoring.NewUint(reg, "udp_bytes"), + arrivalPeriod: metrics.NewUniformSample(1024), + processingTime: metrics.NewUniformSample(1024), + } + _ = adapter.NewGoMetrics(reg, "udp_arrival_period", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) + _ = adapter.NewGoMetrics(reg, "udp_processing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.processingTime)) + + out.device.Set(device) + out.bufferLen.Set(buflen) + + return out +} + +// log logs metric for the given packet. +func (m *inputMetrics) log(data []byte, timestamp time.Time) { + if m == nil { + return + } + m.processingTime.Update(time.Since(timestamp).Nanoseconds()) + m.packets.Add(1) + m.bytes.Add(uint64(len(data))) + if !m.lastPacket.IsZero() { + m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) + } + m.lastPacket = timestamp +} + +func (m *inputMetrics) close() { + if m != nil { + return + } + m.unregister() +} diff --git a/filebeat/inputsource/common/dgram/handler.go b/filebeat/inputsource/common/dgram/handler.go index 0072ee8bbb61..f9ce8c6c648d 100644 --- a/filebeat/inputsource/common/dgram/handler.go +++ b/filebeat/inputsource/common/dgram/handler.go @@ -19,7 +19,6 @@ package dgram import ( "context" - "fmt" "net" "runtime" "strings" @@ -38,11 +37,7 @@ type ConnectionHandler func(context.Context, net.PacketConn) error type MetadataFunc func(net.Conn) inputsource.NetworkMetadata // DatagramReaderFactory allows creation of a handler which can read packets from connections. -func DatagramReaderFactory( - family inputsource.Family, - logger *logp.Logger, - callback inputsource.NetworkFunc, -) HandlerFactory { +func DatagramReaderFactory(family inputsource.Family, logger *logp.Logger, callback inputsource.NetworkFunc) HandlerFactory { return func(config ListenerConfig) ConnectionHandler { return ConnectionHandler(func(ctx context.Context, conn net.PacketConn) error { for ctx.Err() == nil { @@ -58,7 +53,7 @@ func DatagramReaderFactory( length, addr, err := conn.ReadFrom(buffer) if err != nil { if family == inputsource.FamilyUnix { - fmt.Println("connection handler error", err) + logger.Info("connection handler error", err) } // don't log any deadline events. e, ok := err.(net.Error) @@ -88,7 +83,7 @@ func DatagramReaderFactory( callback(buffer[:length], inputsource.NetworkMetadata{RemoteAddr: addr}) } } - fmt.Println("end of connection handling") + logger.Debug("end of connection handling") return nil }) } From 923a29020203c2bfec6d2684475cd9e89d56193e Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 1 Dec 2022 14:11:09 +1030 Subject: [PATCH 2/2] address pr comments --- filebeat/docs/inputs/input-udp.asciidoc | 18 ++++++++++++++++++ filebeat/input/udp/input.go | 12 ++++++------ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/filebeat/docs/inputs/input-udp.asciidoc b/filebeat/docs/inputs/input-udp.asciidoc index f6c63c828fed..6f88b7758075 100644 --- a/filebeat/docs/inputs/input-udp.asciidoc +++ b/filebeat/docs/inputs/input-udp.asciidoc @@ -27,6 +27,24 @@ The `udp` input supports the following configuration options plus the include::../inputs/input-common-udp-options.asciidoc[] +[float] +=== Metrics + +This input exposes metrics under the <>. +These metrics are exposed under the `/dataset` path. They can be used to +observe the activity of the input. + +[options="header"] +|======= +| Metric | Description +| `device` | Host/port of the UDP stream. +| `udp_read_buffer_length_gauge` | Size of the UDP socket buffer length in bytes (gauge). +| `received_events_total` | Total number of packets (events) that have been received. +| `received_bytes_total` | Total number of bytes received. +| `arrival_period` | Histogram of the time between successive packets in nanoseconds. +| `processing_time` | Histogram of the time taken to process packets in nanoseconds. +|======= + [id="{beatname_lc}-input-{type}-common-options"] include::../inputs/input-common-options.asciidoc[] diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index d46743463be0..c5c33c692b06 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -181,19 +181,19 @@ func newInputMetrics(id, device string, buflen uint64) *inputMetrics { if id == "" { return nil } - reg, unreg := inputmon.NewInputRegistry("udp", id+"::"+device, nil) + reg, unreg := inputmon.NewInputRegistry("udp", id, nil) out := &inputMetrics{ unregister: unreg, - bufferLen: monitoring.NewUint(reg, "udp_read_buffer_length"), + bufferLen: monitoring.NewUint(reg, "udp_read_buffer_length_gauge"), device: monitoring.NewString(reg, "device"), - packets: monitoring.NewUint(reg, "udp_packets"), - bytes: monitoring.NewUint(reg, "udp_bytes"), + packets: monitoring.NewUint(reg, "received_events_total"), + bytes: monitoring.NewUint(reg, "received_bytes_total"), arrivalPeriod: metrics.NewUniformSample(1024), processingTime: metrics.NewUniformSample(1024), } - _ = adapter.NewGoMetrics(reg, "udp_arrival_period", adapter.Accept). + _ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept). Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) - _ = adapter.NewGoMetrics(reg, "udp_processing_time", adapter.Accept). + _ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept). Register("histogram", metrics.NewHistogram(out.processingTime)) out.device.Set(device)