Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filebeat/input/{tcp,udp}: collect rx and drops metrics for unspecified addresses #35111

Merged
merged 1 commit into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix TestMultiEventForEOFRetryHandlerInput unit test of CometD input {pull}34903[34903]
- Add input instance id to request trace filename for httpjson and cel inputs {pull}35024[35024]
- Fix panic in TCP and UDP inputs on Linux when collecting socket metrics from OS. {issue}35064[35064]
- Correctly collect TCP and UDP metrics for unspecified address values. {pull}35111[35111]

*Heartbeat*

Expand Down
60 changes: 52 additions & 8 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package tcp
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -242,11 +243,15 @@ func (m *inputMetrics) log(data []byte, timestamp time.Time) {

// poll periodically gets TCP buffer stats from the OS.
func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) {
hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr)
if badAddr != nil {
log.Warnf("failed to parse addrs for metric collection %q", badAddr)
}
t := time.NewTicker(each)
for {
select {
case <-t.C:
rx, err := procNetTCP("/proc/net/tcp", addr)
rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get tcp stats from /proc: %v", err)
continue
Expand All @@ -259,11 +264,33 @@ func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger)
}
}

func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) {
which = make([]bool, len(addr))
for i, a := range addr {
prefix, _, ok := strings.Cut(a, ":")
if !ok {
continue
}
ip, err := hex.DecodeString(prefix)
if err != nil {
bad = append(bad, a)
}
if net.IP(ip).IsUnspecified() {
yes = true
which[i] = true
}
}
return yes, which, bad
}

// procNetTCP returns the rx_queue field of the TCP socket table for the
// socket on the provided address formatted in hex, xxxxxxxx:xxxx.
// This function is only useful on linux due to its dependence on the /proc
// filesystem, but is kept in this file for simplicity.
func procNetTCP(path string, addr []string) (rx int64, err error) {
// filesystem, but is kept in this file for simplicity. If hasUnspecified
// is true, all addresses listed in the file in path are considered, and the
// sum of rx_queue matching the addr ports is returned where the corresponding
// addrIsUnspecified is true.
func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) {
b, err := os.ReadFile(path)
if err != nil {
return 0, err
Expand All @@ -272,26 +299,43 @@ func procNetTCP(path string, addr []string) (rx int64, err error) {
if len(lines) < 2 {
return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr)
}
var found bool
for _, l := range lines[1:] {
f := bytes.Fields(l)
if len(f) > 4 && contains(f[1], addr) {
if len(f) > 4 && contains(f[1], addr, addrIsUnspecified) {
_, r, ok := bytes.Cut(f[4], []byte(":"))
if !ok {
return 0, errors.New("no rx_queue field " + string(f[4]))
}
rx, err = strconv.ParseInt(string(r), 16, 64)
found = true

v, err := strconv.ParseInt(string(r), 16, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse rx_queue: %w", err)
}
rx += v

if hasUnspecified {
continue
}
return rx, nil
}
}
if found {
return rx, nil
}
return 0, fmt.Errorf("%s entry not found for %s", path, addr)
}

func contains(b []byte, addr []string) bool {
for _, a := range addr {
if strings.EqualFold(string(b), a) {
func contains(b []byte, addr []string, addrIsUnspecified []bool) bool {
for i, a := range addr {
if addrIsUnspecified[i] {
_, ap, pok := strings.Cut(a, ":")
_, bp, bok := bytes.Cut(b, []byte(":"))
if pok && bok && strings.EqualFold(string(bp), ap) {
return true
}
} else if strings.EqualFold(string(b), a) {
return true
}
}
Expand Down
31 changes: 29 additions & 2 deletions filebeat/input/tcp/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,42 @@ import (

func TestProcNetTCP(t *testing.T) {
t.Run("with_match", func(t *testing.T) {
rx, err := procNetTCP("testdata/proc_net_tcp.txt", []string{"0100007F:17AC"})
addr := []string{"0100007F:17AC"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 1, rx)
})

t.Run("unspecified", func(t *testing.T) {
addr := []string{"00000000:17AC"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 2, rx)
})

t.Run("without_match", func(t *testing.T) {
_, err := procNetTCP("testdata/proc_net_tcp.txt", []string{"FOO:BAR", "BAR:BAZ"})
addr := []string{"deadbeef:f00d", "ba1dface:1135"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
assert.Nil(t, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
})

t.Run("bad_addrs", func(t *testing.T) {
addr := []string{"FOO:BAR", "BAR:BAZ"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
assert.EqualValues(t, addr, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
Expand Down
2 changes: 2 additions & 0 deletions filebeat/input/tcp/testdata/proc_net_tcp.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
1: 0100007F:17AC 00000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 104724420 1 0000000000000000 100 0 0 10 0
1: 8100007F:17AC 00000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 104724420 1 0000000000000000 100 0 0 10 0
1: 8100007F:17AD 00000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 104724420 1 0000000000000000 100 0 0 10 0
64 changes: 55 additions & 9 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package udp
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -235,11 +236,15 @@ func (m *inputMetrics) log(data []byte, timestamp time.Time) {

// poll periodically gets UDP buffer and packet drops stats from the OS.
func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) {
hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr)
if badAddr != nil {
log.Warnf("failed to parse addrs for metric collection %q", badAddr)
}
t := time.NewTicker(each)
for {
select {
case <-t.C:
rx, drops, err := procNetUDP("/proc/net/udp", addr)
rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get udp stats from /proc: %v", err)
continue
Expand All @@ -253,11 +258,33 @@ func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger)
}
}

func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) {
which = make([]bool, len(addr))
for i, a := range addr {
prefix, _, ok := strings.Cut(a, ":")
if !ok {
continue
}
ip, err := hex.DecodeString(prefix)
if err != nil {
bad = append(bad, a)
}
if net.IP(ip).IsUnspecified() {
yes = true
which[i] = true
}
}
return yes, which, bad
}

// procNetUDP returns the rx_queue and drops field of the UDP socket table
// for the socket on the provided address formatted in hex, xxxxxxxx:xxxx.
// This function is only useful on linux due to its dependence on the /proc
// filesystem, but is kept in this file for simplicity.
func procNetUDP(path string, addr []string) (rx, drops int64, err error) {
// filesystem, but is kept in this file for simplicity. If hasUnspecified
// is true, all addresses listed in the file in path are considered, and the
// sum of rx_queue and drops matching the addr ports is returned where the
// corresponding addrIsUnspecified is true.
func procNetUDP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx, drops int64, err error) {
b, err := os.ReadFile(path)
if err != nil {
return 0, 0, err
Expand All @@ -266,30 +293,49 @@ func procNetUDP(path string, addr []string) (rx, drops int64, err error) {
if len(lines) < 2 {
return 0, 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr)
}
var found bool
for _, l := range lines[1:] {
f := bytes.Fields(l)
if len(f) > 12 && contains(f[1], addr) {
if len(f) > 12 && contains(f[1], addr, addrIsUnspecified) {
_, r, ok := bytes.Cut(f[4], []byte(":"))
if !ok {
return 0, 0, errors.New("no rx_queue field " + string(f[4]))
}
rx, err = strconv.ParseInt(string(r), 16, 64)
found = true

v, err := strconv.ParseInt(string(r), 16, 64)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err)
}
drops, err = strconv.ParseInt(string(f[12]), 16, 64)
rx += v

v, err = strconv.ParseInt(string(f[12]), 16, 64)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse drops: %w", err)
}
drops += v

if hasUnspecified {
continue
}
return rx, drops, nil
}
}
if found {
return rx, drops, nil
}
return 0, 0, fmt.Errorf("%s entry not found for %s", path, addr)
}

func contains(b []byte, addr []string) bool {
for _, a := range addr {
if strings.EqualFold(string(b), a) {
func contains(b []byte, addr []string, addrIsUnspecified []bool) bool {
for i, a := range addr {
if addrIsUnspecified[i] {
_, ap, pok := strings.Cut(a, ":")
_, bp, bok := bytes.Cut(b, []byte(":"))
if pok && bok && strings.EqualFold(string(bp), ap) {
return true
}
} else if strings.EqualFold(string(b), a) {
return true
}
}
Expand Down
32 changes: 30 additions & 2 deletions filebeat/input/udp/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,44 @@ import (

func TestProcNetUDP(t *testing.T) {
t.Run("with_match", func(t *testing.T) {
rx, drops, err := procNetUDP("testdata/proc_net_udp.txt", []string{"2508640A:1BBE"})
addr := []string{"2508640A:1BBE"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, drops, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 1, rx)
assert.EqualValues(t, 2, drops)
})

t.Run("unspecified", func(t *testing.T) {
addr := []string{"00000000:1BBE"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, drops, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 2, rx)
assert.EqualValues(t, 4, drops)
})

t.Run("without_match", func(t *testing.T) {
_, _, err := procNetUDP("testdata/proc_net_udp.txt", []string{"FOO:BAR", "BAR:BAZ"})
addr := []string{"deadbeef:f00d", "ba1dface:1135"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, _, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified)
assert.Nil(t, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
})

t.Run("bad_addrs", func(t *testing.T) {
addr := []string{"FOO:BAR", "BAR:BAZ"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, _, err := procNetUDP("testdata/proc_net_udp.txt", addr, hasUnspecified, addrIsUnspecified)
assert.EqualValues(t, addr, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
Expand Down
2 changes: 2 additions & 0 deletions filebeat/input/udp/testdata/proc_net_udp.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@
1570: 2508640A:6CB3 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 104819249 2 0000000000000000 0
1694: 2508640A:D52F 2208640A:0035 01 00000000:00000000 00:00000000 00000000 0 0 112991224 2 0000000000000000 0
1938: 2508640A:5E23 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 104833716 2 0000000000000000 0
1325: 3509640A:1BBE 00000000:0000 07 00000000:00000001 00:00000000 00000000 0 0 104836465 2 0000000000000000 2
1325: 3509640A:1BBF 00000000:0000 07 00000000:00000001 00:00000000 00000000 0 0 104836465 2 0000000000000000 2