Skip to content

Commit

Permalink
filebeat/input/{tcp,udp}: collect rx and drops metrics for unspecifie…
Browse files Browse the repository at this point in the history
…d addresses (#35111)

Previously we were accepting exact (case-folded) address matches only,
resulting in loss of receive queue and drops metrics when the inputs
were listening on 0.0.0.0. This change fixes that by collecting these
values from all /proc lines that have a matching port when the address
is unspecified.

(cherry picked from commit 097bd76)
  • Loading branch information
efd6 authored and mergify[bot] committed Apr 18, 2023
1 parent 04a9e70 commit 7a1f9f4
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
[Gcs Input] - Added missing locks for safe concurrency {pull}34914[34914]
- Add input instance id to request trace filename for httpjson and cel inputs {pull}35037[35037]
- Fix panic in TCP and UDP inputs on Linux when collecting socket metrics from OS. {issue}35064[35064]
- Correctly collect TCP and UDP metrics for unspecified address values. {pull}35111[35111]

*Heartbeat*

Expand Down
60 changes: 52 additions & 8 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package tcp
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -242,11 +243,15 @@ func (m *inputMetrics) log(data []byte, timestamp time.Time) {

// poll periodically gets TCP buffer stats from the OS.
func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) {
hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr)
if badAddr != nil {
log.Warnf("failed to parse addrs for metric collection %q", badAddr)
}
t := time.NewTicker(each)
for {
select {
case <-t.C:
rx, err := procNetTCP("/proc/net/tcp", addr)
rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get tcp stats from /proc: %v", err)
continue
Expand All @@ -259,11 +264,33 @@ func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger)
}
}

func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) {
which = make([]bool, len(addr))
for i, a := range addr {
prefix, _, ok := strings.Cut(a, ":")
if !ok {
continue
}
ip, err := hex.DecodeString(prefix)
if err != nil {
bad = append(bad, a)
}
if net.IP(ip).IsUnspecified() {
yes = true
which[i] = true
}
}
return yes, which, bad
}

// procNetTCP returns the rx_queue field of the TCP socket table for the
// socket on the provided address formatted in hex, xxxxxxxx:xxxx.
// This function is only useful on linux due to its dependence on the /proc
// filesystem, but is kept in this file for simplicity.
func procNetTCP(path string, addr []string) (rx int64, err error) {
// filesystem, but is kept in this file for simplicity. If hasUnspecified
// is true, all addresses listed in the file in path are considered, and the
// sum of rx_queue matching the addr ports is returned where the corresponding
// addrIsUnspecified is true.
func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) {
b, err := os.ReadFile(path)
if err != nil {
return 0, err
Expand All @@ -272,26 +299,43 @@ func procNetTCP(path string, addr []string) (rx int64, err error) {
if len(lines) < 2 {
return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr)
}
var found bool
for _, l := range lines[1:] {
f := bytes.Fields(l)
if len(f) > 4 && contains(f[1], addr) {
if len(f) > 4 && contains(f[1], addr, addrIsUnspecified) {
_, r, ok := bytes.Cut(f[4], []byte(":"))
if !ok {
return 0, errors.New("no rx_queue field " + string(f[4]))
}
rx, err = strconv.ParseInt(string(r), 16, 64)
found = true

v, err := strconv.ParseInt(string(r), 16, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse rx_queue: %w", err)
}
rx += v

if hasUnspecified {
continue
}
return rx, nil
}
}
if found {
return rx, nil
}
return 0, fmt.Errorf("%s entry not found for %s", path, addr)
}

func contains(b []byte, addr []string) bool {
for _, a := range addr {
if strings.EqualFold(string(b), a) {
func contains(b []byte, addr []string, addrIsUnspecified []bool) bool {
for i, a := range addr {
if addrIsUnspecified[i] {
_, ap, pok := strings.Cut(a, ":")
_, bp, bok := bytes.Cut(b, []byte(":"))
if pok && bok && strings.EqualFold(string(bp), ap) {
return true
}
} else if strings.EqualFold(string(b), a) {
return true
}
}
Expand Down
31 changes: 29 additions & 2 deletions filebeat/input/tcp/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,42 @@ import (

func TestProcNetTCP(t *testing.T) {
t.Run("with_match", func(t *testing.T) {
rx, err := procNetTCP("testdata/proc_net_tcp.txt", []string{"0100007F:17AC"})
addr := []string{"0100007F:17AC"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 1, rx)
})

t.Run("unspecified", func(t *testing.T) {
addr := []string{"00000000:17AC"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 2, rx)
})

t.Run("without_match", func(t *testing.T) {
_, err := procNetTCP("testdata/proc_net_tcp.txt", []string{"FOO:BAR", "BAR:BAZ"})
addr := []string{"deadbeef:f00d", "ba1dface:1135"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
assert.Nil(t, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
})

t.Run("bad_addrs", func(t *testing.T) {
addr := []string{"FOO:BAR", "BAR:BAZ"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
assert.EqualValues(t, addr, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
Expand Down
2 changes: 2 additions & 0 deletions filebeat/input/tcp/testdata/proc_net_tcp.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
1: 0100007F:17AC 00000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 104724420 1 0000000000000000 100 0 0 10 0
1: 8100007F:17AC 00000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 104724420 1 0000000000000000 100 0 0 10 0
1: 8100007F:17AD 00000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 104724420 1 0000000000000000 100 0 0 10 0
64 changes: 55 additions & 9 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package udp
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -235,11 +236,15 @@ func (m *inputMetrics) log(data []byte, timestamp time.Time) {

// poll periodically gets UDP buffer and packet drops stats from the OS.
func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) {
hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr)
if badAddr != nil {
log.Warnf("failed to parse addrs for metric collection %q", badAddr)
}
t := time.NewTicker(each)
for {
select {
case <-t.C:
rx, drops, err := procNetUDP("/proc/net/udp", addr)
rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get udp stats from /proc: %v", err)
continue
Expand All @@ -253,11 +258,33 @@ func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger)
}
}

func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) {
which = make([]bool, len(addr))
for i, a := range addr {
prefix, _, ok := strings.Cut(a, ":")
if !ok {
continue
}
ip, err := hex.DecodeString(prefix)
if err != nil {
bad = append(bad, a)
}
if net.IP(ip).IsUnspecified() {
yes = true
which[i] = true
}
}
return yes, which, bad
}

// procNetUDP returns the rx_queue and drops field of the UDP socket table
// for the socket on the provided address formatted in hex, xxxxxxxx:xxxx.
// This function is only useful on linux due to its dependence on the /proc
// filesystem, but is kept in this file for simplicity.
func procNetUDP(path string, addr []string) (rx, drops int64, err error) {
// filesystem, but is kept in this file for simplicity. If hasUnspecified
// is true, all addresses listed in the file in path are considered, and the
// sum of rx_queue and drops matching the addr ports is returned where the
// corresponding addrIsUnspecified is true.
func procNetUDP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx, drops int64, err error) {
b, err := os.ReadFile(path)
if err != nil {
return 0, 0, err
Expand All @@ -266,30 +293,49 @@ func procNetUDP(path string, addr []string) (rx, drops int64, err error) {
if len(lines) < 2 {
return 0, 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr)
}
var found bool
for _, l := range lines[1:] {
f := bytes.Fields(l)
if len(f) > 12 && contains(f[1], addr) {
if len(f) > 12 && contains(f[1], addr, addrIsUnspecified) {
_, r, ok := bytes.Cut(f[4], []byte(":"))
if !ok {
return 0, 0, errors.New("no rx_queue field " + string(f[4]))
}
rx, err = strconv.ParseInt(string(r), 16, 64)
found = true

v, err := strconv.ParseInt(string(r), 16, 64)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err)
}
drops, err = strconv.ParseInt(string(f[12]), 16, 64)
rx += v

v, err = strconv.ParseInt(string(f[12]), 16, 64)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse drops: %w", err)
}
drops += v

if hasUnspecified {
continue
}
return rx, drops, nil
}
}
if found {
return rx, drops, nil
}
return 0, 0, fmt.Errorf("%s entry not found for %s", path, addr)
}

func contains(b []byte, addr []string) bool {
for _, a := range addr {
if strings.EqualFold(string(b), a) {
func contains(b []byte, addr []string, addrIsUnspecified []bool) bool {
for i, a := range addr {
if addrIsUnspecified[i] {
_, ap, pok := strings.Cut(a, ":")
_, bp, bok := bytes.Cut(b, []byte(":"))
if pok && bok && strings.EqualFold(string(bp), ap) {
return true
}
} else if strings.EqualFold(string(b), a) {
return true
}
}
Expand Down
32 changes: 30 additions & 2 deletions filebeat/input/udp/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,44 @@ import (

func TestProcNetUDP(t *testing.T) {
t.Run("with_match", func(t *testing.T) {
rx, drops, err := procNetUDP("testdata/proc_net_udp.txt", []string{"2508640A:1BBE"})
addr := []string{"2508640A:1BBE"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, drops, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 1, rx)
assert.EqualValues(t, 2, drops)
})

t.Run("unspecified", func(t *testing.T) {
addr := []string{"00000000:1BBE"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, drops, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 2, rx)
assert.EqualValues(t, 4, drops)
})

t.Run("without_match", func(t *testing.T) {
_, _, err := procNetUDP("testdata/proc_net_udp.txt", []string{"FOO:BAR", "BAR:BAZ"})
addr := []string{"deadbeef:f00d", "ba1dface:1135"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, _, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified)
assert.Nil(t, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
})

t.Run("bad_addrs", func(t *testing.T) {
addr := []string{"FOO:BAR", "BAR:BAZ"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, _, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified)
assert.EqualValues(t, addr, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
Expand Down
2 changes: 2 additions & 0 deletions filebeat/input/udp/testdata/proc_net_udp.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@
1570: 2508640A:6CB3 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 104819249 2 0000000000000000 0
1694: 2508640A:D52F 2208640A:0035 01 00000000:00000000 00:00000000 00000000 0 0 112991224 2 0000000000000000 0
1938: 2508640A:5E23 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 104833716 2 0000000000000000 0
1325: 3509640A:1BBE 00000000:0000 07 00000000:00000001 00:00000000 00000000 0 0 104836465 2 0000000000000000 2
1325: 3509640A:1BBF 00000000:0000 07 00000000:00000001 00:00000000 00000000 0 0 104836465 2 0000000000000000 2

0 comments on commit 7a1f9f4

Please sign in to comment.