diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2996c436e20d..f9cdc611dfc9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -108,6 +108,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix TestMultiEventForEOFRetryHandlerInput unit test of CometD input {pull}34903[34903] - Add input instance id to request trace filename for httpjson and cel inputs {pull}35024[35024] - Fix panic in TCP and UDP inputs on Linux when collecting socket metrics from OS. {issue}35064[35064] +- Correctly collect TCP and UDP metrics for unspecified address values. {pull}35111[35111] *Heartbeat* diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 6bb444d3523f..1528d8bd165e 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -20,6 +20,7 @@ package tcp import ( "bytes" "encoding/binary" + "encoding/hex" "errors" "fmt" "net" @@ -242,11 +243,15 @@ func (m *inputMetrics) log(data []byte, timestamp time.Time) { // poll periodically gets TCP buffer stats from the OS. func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) { + hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr) + if badAddr != nil { + log.Warnf("failed to parse addrs for metric collection %q", badAddr) + } t := time.NewTicker(each) for { select { case <-t.C: - rx, err := procNetTCP("/proc/net/tcp", addr) + rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) if err != nil { log.Warnf("failed to get tcp stats from /proc: %v", err) continue @@ -259,11 +264,33 @@ func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) } } +func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) { + which = make([]bool, len(addr)) + for i, a := range addr { + prefix, _, ok := strings.Cut(a, ":") + if !ok { + continue + } + ip, err := hex.DecodeString(prefix) + if err != nil { + bad = append(bad, a) + } + if net.IP(ip).IsUnspecified() { + yes = true + which[i] = true + } + } + return yes, which, bad +} + // procNetTCP returns the rx_queue field of the TCP socket table for the // socket on the provided address formatted in hex, xxxxxxxx:xxxx. // This function is only useful on linux due to its dependence on the /proc -// filesystem, but is kept in this file for simplicity. -func procNetTCP(path string, addr []string) (rx int64, err error) { +// filesystem, but is kept in this file for simplicity. If hasUnspecified +// is true, all addresses listed in the file in path are considered, and the +// sum of rx_queue matching the addr ports is returned where the corresponding +// addrIsUnspecified is true. +func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) { b, err := os.ReadFile(path) if err != nil { return 0, err @@ -272,26 +299,43 @@ func procNetTCP(path string, addr []string) (rx int64, err error) { if len(lines) < 2 { return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr) } + var found bool for _, l := range lines[1:] { f := bytes.Fields(l) - if len(f) > 4 && contains(f[1], addr) { + if len(f) > 4 && contains(f[1], addr, addrIsUnspecified) { _, r, ok := bytes.Cut(f[4], []byte(":")) if !ok { return 0, errors.New("no rx_queue field " + string(f[4])) } - rx, err = strconv.ParseInt(string(r), 16, 64) + found = true + + v, err := strconv.ParseInt(string(r), 16, 64) if err != nil { return 0, fmt.Errorf("failed to parse rx_queue: %w", err) } + rx += v + + if hasUnspecified { + continue + } return rx, nil } } + if found { + return rx, nil + } return 0, fmt.Errorf("%s entry not found for %s", path, addr) } -func contains(b []byte, addr []string) bool { - for _, a := range addr { - if strings.EqualFold(string(b), a) { +func contains(b []byte, addr []string, addrIsUnspecified []bool) bool { + for i, a := range addr { + if addrIsUnspecified[i] { + _, ap, pok := strings.Cut(a, ":") + _, bp, bok := bytes.Cut(b, []byte(":")) + if pok && bok && strings.EqualFold(string(bp), ap) { + return true + } + } else if strings.EqualFold(string(b), a) { return true } } diff --git a/filebeat/input/tcp/input_test.go b/filebeat/input/tcp/input_test.go index f36d275478b8..ff0506f0ee0b 100644 --- a/filebeat/input/tcp/input_test.go +++ b/filebeat/input/tcp/input_test.go @@ -25,15 +25,42 @@ import ( func TestProcNetTCP(t *testing.T) { t.Run("with_match", func(t *testing.T) { - rx, err := procNetTCP("testdata/proc_net_tcp.txt", []string{"0100007F:17AC"}) + addr := []string{"0100007F:17AC"} + hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) + rx, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified) if err != nil { t.Fatal(err) } + assert.Nil(t, bad) assert.EqualValues(t, 1, rx) }) + t.Run("unspecified", func(t *testing.T) { + addr := []string{"00000000:17AC"} + hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) + rx, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified) + if err != nil { + t.Fatal(err) + } + assert.Nil(t, bad) + assert.EqualValues(t, 2, rx) + }) + t.Run("without_match", func(t *testing.T) { - _, err := procNetTCP("testdata/proc_net_tcp.txt", []string{"FOO:BAR", "BAR:BAZ"}) + addr := []string{"deadbeef:f00d", "ba1dface:1135"} + hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) + _, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified) + assert.Nil(t, bad) + if assert.Error(t, err) { + assert.Contains(t, err.Error(), "entry not found") + } + }) + + t.Run("bad_addrs", func(t *testing.T) { + addr := []string{"FOO:BAR", "BAR:BAZ"} + hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) + _, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified) + assert.EqualValues(t, addr, bad) if assert.Error(t, err) { assert.Contains(t, err.Error(), "entry not found") } diff --git a/filebeat/input/tcp/testdata/proc_net_tcp.txt b/filebeat/input/tcp/testdata/proc_net_tcp.txt index d82c6309bbba..57c6075eccad 100644 --- a/filebeat/input/tcp/testdata/proc_net_tcp.txt +++ b/filebeat/input/tcp/testdata/proc_net_tcp.txt @@ -1,2 +1,4 @@ sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode 1: 0100007F:17AC 00000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 104724420 1 0000000000000000 100 0 0 10 0 + 1: 8100007F:17AC 00000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 104724420 1 0000000000000000 100 0 0 10 0 + 1: 8100007F:17AD 00000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 104724420 1 0000000000000000 100 0 0 10 0 diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index 05c5644dde25..6bcc2bd624c6 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -20,6 +20,7 @@ package udp import ( "bytes" "encoding/binary" + "encoding/hex" "errors" "fmt" "net" @@ -235,11 +236,15 @@ func (m *inputMetrics) log(data []byte, timestamp time.Time) { // poll periodically gets UDP buffer and packet drops stats from the OS. func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) { + hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr) + if badAddr != nil { + log.Warnf("failed to parse addrs for metric collection %q", badAddr) + } t := time.NewTicker(each) for { select { case <-t.C: - rx, drops, err := procNetUDP("/proc/net/udp", addr) + rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified) if err != nil { log.Warnf("failed to get udp stats from /proc: %v", err) continue @@ -253,11 +258,33 @@ func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) } } +func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) { + which = make([]bool, len(addr)) + for i, a := range addr { + prefix, _, ok := strings.Cut(a, ":") + if !ok { + continue + } + ip, err := hex.DecodeString(prefix) + if err != nil { + bad = append(bad, a) + } + if net.IP(ip).IsUnspecified() { + yes = true + which[i] = true + } + } + return yes, which, bad +} + // procNetUDP returns the rx_queue and drops field of the UDP socket table // for the socket on the provided address formatted in hex, xxxxxxxx:xxxx. // This function is only useful on linux due to its dependence on the /proc -// filesystem, but is kept in this file for simplicity. -func procNetUDP(path string, addr []string) (rx, drops int64, err error) { +// filesystem, but is kept in this file for simplicity. If hasUnspecified +// is true, all addresses listed in the file in path are considered, and the +// sum of rx_queue and drops matching the addr ports is returned where the +// corresponding addrIsUnspecified is true. +func procNetUDP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx, drops int64, err error) { b, err := os.ReadFile(path) if err != nil { return 0, 0, err @@ -266,30 +293,49 @@ func procNetUDP(path string, addr []string) (rx, drops int64, err error) { if len(lines) < 2 { return 0, 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr) } + var found bool for _, l := range lines[1:] { f := bytes.Fields(l) - if len(f) > 12 && contains(f[1], addr) { + if len(f) > 12 && contains(f[1], addr, addrIsUnspecified) { _, r, ok := bytes.Cut(f[4], []byte(":")) if !ok { return 0, 0, errors.New("no rx_queue field " + string(f[4])) } - rx, err = strconv.ParseInt(string(r), 16, 64) + found = true + + v, err := strconv.ParseInt(string(r), 16, 64) if err != nil { return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err) } - drops, err = strconv.ParseInt(string(f[12]), 16, 64) + rx += v + + v, err = strconv.ParseInt(string(f[12]), 16, 64) if err != nil { return 0, 0, fmt.Errorf("failed to parse drops: %w", err) } + drops += v + + if hasUnspecified { + continue + } return rx, drops, nil } } + if found { + return rx, drops, nil + } return 0, 0, fmt.Errorf("%s entry not found for %s", path, addr) } -func contains(b []byte, addr []string) bool { - for _, a := range addr { - if strings.EqualFold(string(b), a) { +func contains(b []byte, addr []string, addrIsUnspecified []bool) bool { + for i, a := range addr { + if addrIsUnspecified[i] { + _, ap, pok := strings.Cut(a, ":") + _, bp, bok := bytes.Cut(b, []byte(":")) + if pok && bok && strings.EqualFold(string(bp), ap) { + return true + } + } else if strings.EqualFold(string(b), a) { return true } } diff --git a/filebeat/input/udp/input_test.go b/filebeat/input/udp/input_test.go index 193b7d708cde..de8a8fa31869 100644 --- a/filebeat/input/udp/input_test.go +++ b/filebeat/input/udp/input_test.go @@ -25,16 +25,44 @@ import ( func TestProcNetUDP(t *testing.T) { t.Run("with_match", func(t *testing.T) { - rx, drops, err := procNetUDP("testdata/proc_net_udp.txt", []string{"2508640A:1BBE"}) + addr := []string{"2508640A:1BBE"} + hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) + rx, drops, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified) if err != nil { t.Fatal(err) } + assert.Nil(t, bad) assert.EqualValues(t, 1, rx) assert.EqualValues(t, 2, drops) }) + t.Run("unspecified", func(t *testing.T) { + addr := []string{"00000000:1BBE"} + hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) + rx, drops, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified) + if err != nil { + t.Fatal(err) + } + assert.Nil(t, bad) + assert.EqualValues(t, 2, rx) + assert.EqualValues(t, 4, drops) + }) + t.Run("without_match", func(t *testing.T) { - _, _, err := procNetUDP("testdata/proc_net_udp.txt", []string{"FOO:BAR", "BAR:BAZ"}) + addr := []string{"deadbeef:f00d", "ba1dface:1135"} + hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) + _, _, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified) + assert.Nil(t, bad) + if assert.Error(t, err) { + assert.Contains(t, err.Error(), "entry not found") + } + }) + + t.Run("bad_addrs", func(t *testing.T) { + addr := []string{"FOO:BAR", "BAR:BAZ"} + hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) + _, _, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified) + assert.EqualValues(t, addr, bad) if assert.Error(t, err) { assert.Contains(t, err.Error(), "entry not found") } diff --git a/filebeat/input/udp/testdata/proc_net_udp.txt b/filebeat/input/udp/testdata/proc_net_udp.txt index 69efc6efb96f..a3000248cc8c 100644 --- a/filebeat/input/udp/testdata/proc_net_udp.txt +++ b/filebeat/input/udp/testdata/proc_net_udp.txt @@ -19,3 +19,5 @@ 1570: 2508640A:6CB3 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 104819249 2 0000000000000000 0 1694: 2508640A:D52F 2208640A:0035 01 00000000:00000000 00:00000000 00000000 0 0 112991224 2 0000000000000000 0 1938: 2508640A:5E23 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 104833716 2 0000000000000000 0 + 1325: 3509640A:1BBE 00000000:0000 07 00000000:00000001 00:00000000 00000000 0 0 104836465 2 0000000000000000 2 + 1325: 3509640A:1BBF 00000000:0000 07 00000000:00000001 00:00000000 00000000 0 0 104836465 2 0000000000000000 2