diff --git a/collector/analyzer/tcpconnectanalyzer/analyzer.go b/collector/analyzer/tcpconnectanalyzer/analyzer.go new file mode 100644 index 000000000..1c456e4e4 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/analyzer.go @@ -0,0 +1,241 @@ +package tcpconnectanalyzer + +import ( + "time" + + "github.com/Kindling-project/kindling/collector/analyzer" + "github.com/Kindling-project/kindling/collector/analyzer/tcpconnectanalyzer/internal" + "github.com/Kindling-project/kindling/collector/component" + "github.com/Kindling-project/kindling/collector/consumer" + conntrackerpackge "github.com/Kindling-project/kindling/collector/metadata/conntracker" + "github.com/Kindling-project/kindling/collector/model" + "github.com/Kindling-project/kindling/collector/model/constlabels" + "github.com/Kindling-project/kindling/collector/model/constnames" + "github.com/hashicorp/go-multierror" + "go.uber.org/zap" +) + +const Type analyzer.Type = "tcpconnectanalyzer" + +type TcpConnectAnalyzer struct { + config *Config + nextConsumers []consumer.Consumer + conntracker conntrackerpackge.Conntracker + + eventChannel chan *model.KindlingEvent + connectMonitor *internal.ConnectMonitor + + stopCh chan bool + + telemetry *component.TelemetryTools +} + +func New(cfg interface{}, telemetry *component.TelemetryTools, consumers []consumer.Consumer) analyzer.Analyzer { + config := cfg.(*Config) + ret := &TcpConnectAnalyzer{ + config: config, + nextConsumers: consumers, + telemetry: telemetry, + eventChannel: make(chan *model.KindlingEvent, config.ChannelSize), + stopCh: make(chan bool), + + connectMonitor: internal.NewConnectMonitor(telemetry.Logger), + } + conntracker, err := conntrackerpackge.NewConntracker(nil) + if err != nil { + telemetry.Logger.Warn("Conntracker cannot work as expected:", zap.Error(err)) + } + ret.conntracker = conntracker + newSelfMetrics(telemetry.MeterProvider, ret.connectMonitor) + return ret +} + +func (a *TcpConnectAnalyzer) ConsumableEvents() []string { + return []string{ + constnames.ConnectEvent, + constnames.TcpConnectEvent, + constnames.TcpSetStateEvent, + constnames.WriteEvent, + constnames.WritevEvent, + constnames.SendMsgEvent, + constnames.SendToEvent, + } +} + +// Start initializes the analyzer +func (a *TcpConnectAnalyzer) Start() error { + go func() { + scanTcpStateTicker := time.NewTicker(time.Duration(a.config.WaitEventSecond/3) * time.Second) + for { + select { + case <-scanTcpStateTicker.C: + a.trimConnectionsWithTcpStat() + case event := <-a.eventChannel: + a.consumeChannelEvent(event) + case <-a.stopCh: + // Only trim the connections expired. For those unfinished, we leave them + // unchanged and just shutdown this goroutine. 
+ a.trimConnectionsWithTcpStat() + return + } + } + }() + return nil +} + +// ConsumeEvent gets the event from the previous component +func (a *TcpConnectAnalyzer) ConsumeEvent(event *model.KindlingEvent) error { + a.eventChannel <- event + return nil +} + +func (a *TcpConnectAnalyzer) consumeChannelEvent(event *model.KindlingEvent) { + var ( + connectStats *internal.ConnectionStats + err error + ) + + switch event.Name { + case constnames.ConnectEvent: + if !event.IsTcp() { + return + } + connectStats, err = a.connectMonitor.ReadInConnectExitSyscall(event) + case constnames.TcpConnectEvent: + connectStats, err = a.connectMonitor.ReadInTcpConnect(event) + case constnames.TcpSetStateEvent: + connectStats, err = a.connectMonitor.ReadInTcpSetState(event) + case constnames.WriteEvent: + fallthrough + case constnames.WritevEvent: + fallthrough + case constnames.SendToEvent: + fallthrough + case constnames.SendMsgEvent: + if filterRequestEvent(event) { + return + } + connectStats, err = a.connectMonitor.ReadSendRequestSyscall(event) + } + + if err != nil { + a.telemetry.Logger.Debug("Cannot update connection stats:", zap.Error(err)) + return + } + // Connection is not established yet + if connectStats == nil { + return + } + + dataGroup := a.generateDataGroup(connectStats) + a.passThroughConsumers(dataGroup) +} + +func filterRequestEvent(event *model.KindlingEvent) bool { + if event.Category != model.Category_CAT_NET { + return true + } + + ctx := event.GetCtx() + if ctx == nil || ctx.GetThreadInfo() == nil { + return true + } + fd := ctx.GetFdInfo() + if fd == nil { + return true + } + if fd.GetProtocol() != model.L4Proto_TCP { + return true + } + if fd.GetSip() == nil || fd.GetDip() == nil { + return true + } + + return false +} + +func (a *TcpConnectAnalyzer) trimConnectionsWithTcpStat() { + connStats := a.connectMonitor.TrimConnectionsWithTcpStat(a.config.WaitEventSecond) + for _, connStat := range connStats { + dataGroup := a.generateDataGroup(connStat) + a.passThroughConsumers(dataGroup) + } +} + +func (a *TcpConnectAnalyzer) passThroughConsumers(dataGroup *model.DataGroup) { + var retError error + for _, nextConsumer := range a.nextConsumers { + err := nextConsumer.Consume(dataGroup) + if err != nil { + retError = multierror.Append(retError, err) + } + } + if retError != nil { + a.telemetry.Logger.Warn("Error happened while passing through processors:", zap.Error(retError)) + } +} + +func (a *TcpConnectAnalyzer) generateDataGroup(connectStats *internal.ConnectionStats) *model.DataGroup { + labels := a.generateLabels(connectStats) + metrics := make([]*model.Metric, 0, 2) + metrics = append(metrics, model.NewIntMetric(constnames.TcpConnectTotalMetric, 1)) + // Only record the connection's duration when it is successfully established + if connectStats.StateMachine.GetCurrentState() == internal.Success { + metrics = append(metrics, model.NewIntMetric(constnames.TcpConnectDurationMetric, connectStats.GetConnectDuration())) + } + + retDataGroup := model.NewDataGroup( + constnames.TcpConnectMetricGroupName, + labels, + connectStats.EndTimestamp, + metrics...) 
+ + return retDataGroup +} + +func (a *TcpConnectAnalyzer) generateLabels(connectStats *internal.ConnectionStats) *model.AttributeMap { + labels := model.NewAttributeMap() + // The connect events always come from the client-side + labels.AddBoolValue(constlabels.IsServer, false) + labels.AddStringValue(constlabels.ContainerId, connectStats.ContainerId) + labels.AddIntValue(constlabels.Errno, int64(connectStats.Code)) + if connectStats.StateMachine.GetCurrentState() == internal.Success { + labels.AddBoolValue(constlabels.Success, true) + } else { + labels.AddBoolValue(constlabels.Success, false) + } + + srcIp := connectStats.ConnKey.SrcIP + dstIp := connectStats.ConnKey.DstIP + srcPort := connectStats.ConnKey.SrcPort + dstPort := connectStats.ConnKey.DstPort + labels.UpdateAddStringValue(constlabels.SrcIp, srcIp) + labels.UpdateAddStringValue(constlabels.DstIp, dstIp) + labels.UpdateAddIntValue(constlabels.SrcPort, int64(srcPort)) + labels.UpdateAddIntValue(constlabels.DstPort, int64(dstPort)) + dNatIp, dNatPort := a.findDNatTuple(srcIp, uint64(srcPort), dstIp, uint64(dstPort)) + labels.AddStringValue(constlabels.DnatIp, dNatIp) + labels.AddIntValue(constlabels.DnatPort, dNatPort) + return labels +} + +func (a *TcpConnectAnalyzer) findDNatTuple(sIp string, sPort uint64, dIp string, dPort uint64) (string, int64) { + dNat := a.conntracker.GetDNATTupleWithString(sIp, dIp, uint16(sPort), uint16(dPort), 0) + if dNat == nil { + return "", -1 + } + dNatIp := dNat.ReplSrcIP.String() + dNatPort := dNat.ReplSrcPort + return dNatIp, int64(dNatPort) +} + +// Shutdown cleans all the resources used by the analyzer +func (a *TcpConnectAnalyzer) Shutdown() error { + a.stopCh <- true + return nil +} + +// Type returns the type of the analyzer +func (a *TcpConnectAnalyzer) Type() analyzer.Type { + return Type +} diff --git a/collector/analyzer/tcpconnectanalyzer/config.go b/collector/analyzer/tcpconnectanalyzer/config.go new file mode 100644 index 000000000..0ac90e526 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/config.go @@ -0,0 +1,13 @@ +package tcpconnectanalyzer + +type Config struct { + ChannelSize int `mapstructure:"channel_size"` + WaitEventSecond int `mapstructure:"wait_event_second"` +} + +func NewDefaultConfig() *Config { + return &Config{ + ChannelSize: 2000, + WaitEventSecond: 10, + } +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go new file mode 100644 index 000000000..9e1cd56cd --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -0,0 +1,328 @@ +package internal + +import ( + "fmt" + "os" + "time" + + "github.com/Kindling-project/kindling/collector/model" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// ConnectMonitor reads in events related to TCP connect operations and updates its +// status to record the connection procedure. +// This is not thread safe to use. 
+type ConnectMonitor struct { + connMap map[ConnKey]*ConnectionStats + statesResource StatesResource + hostProcPath string + logger *zap.Logger +} + +const HostProc = "HOST_PROC_PATH" + +func NewConnectMonitor(logger *zap.Logger) *ConnectMonitor { + path, ok := os.LookupEnv(HostProc) + if !ok { + path = "/proc" + } + return &ConnectMonitor{ + connMap: make(map[ConnKey]*ConnectionStats), + statesResource: createStatesResource(), + hostProcPath: path, + logger: logger, + } +} + +func (c *ConnectMonitor) ReadInConnectExitSyscall(event *model.KindlingEvent) (*ConnectionStats, error) { + retValue := event.GetUserAttribute("res") + if retValue == nil { + return nil, fmt.Errorf("res of connect_exit is nil") + } + retValueInt := retValue.GetIntValue() + + connKey := ConnKey{ + SrcIP: event.GetSip(), + SrcPort: event.GetSport(), + DstIP: event.GetDip(), + DstPort: event.GetDport(), + } + if ce := c.logger.Check(zapcore.DebugLevel, "Receive connect_exit event:"); ce != nil { + ce.Write( + zap.String("ConnKey", connKey.String()), + zap.Int64("retValue", retValueInt), + ) + } + + connStats, ok := c.connMap[connKey] + if !ok { + // Maybe the connStats have been closed by tcp_set_state_from_established event. + // We don't care about it. + return nil, nil + } + // "connect_exit" comes to analyzer after "tcp_connect" + connStats.EndTimestamp = event.Timestamp + connStats.pid = event.GetPid() + connStats.ContainerId = event.GetContainerId() + var eventType EventType + if retValueInt == 0 { + eventType = connectExitSyscallSuccess + } else if isNotErrorReturnCode(retValueInt) { + eventType = connectExitSyscallNotConcern + } else { + eventType = connectExitSyscallFailure + connStats.Code = int(retValueInt) + } + return connStats.StateMachine.ReceiveEvent(eventType, c.connMap) +} + +func (c *ConnectMonitor) ReadSendRequestSyscall(event *model.KindlingEvent) (*ConnectionStats, error) { + // The events without sip/sport/dip/dport have been filtered outside this method. 
+ connKey := ConnKey{ + SrcIP: event.GetSip(), + SrcPort: event.GetSport(), + DstIP: event.GetDip(), + DstPort: event.GetDport(), + } + if ce := c.logger.Check(zapcore.DebugLevel, "Receive sendRequestSyscall event:"); ce != nil { + ce.Write( + zap.String("ConnKey", connKey.String()), + zap.String("eventName", event.Name), + ) + } + + connStats, ok := c.connMap[connKey] + if !ok { + return nil, nil + } + connStats.pid = event.GetPid() + connStats.ContainerId = event.GetContainerId() + return connStats.StateMachine.ReceiveEvent(sendRequestSyscall, c.connMap) +} + +func isNotErrorReturnCode(code int64) bool { + return code == einprogress || code == eintr || code == eisconn || code == ealready +} + +func (c *ConnectMonitor) ReadInTcpConnect(event *model.KindlingEvent) (*ConnectionStats, error) { + connKey, err := getConnKeyForTcpConnect(event) + if err != nil { + return nil, err + } + retValue := event.GetUserAttribute("retval") + if retValue == nil { + return nil, fmt.Errorf("retval of tcp_connect is nil") + } + retValueInt := retValue.GetUintValue() + + if ce := c.logger.Check(zapcore.DebugLevel, "Receive tcp_connect event:"); ce != nil { + ce.Write( + zap.String("ConnKey", connKey.String()), + zap.Uint64("retValue", retValueInt), + ) + } + + var eventType EventType + if retValueInt == 0 { + eventType = tcpConnectNoError + } else { + eventType = tcpConnectError + } + + connStats, ok := c.connMap[connKey] + if !ok { + // "tcp_connect" comes to analyzer before "connect_exit" + connStats = &ConnectionStats{ + ConnKey: connKey, + InitialTimestamp: event.Timestamp, + EndTimestamp: event.Timestamp, + Code: int(retValueInt), + } + connStats.StateMachine = NewStateMachine(Inprogress, c.statesResource, connStats) + c.connMap[connKey] = connStats + } else { + // Not possible to enter this branch + c.logger.Info("Receive another unexpected tcp_connect event", zap.String("connKey", connKey.String())) + connStats.EndTimestamp = event.Timestamp + connStats.Code = int(retValueInt) + } + return connStats.StateMachine.ReceiveEvent(eventType, c.connMap) +} + +const ( + establishedState = 1 +) + +func (c *ConnectMonitor) ReadInTcpSetState(event *model.KindlingEvent) (*ConnectionStats, error) { + connKey, err := getConnKeyForTcpConnect(event) + if err != nil { + return nil, err + } + + oldState := event.GetUserAttribute("old_state") + newState := event.GetUserAttribute("new_state") + if oldState == nil || newState == nil { + return nil, fmt.Errorf("tcp_set_state events have nil state, ConnKey: %s", connKey.String()) + } + oldStateInt := oldState.GetIntValue() + newStateInt := newState.GetIntValue() + + if oldStateInt == establishedState { + return c.readInTcpSetStateFromEstablished(connKey, event) + } else if newStateInt == establishedState { + return c.readInTcpSetStateToEstablished(connKey, event) + } else { + return nil, fmt.Errorf("no state is 'established' for tcp_set_state event, "+ + "old state: %d, new state: %d", oldStateInt, newStateInt) + } +} + +func (c *ConnectMonitor) readInTcpSetStateToEstablished(connKey ConnKey, event *model.KindlingEvent) (*ConnectionStats, error) { + if ce := c.logger.Check(zapcore.DebugLevel, "Receive tcp_set_state(to established) event:"); ce != nil { + ce.Write( + zap.String("ConnKey", connKey.String()), + ) + } + connStats, ok := c.connMap[connKey] + if !ok { + // No tcp_connect or connect_exit received. + // This is the events from server-side. 
+ c.logger.Debug("No tcp_connect received, but receive tcp_set_state_to_established") + return nil, nil + } + connStats.EndTimestamp = event.Timestamp + return connStats.StateMachine.ReceiveEvent(tcpSetStateToEstablished, c.connMap) +} + +func (c *ConnectMonitor) readInTcpSetStateFromEstablished(connKey ConnKey, event *model.KindlingEvent) (*ConnectionStats, error) { + if ce := c.logger.Check(zapcore.DebugLevel, "Receive tcp_set_state(from established) event:"); ce != nil { + ce.Write( + zap.String("ConnKey", connKey.String()), + ) + } + connStats, ok := c.connMap[connKey] + if !ok { + // Connection has been established and the connStats have been emitted. + return nil, nil + } + connStats.EndTimestamp = event.Timestamp + return connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) +} + +func (c *ConnectMonitor) TrimConnectionsWithTcpStat(waitForEventSecond int) []*ConnectionStats { + ret := make([]*ConnectionStats, 0, len(c.connMap)) + // Only scan once for each pid + pidTcpStateMap := make(map[uint32]NetSocketStateMap) + waitForEventNano := int64(waitForEventSecond) * 1000000000 + timeNow := time.Now().UnixNano() + for key, connStat := range c.connMap { + if connStat.pid == 0 { + continue + } + if timeNow-int64(connStat.InitialTimestamp) < waitForEventNano { + // Still waiting for other events + continue + } + tcpStateMap, ok := pidTcpStateMap[connStat.pid] + if !ok { + tcpState, err := NewPidTcpStat(c.hostProcPath, int(connStat.pid)) + if err != nil { + c.logger.Debug("error happened when scanning net/tcp", + zap.Uint32("pid", connStat.pid), zap.Error(err)) + // No such file or directory, which means the process has been purged. + // We consider the connection failed to be established. + stats, err := connStat.StateMachine.ReceiveEvent(expiredEvent, c.connMap) + if err != nil { + c.logger.Warn("error happened when receiving event:", zap.Error(err)) + } + if stats != nil { + ret = append(ret, stats) + } + continue + } + pidTcpStateMap[connStat.pid] = tcpState + tcpStateMap = tcpState + } + state, ok := tcpStateMap[key.toSocketKey()] + // There are two possible reasons for no such socket found: + // 1. The connection was established and has been closed. + // There should have been received a tcp_set_state event, and c.connMap + // should not contain such socket. In this case, we won't enter this piece of code. + // 2. The connection failed to be established and has been closed. + if !ok { + stats, err := connStat.StateMachine.ReceiveEvent(expiredEvent, c.connMap) + if err != nil { + c.logger.Warn("error happened when receiving event:", zap.Error(err)) + } + if stats != nil { + ret = append(ret, stats) + } + continue + } + if state == established { + stats, err := connStat.StateMachine.ReceiveEvent(tcpSetStateToEstablished, c.connMap) + if err != nil { + c.logger.Warn("error happened when receiving event:", zap.Error(err)) + } + if stats != nil { + ret = append(ret, stats) + } + } else if state == synSent || state == synRecv { + continue + } else { + // These states are behind the ESTABLISHED state. + // The codes could run into this branch if tcpSetStateToEstablished not received. 
+ c.logger.Debug("See sockets whose state is behind ESTABLISHED, which means no "+ + "tcp_set_state_from_established received.", zap.String("state", state)) + stats, err := connStat.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) + if err != nil { + c.logger.Warn("error happened when receiving event:", zap.Error(err)) + } + if stats != nil { + ret = append(ret, stats) + } + } + } + return ret +} + +func (c *ConnectMonitor) GetMapSize() int { + return len(c.connMap) +} + +func getConnKeyForTcpConnect(event *model.KindlingEvent) (ConnKey, error) { + var sIpString string + var sPortUint uint64 + var dIpString string + var dPortUint uint64 + sIp := event.GetUserAttribute("sip") + if sIp != nil { + sIpString = model.IPLong2String(uint32(sIp.GetUintValue())) + } + sPort := event.GetUserAttribute("sport") + if sPort != nil { + sPortUint = sPort.GetUintValue() + } + dIp := event.GetUserAttribute("dip") + if dIp != nil { + dIpString = model.IPLong2String(uint32(dIp.GetUintValue())) + } + dPort := event.GetUserAttribute("dport") + if dPort != nil { + dPortUint = dPort.GetUintValue() + } + + if sIp == nil || sPort == nil || dIp == nil || dPort == nil { + return ConnKey{}, fmt.Errorf("some fields are nil for event %s. srcIp=%v, srcPort=%v, "+ + "dstIp=%v, dstPort=%v", event.Name, sIpString, sPortUint, dIpString, dPortUint) + } + + connKey := ConnKey{ + SrcIP: sIpString, + SrcPort: uint32(sPortUint), + DstIP: dIpString, + DstPort: uint32(dPortUint), + } + return connKey, nil +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go new file mode 100644 index 000000000..f62e48f26 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go @@ -0,0 +1,115 @@ +package internal + +import ( + "fmt" +) + +const ( + // See in Linux + einprogress = -115 + ealready = -114 + eisconn = -106 + eintr = -4 +) + +const ( + established = "01" + synSent = "02" + synRecv = "03" + finWait1 = "04" + finWait2 = "05" + timeWait = "06" + close = "07" + closeWait = "08" + lastAck = "09" + listen = "0A" + closing = "0B" +) + +type ConnectionStats struct { + pid uint32 + ContainerId string + ConnKey ConnKey + StateMachine *StateMachine + InitialTimestamp uint64 + EndTimestamp uint64 + Code int +} + +func (c *ConnectionStats) GetConnectDuration() int64 { + return int64(c.EndTimestamp - c.InitialTimestamp) +} + +type ConnKey struct { + SrcIP string + SrcPort uint32 + DstIP string + DstPort uint32 +} + +func (k *ConnKey) toSocketKey() SocketKey { + return SocketKey{ + LocalAddr: k.SrcIP, + LocalPort: uint64(k.SrcPort), + RemAddr: k.DstIP, + RemPort: uint64(k.DstPort), + } +} + +func (k *ConnKey) String() string { + return fmt.Sprintf("src: %s:%d, dst: %s:%d", k.SrcIP, k.SrcPort, k.DstIP, k.DstPort) +} + +const ( + Inprogress StateType = "inprogress" + Success StateType = "success" + Failure StateType = "failure" + Closed StateType = "closed" + + tcpConnectError EventType = "tcp_connect_negative" + tcpConnectNoError EventType = "tcp_connect_zero" + tcpSetStateToEstablished EventType = "tcp_set_state_to_established" + tcpSetStateFromEstablished EventType = "tcp_set_state_from_established" + connectExitSyscallSuccess EventType = "connect_exit_syscall_zero" + connectExitSyscallFailure EventType = "connect_exit_syscall_failure" + connectExitSyscallNotConcern EventType = "connect_exit_syscall_not_concern" + expiredEvent EventType = "expired_event" + sendRequestSyscall EventType = "send_request_syscall" 
+) + +func createStatesResource() StatesResource { + return StatesResource{ + Inprogress: State{ + eventsMap: map[EventType]StateType{ + tcpConnectNoError: Inprogress, + tcpConnectError: Failure, + tcpSetStateToEstablished: Success, + // Sometimes tcpSetStateToEstablished and tcpSetStateFromEstablished are both missing, + // so sendRequestSyscall is used to mark the state as Success from Inprogress. + sendRequestSyscall: Success, + // Sometimes tcpSetStateToEstablished is missing and sendRequestSyscall is not triggered, + // so tcpSetStateFromEstablished is used to mark the state as Success from Inprogress. + tcpSetStateFromEstablished: Success, + connectExitSyscallSuccess: Success, + connectExitSyscallFailure: Failure, + connectExitSyscallNotConcern: Inprogress, + expiredEvent: Failure, + }, + callback: nil, + }, + Success: { + eventsMap: map[EventType]StateType{}, + callback: func(connStats *ConnectionStats, connMap map[ConnKey]*ConnectionStats) *ConnectionStats { + delete(connMap, connStats.ConnKey) + return connStats + }, + }, + Failure: { + eventsMap: map[EventType]StateType{}, + callback: func(connStats *ConnectionStats, connMap map[ConnKey]*ConnectionStats) *ConnectionStats { + delete(connMap, connStats.ConnKey) + return connStats + }, + }, + } +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket.go b/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket.go new file mode 100644 index 000000000..dd66763dc --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket.go @@ -0,0 +1,163 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Modification: +// 1. Change the return parameter of newNetIPSocket to map. +// 2. Remove unused fields parsed. + +package internal + +import ( + "bufio" + "encoding/hex" + "fmt" + "io" + "net" + "os" + "strconv" + "strings" +) + +const ( + // readLimit is used by io.LimitReader while reading the content of the + // /proc/net/udp{,6} files. The number of lines inside such a file is dynamic + // as each line represents a single used socket. + // In theory, the number of available sockets is 65535 (2^16 - 1) per IP. + // With e.g. 150 Byte per line and the maximum number of 65535, + // the reader needs to handle 150 Byte * 65535 =~ 10 MB for a single IP. + readLimit = 4294967296 // Byte -> 4 GiB +) + +// This contains generic data structures for both udp and tcp sockets. +type ( + SocketKey struct { + LocalAddr string + LocalPort uint64 + RemAddr string + RemPort uint64 + } + + // NetSocketStateMap represents the map between socket and its state. + NetSocketStateMap map[SocketKey]string + + // netIPSocketLine represents the fields parsed from a single line + // in /proc/net/{t,u}dp{,6}. Fields which are not used by IPSocket are skipped. + // For the proc file format details, see https://linux.die.net/man/5/proc. 
+ netIPSocketLine struct { + LocalAddr net.IP + LocalPort uint64 + RemAddr net.IP + RemPort uint64 + St string + } +) + +func newNetIPSocket(file string) (NetSocketStateMap, error) { + f, err := os.Open(file) + if err != nil { + return nil, err + } + defer func() { + _ = f.Close() + }() + + netIPSocket := make(NetSocketStateMap) + + lr := io.LimitReader(f, readLimit) + s := bufio.NewScanner(lr) + s.Scan() // skip first line with headers + for s.Scan() { + fields := strings.Fields(s.Text()) + line, err := parseNetIPSocketLine(fields) + if err != nil { + return nil, err + } + socketKey := SocketKey{ + LocalAddr: line.LocalAddr.String(), + LocalPort: line.LocalPort, + RemAddr: line.RemAddr.String(), + RemPort: line.RemPort, + } + netIPSocket[socketKey] = line.St + } + if err := s.Err(); err != nil { + return nil, err + } + return netIPSocket, nil +} + +// the /proc/net/{t,u}dp{,6} files are network byte order for ipv4 and for ipv6 the address +// is four words consisting of four bytes each. In each of those four words the four bytes +// are written in reverse order. +func parseIP(hexIP string) (net.IP, error) { + var byteIP []byte + byteIP, err := hex.DecodeString(hexIP) + if err != nil { + return nil, fmt.Errorf("cannot parse address field in socket line %q", hexIP) + } + switch len(byteIP) { + case 4: + return net.IP{byteIP[3], byteIP[2], byteIP[1], byteIP[0]}, nil + case 16: + i := net.IP{ + byteIP[3], byteIP[2], byteIP[1], byteIP[0], + byteIP[7], byteIP[6], byteIP[5], byteIP[4], + byteIP[11], byteIP[10], byteIP[9], byteIP[8], + byteIP[15], byteIP[14], byteIP[13], byteIP[12], + } + return i, nil + default: + return nil, fmt.Errorf("unable to parse IP %s", hexIP) + } +} + +// parseNetIPSocketLine parses a single line, represented by a list of fields. +func parseNetIPSocketLine(fields []string) (*netIPSocketLine, error) { + line := &netIPSocketLine{} + if len(fields) < 10 { + return nil, fmt.Errorf( + "cannot parse net socket line as it has less then 10 columns %q", + strings.Join(fields, " "), + ) + } + var err error // parse error + + // local_address + l := strings.Split(fields[1], ":") + if len(l) != 2 { + return nil, fmt.Errorf("cannot parse local_address field in socket line %q", fields[1]) + } + if line.LocalAddr, err = parseIP(l[0]); err != nil { + return nil, err + } + if line.LocalPort, err = strconv.ParseUint(l[1], 16, 64); err != nil { + return nil, fmt.Errorf("cannot parse local_address port value in socket line: %w", err) + } + + // remote_address + r := strings.Split(fields[2], ":") + if len(r) != 2 { + return nil, fmt.Errorf("cannot parse rem_address field in socket line %q", fields[1]) + } + if line.RemAddr, err = parseIP(r[0]); err != nil { + return nil, err + } + if line.RemPort, err = strconv.ParseUint(r[1], 16, 64); err != nil { + return nil, fmt.Errorf("cannot parse rem_address port value in socket line: %w", err) + } + + // st + line.St = fields[3] + + return line, nil +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket_test.go b/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket_test.go new file mode 100644 index 000000000..1ef71923c --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket_test.go @@ -0,0 +1,74 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "net" + "reflect" + "testing" +) + +func Test_parseNetIPSocketLine(t *testing.T) { + tests := []struct { + fields []string + name string + want *netIPSocketLine + wantErr bool + }{ + { + name: "reading valid lines, no issue should happened", + fields: []string{"11:", "00000000:0000", "00000000:0000", "0A", "00000017:0000002A", "0:0", "0", "1000", "0", "39309"}, + want: &netIPSocketLine{ + LocalAddr: net.IP{0, 0, 0, 0}, + LocalPort: 0, + RemAddr: net.IP{0, 0, 0, 0}, + RemPort: 0, + St: "0A", + }, + }, + { + name: "error case - invalid line - number of fields/columns < 10", + fields: []string{"1:", "00000000:0000", "00000000:0000", "07", "0:0", "0", "0"}, + want: nil, + wantErr: true, + }, + { + name: "error case - parse local_address - not a valid hex", + fields: []string{"1:", "0000000O:0000", "00000000:0000", "07", "00000000:00000001", "0:0", "0", "0", "0", "39309"}, + want: nil, + wantErr: true, + }, + { + name: "error case - parse rem_address - not a valid hex", + fields: []string{"1:", "00000000:0000", "0000000O:0000", "07", "00000000:00000001", "0:0", "0", "0", "0", "39309"}, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseNetIPSocketLine(tt.fields) + if (err != nil) != tt.wantErr { + t.Errorf("parseNetIPSocketLine() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.want == nil && got != nil { + t.Errorf("parseNetIPSocketLine() = %v, want %v", got, tt.want) + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseNetIPSocketLine() = %#v, want %#v", got, tt.want) + } + }) + } +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/state_machine.go b/collector/analyzer/tcpconnectanalyzer/internal/state_machine.go new file mode 100644 index 000000000..9ff0f5245 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/state_machine.go @@ -0,0 +1,67 @@ +package internal + +import ( + "fmt" +) + +type StateType string + +type StateMachine struct { + connStats *ConnectionStats + lastStateType StateType + currentStateType StateType + states StatesResource +} + +func NewStateMachine(initialState StateType, statesResource StatesResource, connStats *ConnectionStats) *StateMachine { + return &StateMachine{ + connStats: connStats, + lastStateType: initialState, + currentStateType: initialState, + states: statesResource, + } +} + +func (s *StateMachine) GetCurrentState() StateType { + return s.currentStateType +} + +func (s *StateMachine) GetLastState() StateType { + return s.lastStateType +} + +func (s *StateMachine) ReceiveEvent(event EventType, connMap map[ConnKey]*ConnectionStats) (*ConnectionStats, error) { + currentState, ok := s.states[s.currentStateType] + if !ok { + return nil, fmt.Errorf("no current state [%v]", s.currentStateType) + } + nextStateType, ok := currentState.eventsMap[event] + if !ok { + return nil, fmt.Errorf("receive not supported event [%v] in state [%v], ConnKey: [%s]", + event, s.currentStateType, s.connStats.ConnKey.String()) + } + nextState, ok := s.states[nextStateType] + if !ok { + return nil, fmt.Errorf("no 
next state [%v]", nextStateType) + } + // Only trigger the callback when the state changes + if nextStateType != s.currentStateType && nextState.callback != nil { + s.lastStateType = s.currentStateType + s.currentStateType = nextStateType + return nextState.callback(s.connStats, connMap), nil + } + s.lastStateType = s.currentStateType + s.currentStateType = nextStateType + return nil, nil +} + +type Callback func(connStats *ConnectionStats, connMap map[ConnKey]*ConnectionStats) *ConnectionStats + +type EventType string + +type State struct { + eventsMap map[EventType]StateType + callback Callback +} + +type StatesResource map[StateType]State diff --git a/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go b/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go new file mode 100644 index 000000000..f2ce5c9c0 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go @@ -0,0 +1,42 @@ +package internal + +import "testing" + +func TestCallback(t *testing.T) { + connMap := make(map[ConnKey]*ConnectionStats) + connKey := ConnKey{ + SrcIP: "10.10.10.10", + SrcPort: 40040, + DstIP: "10.10.10.23", + DstPort: 80, + } + statesResource := createStatesResource() + connStats := &ConnectionStats{ + pid: 0, + ConnKey: connKey, + InitialTimestamp: 0, + EndTimestamp: 0, + Code: 0, + } + connStats.StateMachine = NewStateMachine(Inprogress, statesResource, connStats) + connMap[connKey] = connStats + + stats, err := connStats.StateMachine.ReceiveEvent(tcpConnectNoError, connMap) + if err != nil { + t.Fatal(err) + } + if connStats.StateMachine.currentStateType != Inprogress { + t.Errorf("expected inprogress, got %v", connStats.StateMachine.currentStateType) + } + + stats, err = connStats.StateMachine.ReceiveEvent(tcpSetStateToEstablished, connMap) + if err != nil { + t.Fatal(err) + } + if stats == nil { + t.Errorf("expected stats emitted, but none got") + } + if connStats.StateMachine.currentStateType != Success { + t.Errorf("expected success, got %v", connStats.StateMachine.currentStateType) + } +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/tcp_stat.go b/collector/analyzer/tcpconnectanalyzer/internal/tcp_stat.go new file mode 100644 index 000000000..32fdb3b70 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/tcp_stat.go @@ -0,0 +1,11 @@ +package internal + +import ( + "path" + "strconv" +) + +func NewPidTcpStat(hostProc string, pid int) (NetSocketStateMap, error) { + tcpFilePath := path.Join(hostProc, strconv.Itoa(pid), "net/tcp") + return newNetIPSocket(tcpFilePath) +} diff --git a/collector/analyzer/tcpconnectanalyzer/metrics.go b/collector/analyzer/tcpconnectanalyzer/metrics.go new file mode 100644 index 000000000..1daa82e5a --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/metrics.go @@ -0,0 +1,23 @@ +package tcpconnectanalyzer + +import ( + "context" + "sync" + + "github.com/Kindling-project/kindling/collector/analyzer/tcpconnectanalyzer/internal" + "go.opentelemetry.io/otel/metric" +) + +var once sync.Once + +const mapSizeMetric = "kindling_telemetry_tcpconnectanalyzer_map_size" + +func newSelfMetrics(meterProvider metric.MeterProvider, monitor *internal.ConnectMonitor) { + once.Do(func() { + meter := metric.Must(meterProvider.Meter("kindling")) + meter.NewInt64GaugeObserver(mapSizeMetric, + func(ctx context.Context, result metric.Int64ObserverResult) { + result.Observe(int64(monitor.GetMapSize())) + }) + }) +} diff --git a/collector/application/application.go b/collector/application/application.go 
index c87f415ef..cc3ae987e 100644 --- a/collector/application/application.go +++ b/collector/application/application.go @@ -7,6 +7,7 @@ import ( "github.com/Kindling-project/kindling/collector/analyzer" "github.com/Kindling-project/kindling/collector/analyzer/loganalyzer" "github.com/Kindling-project/kindling/collector/analyzer/network" + "github.com/Kindling-project/kindling/collector/analyzer/tcpconnectanalyzer" "github.com/Kindling-project/kindling/collector/analyzer/tcpmetricanalyzer" "github.com/Kindling-project/kindling/collector/component" "github.com/Kindling-project/kindling/collector/consumer" @@ -80,6 +81,7 @@ func (a *Application) registerFactory() { a.componentsFactory.RegisterExporter(logexporter.Type, logexporter.New, &logexporter.Config{}) a.componentsFactory.RegisterAnalyzer(loganalyzer.Type.String(), loganalyzer.New, &loganalyzer.Config{}) a.componentsFactory.RegisterProcessor(aggregateprocessor.Type, aggregateprocessor.New, aggregateprocessor.NewDefaultConfig()) + a.componentsFactory.RegisterAnalyzer(tcpconnectanalyzer.Type.String(), tcpconnectanalyzer.New, tcpconnectanalyzer.NewDefaultConfig()) } func (a *Application) readInConfig(path string) error { @@ -120,8 +122,10 @@ func (a *Application) buildPipeline() error { k8sMetadataProcessor2 := k8sProcessorFactory.NewFunc(k8sProcessorFactory.Config, a.telemetry.Telemetry, aggregateProcessorForTcp) tcpAnalyzerFactory := a.componentsFactory.Analyzers[tcpmetricanalyzer.TcpMetric.String()] tcpAnalyzer := tcpAnalyzerFactory.NewFunc(tcpAnalyzerFactory.Config, a.telemetry.Telemetry, []consumer.Consumer{k8sMetadataProcessor2}) + tcpConnectAnalyzerFactory := a.componentsFactory.Analyzers[tcpconnectanalyzer.Type.String()] + tcpConnectAnalyzer := tcpConnectAnalyzerFactory.NewFunc(tcpConnectAnalyzerFactory.Config, a.telemetry.Telemetry, []consumer.Consumer{k8sMetadataProcessor}) // Initialize receiver packaged with multiple analyzers - analyzerManager, err := analyzer.NewManager(networkAnalyzer, tcpAnalyzer) + analyzerManager, err := analyzer.NewManager(networkAnalyzer, tcpAnalyzer, tcpConnectAnalyzer) if err != nil { return fmt.Errorf("error happened while creating analyzer manager: %w", err) } diff --git a/collector/consumer/exporter/otelexporter/otelexporter.go b/collector/consumer/exporter/otelexporter/otelexporter.go index c691771d0..e3c9d2d20 100644 --- a/collector/consumer/exporter/otelexporter/otelexporter.go +++ b/collector/consumer/exporter/otelexporter/otelexporter.go @@ -4,14 +4,12 @@ import ( "context" "errors" "fmt" + "os" + "time" "github.com/Kindling-project/kindling/collector/component" "github.com/Kindling-project/kindling/collector/consumer/exporter" "github.com/Kindling-project/kindling/collector/consumer/exporter/tools/adapter" - - "os" - "time" - "github.com/Kindling-project/kindling/collector/model/constnames" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" @@ -153,7 +151,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export StorePodDetail: cfg.AdapterConfig.NeedPodDetail, StoreExternalSrcIP: cfg.AdapterConfig.StoreExternalSrcIP, }), - adapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName}, customLabels), + adapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName, constnames.TcpConnectMetricGroupName}, customLabels), }, } go func() { @@ -220,7 +218,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export StorePodDetail: cfg.AdapterConfig.NeedPodDetail, StoreExternalSrcIP: 
cfg.AdapterConfig.StoreExternalSrcIP, }), - adapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName}, customLabels), + adapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName, constnames.TcpConnectMetricGroupName}, customLabels), }, } diff --git a/collector/consumer/processor/aggregateprocessor/processor.go b/collector/consumer/processor/aggregateprocessor/processor.go index d33948d4c..67892290b 100644 --- a/collector/consumer/processor/aggregateprocessor/processor.go +++ b/collector/consumer/processor/aggregateprocessor/processor.go @@ -1,6 +1,9 @@ package aggregateprocessor import ( + "math/rand" + "time" + "github.com/Kindling-project/kindling/collector/component" "github.com/Kindling-project/kindling/collector/consumer" "github.com/Kindling-project/kindling/collector/consumer/processor" @@ -10,8 +13,6 @@ import ( "github.com/Kindling-project/kindling/collector/pkg/aggregator" "github.com/Kindling-project/kindling/collector/pkg/aggregator/defaultaggregator" "go.uber.org/zap" - "math/rand" - "time" ) const Type = "aggregateprocessor" @@ -19,6 +20,8 @@ const Type = "aggregateprocessor" var exponentialInt64Boundaries = []int64{10e6, 20e6, 50e6, 80e6, 130e6, 200e6, 300e6, 400e6, 500e6, 700e6, 1e9, 2e9, 5e9, 30e9} +var tcpConnectLabelSelectors = newTcpConnectLabelSelectors() + type AggregateProcessor struct { cfg *Config telemetry *component.TelemetryTools @@ -122,6 +125,9 @@ func (p *AggregateProcessor) Consume(dataGroup *model.DataGroup) error { case constnames.TcpMetricGroupName: p.aggregator.Aggregate(dataGroup, p.tcpLabelSelectors) return nil + case constnames.TcpConnectMetricGroupName: + p.aggregator.Aggregate(dataGroup, tcpConnectLabelSelectors) + return nil default: p.aggregator.Aggregate(dataGroup, p.netRequestLabelSelectors) return nil @@ -197,6 +203,34 @@ func newTcpLabelSelectors() *aggregator.LabelSelectors { ) } +func newTcpConnectLabelSelectors() *aggregator.LabelSelectors { + return aggregator.NewLabelSelectors( + aggregator.LabelSelector{Name: constlabels.SrcNode, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcNodeIp, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcNamespace, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcPod, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcWorkloadName, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcWorkloadKind, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcService, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcIp, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcContainerId, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcContainer, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstNode, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstNodeIp, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstNamespace, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstPod, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstWorkloadName, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstWorkloadKind, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstService, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstIp, VType: 
aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstPort, VType: aggregator.IntType}, + aggregator.LabelSelector{Name: constlabels.DstContainerId, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstContainer, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.Errno, VType: aggregator.IntType}, + aggregator.LabelSelector{Name: constlabels.Success, VType: aggregator.BooleanType}, + ) +} + func (p *AggregateProcessor) isSampled(dataGroup *model.DataGroup) bool { randSeed := rand.Intn(100) if isAbnormal(dataGroup) { diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index 619239ce9..3abc21bbe 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -27,7 +27,13 @@ receivers: - name: kprobe-tcp_rcv_established - name: kprobe-tcp_drop - name: kprobe-tcp_retransmit_skb + - name: syscall_exit-connect + - name: kretprobe-tcp_connect + - name: kprobe-tcp_set_state analyzers: + tcpconnectanalyzer: + channel_size: 10000 + wait_event_second: 10 tcpmetricanalyzer: networkanalyzer: connect_timeout: 100 @@ -95,6 +101,10 @@ processors: - kind: sum kindling_tcp_packet_loss_total: - kind: sum + kindling_tcp_connect_total: + - kind: sum + kindling_tcp_connect_duration_nanoseconds_total: + - kind: sum sampling_rate: normal_data: 0 slow_data: 100 @@ -124,6 +134,8 @@ exporters: kindling_tcp_srtt_microseconds: gauge kindling_tcp_retransmit_total: counter kindling_tcp_packet_loss_total: counter + kindling_tcp_connect_total: counter + kindling_tcp_connect_duration_nanoseconds_total: counter # Export data in the following ways: ["prometheus", "otlp", "stdout"] # Note: configure the corresponding section to make everything ok export_kind: prometheus diff --git a/collector/model/constlabels/const.go b/collector/model/constlabels/const.go index 9fcb46c5c..107d419af 100644 --- a/collector/model/constlabels/const.go +++ b/collector/model/constlabels/const.go @@ -49,6 +49,8 @@ const ( Ip = "ip" Port = "port" + Errno = "errno" + Success = "success" RequestContent = "request_content" ResponseContent = "response_content" StatusCode = "status_code" diff --git a/collector/model/constnames/const.go b/collector/model/constnames/const.go index 225935319..4c0cbee5a 100644 --- a/collector/model/constnames/const.go +++ b/collector/model/constnames/const.go @@ -9,11 +9,14 @@ const ( RecvFromEvent = "recvfrom" SendMsgEvent = "sendmsg" RecvMsgEvent = "recvmsg" + ConnectEvent = "connect" TcpCloseEvent = "tcp_close" TcpRcvEstablishedEvent = "tcp_rcv_established" TcpDropEvent = "tcp_drop" TcpRetransmitSkbEvent = "tcp_retransmit_skb" + TcpConnectEvent = "tcp_connect" + TcpSetStateEvent = "tcp_set_state" OtherEvent = "other" GrpcUprobeEvent = "grpc_uprobe" @@ -24,6 +27,7 @@ const ( // AggregatedNetRequestMetricGroup stands for the dataGroup after aggregation. 
AggregatedNetRequestMetricGroup = "aggregated_net_request_metric_group" - TcpMetricGroupName = "tcp_metric_metric_group" - NodeMetricGroupName = "node_metric_metric_group" + TcpMetricGroupName = "tcp_metric_metric_group" + NodeMetricGroupName = "node_metric_metric_group" + TcpConnectMetricGroupName = "tcp_connect_metric_group" ) diff --git a/collector/model/constnames/relabel_metric_const.go b/collector/model/constnames/relabel_metric_const.go index cf9bd3235..eb12d6a9a 100644 --- a/collector/model/constnames/relabel_metric_const.go +++ b/collector/model/constnames/relabel_metric_const.go @@ -34,6 +34,9 @@ const ( TcpRttMetricName = "kindling_tcp_srtt_microseconds" TcpRetransmitMetricName = "kindling_tcp_retransmit_total" TcpDropMetricName = "kindling_tcp_packet_loss_total" + + TcpConnectTotalMetric = "kindling_tcp_connect_total" + TcpConnectDurationMetric = "kindling_tcp_connect_duration_nanoseconds_total" ) const ( diff --git a/collector/model/kindling_event_helper.go b/collector/model/kindling_event_helper.go index 09026e704..c320de501 100644 --- a/collector/model/kindling_event_helper.go +++ b/collector/model/kindling_event_helper.go @@ -222,6 +222,18 @@ func (x *KindlingEvent) IsUdp() uint32 { return 0 } +func (x *KindlingEvent) IsTcp() bool { + context := x.GetCtx() + if context == nil { + return false + } + fd := context.GetFdInfo() + if fd == nil { + return false + } + return fd.GetProtocol() == L4Proto_TCP +} + func (x *KindlingEvent) IsConnect() bool { return x.Name == "connect" } diff --git a/collector/pkg/aggregator/label_key.go b/collector/pkg/aggregator/label_key.go index 0d26461f6..3d213e0fa 100644 --- a/collector/pkg/aggregator/label_key.go +++ b/collector/pkg/aggregator/label_key.go @@ -45,6 +45,10 @@ func (s *LabelSelectors) GetLabelKeys(labels *model.AttributeMap) *LabelKeys { return keys } +func (s *LabelSelectors) AppendSelectors(selectors ...LabelSelector) { + s.selectors = append(s.selectors, selectors...) 
+} + const maxLabelKeySize = 35 type LabelKeys struct { diff --git a/collector/receiver/udsreceiver/metrics.go b/collector/receiver/udsreceiver/metrics.go index a3ce748df..f727bc5bb 100644 --- a/collector/receiver/udsreceiver/metrics.go +++ b/collector/receiver/udsreceiver/metrics.go @@ -2,11 +2,12 @@ package udsreceiver import ( "context" + "sync" + "sync/atomic" + "github.com/Kindling-project/kindling/collector/model/constnames" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - "sync" - "sync/atomic" ) var once sync.Once @@ -39,11 +40,14 @@ type stats struct { recvFrom int64 sendMsg int64 recvMsg int64 + connect int64 grpcUprobe int64 tcpClose int64 tcpRcvEstablished int64 tcpDrop int64 tcpRetransmitSkb int64 + tcpConnect int64 + tcpSetState int64 other int64 } @@ -75,6 +79,12 @@ func (i *stats) add(name string, value int64) { atomic.AddInt64(&i.tcpDrop, value) case constnames.TcpRetransmitSkbEvent: atomic.AddInt64(&i.tcpRetransmitSkb, value) + case constnames.ConnectEvent: + atomic.AddInt64(&i.connect, value) + case constnames.TcpConnectEvent: + atomic.AddInt64(&i.tcpConnect, value) + case constnames.TcpSetStateEvent: + atomic.AddInt64(&i.tcpSetState, value) default: atomic.AddInt64(&i.other, value) } @@ -95,6 +105,9 @@ func (i *stats) getStats() map[string]int64 { ret[constnames.TcpRcvEstablishedEvent] = atomic.LoadInt64(&i.tcpRcvEstablished) ret[constnames.TcpCloseEvent] = atomic.LoadInt64(&i.tcpClose) ret[constnames.TcpRetransmitSkbEvent] = atomic.LoadInt64(&i.tcpRetransmitSkb) + ret[constnames.ConnectEvent] = atomic.LoadInt64(&i.connect) + ret[constnames.TcpConnectEvent] = atomic.LoadInt64(&i.tcpConnect) + ret[constnames.TcpSetStateEvent] = atomic.LoadInt64(&i.tcpSetState) ret[constnames.OtherEvent] = atomic.LoadInt64(&i.other) return ret } diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index b08d2ff35..259968550 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -27,7 +27,13 @@ receivers: - name: kprobe-tcp_rcv_established - name: kprobe-tcp_drop - name: kprobe-tcp_retransmit_skb + - name: syscall_exit-connect + - name: kretprobe-tcp_connect + - name: kprobe-tcp_set_state analyzers: + tcpconnectanalyzer: + channel_size: 10000 + wait_event_second: 10 tcpmetricanalyzer: networkanalyzer: connect_timeout: 100 @@ -95,6 +101,10 @@ processors: - kind: sum kindling_tcp_packet_loss_total: - kind: sum + kindling_tcp_connect_total: + - kind: sum + kindling_tcp_connect_duration_nanoseconds_total: + - kind: sum sampling_rate: normal_data: 0 slow_data: 100 @@ -124,6 +134,8 @@ exporters: kindling_tcp_srtt_microseconds: gauge kindling_tcp_retransmit_total: counter kindling_tcp_packet_loss_total: counter + kindling_tcp_connect_total: counter + kindling_tcp_connect_duration_nanoseconds_total: counter # Export data in the following ways: ["prometheus", "otlp", "stdout"] # Note: configure the corresponding section to make everything ok export_kind: prometheus diff --git a/docs/prometheus_metrics.md b/docs/prometheus_metrics.md index 5e7ee62f1..8e36c8bd6 100644 --- a/docs/prometheus_metrics.md +++ b/docs/prometheus_metrics.md @@ -165,12 +165,12 @@ We made some rules for considering whether a request is abnormal. For the abnorm | `request_processing_status` | 3 | Processing indicates the duration until receiving the first byte.
1(green): latency <= 200ms
2(yellow): 200 < latency < 1000
3(red): latency >= 1000 |
| `response_rspxfer_status` | 1 | RspXfer indicates the duration for transferring the response body.
1(green): latency <= 200ms
2(yellow): 200 < latency < 1000
3(red): latency >= 1000 |
-## TCP (Layer 4) Metrics
+## TCP Status Metrics
### Metrics List
| **Metric Name** | **Type** | **Description** |
| --- | --- | --- |
-| `kindling_tcp_srtt_microseconds` | Gauge | Smoothed round trip time of the tcp socket |
+| `kindling_tcp_srtt_microseconds` | Gauge | Smoothed round trip time of the TCP socket |
| `kindling_tcp_packet_loss_total` | Counter | Total number of dropped packets |
| `kindling_tcp_retransmit_total` | Counter | Total times of retransmitting happens (not packets count) |
@@ -196,6 +196,44 @@ We made some rules for considering whether a request is abnormal. For the abnorm
| `dst_ip` | 10.1.11.24 | Pod's IP by default. If the destination is not a pod in Kubernetes, this is the IP address of an external entity |
| `dst_port` | 80 | The listening port of the destination container, if applicable |
+## TCP Socket Connect Metrics
+
+### Metrics List
+| **Metric Name** | **Type** | **Description** |
+| --- | --- | --- |
+| `kindling_tcp_connect_total` | Counter | Total number of successfully and unsuccessfully established TCP connections |
+| `kindling_tcp_connect_duration_nanoseconds_total` | Counter | Total duration of the successfully established TCP connections |
+
+### Labels List
+| **Label Name** | **Example** | **Notes** |
+| --- | --- | --- |
+| `src_node` | slave-node1 | Which node the source pod is on |
+| `src_namespace` | default | Namespace of the source pod |
+| `src_workload_kind` | deployment | Workload kind of the source pod |
+| `src_workload_name` | business1 | Workload name of the source pod |
+| `src_service` | business1-svc | One of the services that target the source pod |
+| `src_pod` | business1-0 | The name of the source pod |
+| `src_container` | business-container | The name of the source container |
+| `src_container_id` | 1a2b3c4d5e6f | The shortened container ID, which contains 12 characters |
+| `src_ip` | 10.1.11.23 | Pod's IP by default. If the source is not a pod in Kubernetes, this is the IP address of an external entity |
+| `dst_node` | slave-node2 | Which node the destination pod is on |
+| `dst_namespace` | default | Namespace of the destination pod |
+| `dst_workload_kind` | deployment | Workload kind of the destination pod |
+| `dst_workload_name` | business2 | Workload name of the destination pod |
+| `dst_service` | business2-svc | One of the services that target the destination pod |
+| `dst_pod` | business2-0 | The name of the destination pod |
+| `dst_container` | business-container | The name of the destination container |
+| `dst_ip` | 10.1.11.24 | Pod's IP by default. If the destination is not a pod in Kubernetes, this is the IP address of an external entity |
+| `dst_port` | 80 | The listening port of the destination container, if applicable |
+| `success` | true | Whether the TCP connection is successfully established |
+| `errno` | 0 | The error number of the TCP connection. 0 if no error. Note that it could also be 0 even if an error occurred. |
+
+### Notes
+**Note 1**: The field `success` for `kindling_tcp_connect_duration_nanoseconds_total` is always `true`.
+
+**Note 2**: The field `errno` is non-zero only if the TCP socket is blocking and an error occurred. It can take multiple possible values; see the `ERRORS` section of the [connect(2) manual](https://man7.org/linux/man-pages/man2/connect.2.html) for more details.
+
+
## PromQL Example
Here are some examples of how to use these metrics in Prometheus, which can help you understand them faster.
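As a minimal sketch for the new connect metrics, the queries below use only the metric names and labels documented in the tables above; the 5-minute range and the grouping labels (`dst_namespace`, `dst_workload_name`) are illustrative choices, not requirements.

```
# Ratio of failed connection attempts per destination workload over the last 5 minutes
sum by (dst_namespace, dst_workload_name) (rate(kindling_tcp_connect_total{success="false"}[5m]))
  /
sum by (dst_namespace, dst_workload_name) (rate(kindling_tcp_connect_total[5m]))

# Average time (in seconds) spent establishing a successful TCP connection
sum(rate(kindling_tcp_connect_duration_nanoseconds_total[5m]))
  /
sum(rate(kindling_tcp_connect_total{success="true"}[5m]))
  / 1e9
```

Because `kindling_tcp_connect_duration_nanoseconds_total` only accumulates the duration of successfully established connections (see Note 1), dividing it by the rate of successful connections yields an average connect latency.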