From 668ac9d89d524b6f94925e3c0510948a2eb5326f Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Tue, 31 May 2022 17:19:38 +0800 Subject: [PATCH 01/17] Add tcp connection metrics Signed-off-by: Daxin Wang --- .../analyzer/tcpconnectanalyzer/analyzer.go | 223 ++++++++++++ .../analyzer/tcpconnectanalyzer/config.go | 13 + .../internal/connect_monitor.go | 334 ++++++++++++++++++ .../internal/connection_stats.go | 132 +++++++ .../internal/net_ip_socket.go | 163 +++++++++ .../internal/net_ip_socket_test.go | 74 ++++ .../internal/state_machine.go | 67 ++++ .../internal/state_machine_test.go | 57 +++ .../tcpconnectanalyzer/internal/tcp_stat.go | 11 + .../analyzer/tcpconnectanalyzer/metrics.go | 23 ++ collector/application/application.go | 7 +- .../exporter/otelexporter/otelexporter.go | 12 +- .../processor/aggregateprocessor/processor.go | 38 +- .../docker/kindling-collector-config.yml | 12 + collector/model/constlabels/const.go | 2 + collector/model/constnames/const.go | 4 + .../model/constnames/relabel_metric_const.go | 3 + collector/model/kindling_event_helper.go | 12 + collector/pkg/aggregator/label_key.go | 4 + collector/receiver/udsreceiver/metrics.go | 17 +- collector/receiver/udsreceiver/udsreceiver.go | 14 +- deploy/agent/kindling-collector-config.yml | 12 + 22 files changed, 1219 insertions(+), 15 deletions(-) create mode 100644 collector/analyzer/tcpconnectanalyzer/analyzer.go create mode 100644 collector/analyzer/tcpconnectanalyzer/config.go create mode 100644 collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go create mode 100644 collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go create mode 100644 collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket.go create mode 100644 collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket_test.go create mode 100644 collector/analyzer/tcpconnectanalyzer/internal/state_machine.go create mode 100644 collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go create mode 100644 collector/analyzer/tcpconnectanalyzer/internal/tcp_stat.go create mode 100644 collector/analyzer/tcpconnectanalyzer/metrics.go diff --git a/collector/analyzer/tcpconnectanalyzer/analyzer.go b/collector/analyzer/tcpconnectanalyzer/analyzer.go new file mode 100644 index 000000000..6b85c923d --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/analyzer.go @@ -0,0 +1,223 @@ +package tcpconnectanalyzer + +import ( + "time" + + "github.com/Kindling-project/kindling/collector/analyzer" + "github.com/Kindling-project/kindling/collector/analyzer/tcpconnectanalyzer/internal" + "github.com/Kindling-project/kindling/collector/component" + "github.com/Kindling-project/kindling/collector/consumer" + conntrackerpackge "github.com/Kindling-project/kindling/collector/metadata/conntracker" + "github.com/Kindling-project/kindling/collector/model" + "github.com/Kindling-project/kindling/collector/model/constlabels" + "github.com/Kindling-project/kindling/collector/model/constnames" + "github.com/hashicorp/go-multierror" + "go.uber.org/zap" +) + +const Type analyzer.Type = "tcpconnectanalyzer" + +var consumableEvents = map[string]bool{ + constnames.ConnectEvent: true, + constnames.TcpConnectEvent: true, + constnames.TcpSetStateEvent: true, +} + +type TcpConnectAnalyzer struct { + config *Config + nextConsumers []consumer.Consumer + conntracker conntrackerpackge.Conntracker + + eventChannel chan *model.KindlingEvent + connectMonitor *internal.ConnectMonitor + + stopCh chan bool + + telemetry *component.TelemetryTools +} + +func New(cfg 
interface{}, telemetry *component.TelemetryTools, consumers []consumer.Consumer) analyzer.Analyzer { + config := cfg.(*Config) + ret := &TcpConnectAnalyzer{ + config: config, + nextConsumers: consumers, + telemetry: telemetry, + eventChannel: make(chan *model.KindlingEvent, config.ChannelSize), + stopCh: make(chan bool), + + connectMonitor: internal.NewConnectMonitor(telemetry.Logger), + } + conntracker, err := conntrackerpackge.NewConntracker(nil) + if err != nil { + telemetry.Logger.Warn("Conntracker cannot work as expected:", zap.Error(err)) + } + ret.conntracker = conntracker + newSelfMetrics(telemetry.MeterProvider, ret.connectMonitor) + return ret +} + +// Start initializes the analyzer +func (a *TcpConnectAnalyzer) Start() error { + go func() { + timeoutTicker := time.NewTicker(time.Duration(a.config.TimeoutSecond/5) * time.Second) + scanTcpStateTicker := time.NewTicker(5 * time.Second) + for { + select { + case <-timeoutTicker.C: + a.trimExpiredConnStats() + case <-scanTcpStateTicker.C: + a.trimConnectionsWithTcpStat() + case event := <-a.eventChannel: + a.consumeChannelEvent(event) + case <-a.stopCh: + // Only trim the connections expired. For those unfinished, we leave them + // unchanged and just shutdown this goroutine. + a.trimConnectionsWithTcpStat() + a.trimExpiredConnStats() + return + } + } + }() + return nil +} + +// ConsumeEvent gets the event from the previous component +func (a *TcpConnectAnalyzer) ConsumeEvent(event *model.KindlingEvent) error { + eventName := event.Name + if ok := consumableEvents[eventName]; !ok { + return nil + } + a.eventChannel <- event + return nil +} + +func (a *TcpConnectAnalyzer) consumeChannelEvent(event *model.KindlingEvent) { + var ( + connectStats *internal.ConnectionStats + err error + ) + + switch event.Name { + case constnames.ConnectEvent: + if !event.IsTcp() { + return + } + connectStats, err = a.connectMonitor.ReadInConnectExitSyscall(event) + case constnames.TcpConnectEvent: + connectStats, err = a.connectMonitor.ReadInTcpConnect(event) + case constnames.TcpSetStateEvent: + connectStats, err = a.connectMonitor.ReadInTcpSetState(event) + } + + if err != nil { + a.telemetry.Logger.Debug("Cannot update connection stats:", zap.Error(err)) + return + } + // Connection is not established yet + if connectStats == nil { + return + } + + gaugeGroup := a.generateGaugeGroup(connectStats) + a.passThroughConsumers(gaugeGroup) +} + +func (a *TcpConnectAnalyzer) trimExpiredConnStats() { + connStats := a.connectMonitor.TrimExpiredConnections(a.config.TimeoutSecond) + for _, connStat := range connStats { + gaugeGroup := a.generateGaugeGroup(connStat) + a.passThroughConsumers(gaugeGroup) + } +} + +func (a *TcpConnectAnalyzer) trimConnectionsWithTcpStat() { + connStats := a.connectMonitor.TrimConnectionsWithTcpStat() + for _, connStat := range connStats { + gaugeGroup := a.generateGaugeGroup(connStat) + a.passThroughConsumers(gaugeGroup) + } +} + +func (a *TcpConnectAnalyzer) passThroughConsumers(gaugeGroup *model.GaugeGroup) { + var retError error + for _, nextConsumer := range a.nextConsumers { + err := nextConsumer.Consume(gaugeGroup) + if err != nil { + retError = multierror.Append(retError, err) + } + } + if retError != nil { + a.telemetry.Logger.Warn("Error happened while passing through processors:", zap.Error(retError)) + } +} + +func (a *TcpConnectAnalyzer) generateGaugeGroup(connectStats *internal.ConnectionStats) *model.GaugeGroup { + labels := model.NewAttributeMap() + // The connect events always come from the client-side + 
labels.AddBoolValue(constlabels.IsServer, false) + if connectStats.ConnectSyscall != nil { + labels.AddStringValue(constlabels.ContainerId, connectStats.ConnectSyscall.GetContainerId()) + } + labels.AddIntValue(constlabels.Errno, int64(connectStats.Code)) + if connectStats.StateMachine.GetCurrentState() == internal.Closed { + lastState := connectStats.StateMachine.GetLastState() + if lastState == internal.Success { + labels.AddBoolValue(constlabels.Success, true) + } else { + labels.AddBoolValue(constlabels.Success, false) + } + } else if connectStats.StateMachine.GetCurrentState() == internal.Success { + labels.AddBoolValue(constlabels.Success, true) + } else { + labels.AddBoolValue(constlabels.Success, false) + } + srcIp := connectStats.ConnKey.SrcIP + dstIp := connectStats.ConnKey.DstIP + srcPort := connectStats.ConnKey.SrcPort + dstPort := connectStats.ConnKey.DstPort + labels.UpdateAddStringValue(constlabels.SrcIp, srcIp) + labels.UpdateAddStringValue(constlabels.DstIp, dstIp) + labels.UpdateAddIntValue(constlabels.SrcPort, int64(srcPort)) + labels.UpdateAddIntValue(constlabels.DstPort, int64(dstPort)) + dNatIp, dNatPort := a.findDNatTuple(srcIp, uint64(srcPort), dstIp, uint64(dstPort)) + labels.AddStringValue(constlabels.DnatIp, dNatIp) + labels.AddIntValue(constlabels.DnatPort, dNatPort) + + countValue := &model.Gauge{ + Name: constnames.TcpConnectTotalMetric, + Value: 1, + } + durationValue := &model.Gauge{ + Name: constnames.TcpConnectDurationMetric, + Value: connectStats.GetConnectDuration(), + } + + retGaugeGroup := model.NewGaugeGroup( + constnames.TcpConnectGaugeGroupName, + labels, + connectStats.EndTimestamp, + countValue, durationValue) + + return retGaugeGroup +} + +func (a *TcpConnectAnalyzer) findDNatTuple(sIp string, sPort uint64, dIp string, dPort uint64) (string, int64) { + dNat := a.conntracker.GetDNATTupleWithString(sIp, dIp, uint16(sPort), uint16(dPort), 0) + if dNat == nil { + return "", -1 + } + dNatIp := dNat.ReplSrcIP.String() + dNatPort := dNat.ReplSrcPort + return dNatIp, int64(dNatPort) +} + +// Shutdown cleans all the resources used by the analyzer +func (a *TcpConnectAnalyzer) Shutdown() error { + a.stopCh <- true + return nil +} + +// Type returns the type of the analyzer +func (a *TcpConnectAnalyzer) Type() analyzer.Type { + return Type +} diff --git a/collector/analyzer/tcpconnectanalyzer/config.go b/collector/analyzer/tcpconnectanalyzer/config.go new file mode 100644 index 000000000..a9d945a21 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/config.go @@ -0,0 +1,13 @@ +package tcpconnectanalyzer + +type Config struct { + ChannelSize int `mapstructure:"channel_size"` + TimeoutSecond int `mapstructure:"timeout_second"` +} + +func NewDefaultConfig() *Config { + return &Config{ + ChannelSize: 2000, + TimeoutSecond: 60, + } +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go new file mode 100644 index 000000000..627691079 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -0,0 +1,334 @@ +package internal + +import ( + "fmt" + "os" + "syscall" + "time" + + "github.com/Kindling-project/kindling/collector/model" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// ConnectMonitor reads in events related to TCP connect operations and updates its +// status to record the connection procedure. +// This is not thread safe to use. 
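+// All accesses to the map are expected to happen on the single goroutine that the
+// analyzer launches in Start().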
+type ConnectMonitor struct { + connMap map[ConnKey]*ConnectionStats + statesResource StatesResource + hostProcPath string + logger *zap.Logger +} + +const HostProc = "HOST_PROC_PATH" + +func NewConnectMonitor(logger *zap.Logger) *ConnectMonitor { + path, ok := os.LookupEnv(HostProc) + if !ok { + path = "/proc" + } + return &ConnectMonitor{ + connMap: make(map[ConnKey]*ConnectionStats), + statesResource: createStatesResource(), + hostProcPath: path, + logger: logger, + } +} + +func (c *ConnectMonitor) ReadInConnectExitSyscall(event *model.KindlingEvent) (*ConnectionStats, error) { + retValue := event.GetUserAttribute("res") + if retValue == nil { + return nil, fmt.Errorf("res of connect_exit is nil") + } + retValueInt := retValue.GetIntValue() + + connKey := ConnKey{ + SrcIP: event.GetSip(), + SrcPort: event.GetSport(), + DstIP: event.GetDip(), + DstPort: event.GetDport(), + } + if ce := c.logger.Check(zapcore.DebugLevel, "Receive connect_exit event:"); ce != nil { + ce.Write( + zap.String("ConnKey", connKey.String()), + zap.Int64("retValue", retValueInt), + ) + } + + connStats, ok := c.connMap[connKey] + if !ok { + // Maybe the connStats have been closed by tcp_set_state_from_established event. + // We don't care about it. + return nil, nil + } else { + // "connect_exit" comes to analyzer after "tcp_connect" + connStats.EndTimestamp = event.Timestamp + connStats.ConnectSyscall = event + connStats.pid = event.GetPid() + var eventType EventType + if retValueInt == 0 { + eventType = connectExitSyscallSuccess + } else if isNotErrorReturnCode(connCode(retValueInt)) { + eventType = connectExitSyscallNotConcern + } else { + eventType = connectExitSyscallFailure + connStats.Code = connCode(retValueInt) + } + return connStats.StateMachine.ReceiveEvent(eventType, c.connMap) + } +} + +func isNotErrorReturnCode(code connCode) bool { + return code == einprogress || code == eintr || code == eisconn || code == ealready +} + +func (c *ConnectMonitor) ReadInTcpConnect(event *model.KindlingEvent) (*ConnectionStats, error) { + connKey, err := getConnKeyForTcpConnect(event) + if err != nil { + return nil, err + } + retValue := event.GetUserAttribute("retval") + if retValue == nil { + return nil, fmt.Errorf("retval of tcp_connect is nil") + } + retValueInt := retValue.GetUintValue() + + if ce := c.logger.Check(zapcore.DebugLevel, "Receive tcp_connect event:"); ce != nil { + ce.Write( + zap.String("ConnKey", connKey.String()), + zap.Uint64("retValue", retValueInt), + ) + } + + var eventType EventType + if retValueInt == 0 { + eventType = tcpConnectNoError + } else { + eventType = tcpConnectError + } + + connStats, ok := c.connMap[connKey] + if !ok { + // "tcp_connect" comes to analyzer before "connect_exit" + connStats = &ConnectionStats{ + ConnKey: connKey, + InitialTimestamp: event.Timestamp, + EndTimestamp: event.Timestamp, + Code: connCode(retValueInt), + ConnectSyscall: nil, + TcpConnect: event, + TcpSetState: nil, + } + connStats.StateMachine = NewStateMachine(Inprogress, c.statesResource, connStats) + c.connMap[connKey] = connStats + } else { + // "tcp_connect" comes to analyzer after "connect_exit" + connStats.TcpConnect = event + connStats.EndTimestamp = event.Timestamp + connStats.Code = connCode(retValueInt) + } + return connStats.StateMachine.ReceiveEvent(eventType, c.connMap) +} + +const ( + establishedState = 1 +) + +func (c *ConnectMonitor) ReadInTcpSetState(event *model.KindlingEvent) (*ConnectionStats, error) { + connKey, err := getConnKeyForTcpConnect(event) + if err != nil { + return nil, 
err
+ }
+
+ oldState := event.GetUserAttribute("old_state")
+ newState := event.GetUserAttribute("new_state")
+ if oldState == nil || newState == nil {
+ return nil, fmt.Errorf("tcp_set_state events have nil state, ConnKey: %s", connKey.String())
+ }
+ oldStateInt := oldState.GetIntValue()
+ newStateInt := newState.GetIntValue()
+
+ if oldStateInt == establishedState {
+ return c.readInTcpSetStateFromEstablished(connKey, event)
+ } else if newStateInt == establishedState {
+ return c.readInTcpSetStateToEstablished(connKey, event)
+ } else {
+ return nil, fmt.Errorf("no state is 'established' for tcp_set_state event, "+
+ "old state: %d, new state: %d", oldStateInt, newStateInt)
+ }
+}
+
+func (c *ConnectMonitor) readInTcpSetStateToEstablished(connKey ConnKey, event *model.KindlingEvent) (*ConnectionStats, error) {
+ if ce := c.logger.Check(zapcore.DebugLevel, "Receive tcp_set_state(to established) event:"); ce != nil {
+ ce.Write(
+ zap.String("ConnKey", connKey.String()),
+ )
+ }
+ connStats, ok := c.connMap[connKey]
+ if !ok {
+ // No tcp_connect or connect_exit received.
+ // These are events from the server side.
+ c.logger.Debug("No tcp_connect or connect_exit, but receive tcp_set_state_to_established")
+ return nil, nil
+ } else {
+ connStats.TcpSetState = event
+ connStats.EndTimestamp = event.Timestamp
+ if connStats.TcpConnect == nil {
+ c.logger.Debug("No tcp_connect event, but receive tcp_set_state_to_established")
+ }
+ return connStats.StateMachine.ReceiveEvent(tcpSetStateToEstablished, c.connMap)
+ }
+}
+
+func (c *ConnectMonitor) readInTcpSetStateFromEstablished(connKey ConnKey, event *model.KindlingEvent) (*ConnectionStats, error) {
+ if ce := c.logger.Check(zapcore.DebugLevel, "Receive tcp_set_state(from established) event:"); ce != nil {
+ ce.Write(
+ zap.String("ConnKey", connKey.String()),
+ )
+ }
+ connStats, ok := c.connMap[connKey]
+ if !ok {
+ // The connection has been established and its connStats have already been emitted.
+ return nil, nil
+ } else {
+ connStats.TcpSetState = event
+ connStats.EndTimestamp = event.Timestamp
+ // Transmissions must have happened on this connection, so it was actually established.
+ if connStats.StateMachine.currentStateType == Inprogress {
+ stats, err := connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap)
+ _, _ = connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap)
+ return stats, err
+ }
+ return connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap)
+ }
+}
+
+// TrimExpiredConnections traverses the map, removes the expired entries based on the timeout,
+// and returns them.
+// The timeout is in seconds.
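+// Entries that are still in progress when trimmed are reported as failed connections
+// through the expiredEvent transition.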
+func (c *ConnectMonitor) TrimExpiredConnections(timeout int) []*ConnectionStats {
+ ret := make([]*ConnectionStats, 0)
+ if timeout <= 0 {
+ return ret
+ }
+ timeoutNano := int64(timeout) * 1000000000
+ for _, connStat := range c.connMap {
+ if time.Now().UnixNano()-int64(connStat.InitialTimestamp) >= timeoutNano {
+ stats, err := connStat.StateMachine.ReceiveEvent(expiredEvent, c.connMap)
+ if err != nil {
+ c.logger.Warn("error happened when receiving event:", zap.Error(err))
+ continue
+ }
+ if stats != nil {
+ ret = append(ret, stats)
+ }
+ }
+ }
+
+ return ret
+}
+
+func (c *ConnectMonitor) TrimConnectionsWithTcpStat() []*ConnectionStats {
+ ret := make([]*ConnectionStats, 0, len(c.connMap))
+ // Only scan once for each pid
+ pidTcpStateMap := make(map[uint32]NetSocketStateMap)
+ for key, connStat := range c.connMap {
+ if connStat.pid == 0 {
+ continue
+ }
+ tcpStateMap, ok := pidTcpStateMap[connStat.pid]
+ if !ok {
+ tcpState, err := NewPidTcpStat(c.hostProcPath, int(connStat.pid))
+ if err != nil {
+ if err == syscall.ENOENT {
+ // No such file or directory, which means the process has been purged.
+ // We consider the connection failed to be established.
+ stats, err := connStat.StateMachine.ReceiveEvent(expiredEvent, c.connMap)
+ if err != nil {
+ c.logger.Warn("error happened when receiving event:", zap.Error(err))
+ }
+ if stats != nil {
+ ret = append(ret, stats)
+ }
+ } else {
+ c.logger.Info("error happened when scanning net/tcp",
+ zap.Uint32("pid", connStat.pid), zap.Error(err))
+ }
+ continue
+ }
+ pidTcpStateMap[connStat.pid] = tcpState
+ tcpStateMap = tcpState
+ }
+ state, ok := tcpStateMap[key.toSocketKey()]
+ // There are two possible reasons why no such socket is found:
+ // 1. The connection was established and has been closed.
+ // A tcp_set_state event should have been received, and c.connMap
+ // should not contain such a socket. In this case, we won't enter this piece of code.
+ // 2. The connection failed to be established and has been closed.
+ if !ok {
+ stats, err := connStat.StateMachine.ReceiveEvent(expiredEvent, c.connMap)
+ if err != nil {
+ c.logger.Warn("error happened when receiving event:", zap.Error(err))
+ }
+ if stats != nil {
+ ret = append(ret, stats)
+ }
+ continue
+ }
+ if state == established {
+ stats, err := connStat.StateMachine.ReceiveEvent(tcpSetStateToEstablished, c.connMap)
+ if err != nil {
+ c.logger.Warn("error happened when receiving event:", zap.Error(err))
+ }
+ if stats != nil {
+ ret = append(ret, stats)
+ }
+ } else if state == synSent || state == synRecv {
+ continue
+ } else {
+ // These states come after the ESTABLISHED state. As we believe that the
+ // tcp_set_state event from established to other states always comes,
+ // the code should not run into this branch.
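+ // If it does happen, emit the stats anyway and push the state machine
+ // on toward Closed as a fallback.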
+ c.logger.Debug("See sockets whose state is behind ESTABLISHED, which means no "+ + "tcp_set_state_from_established received.", zap.String("state", state)) + stats, err := connStat.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) + if err != nil { + c.logger.Warn("error happened when receiving event:", zap.Error(err)) + } + if stats != nil { + ret = append(ret, stats) + } + if connStat.StateMachine.currentStateType == Success { + _, _ = connStat.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) + } + } + } + return ret +} + +func (c *ConnectMonitor) GetMapSize() int { + return len(c.connMap) +} + +func getConnKeyForTcpConnect(event *model.KindlingEvent) (ConnKey, error) { + sIp := event.GetUserAttribute("sip") + sPort := event.GetUserAttribute("sport") + dIp := event.GetUserAttribute("dip") + dPort := event.GetUserAttribute("dport") + + if sIp == nil || sPort == nil || dIp == nil || dPort == nil { + return ConnKey{}, fmt.Errorf("one of sip or dip or dport is nil for event %s", event.Name) + } + sIpString := model.IPLong2String(uint32(sIp.GetUintValue())) + sPortUint := sPort.GetUintValue() + dIpString := model.IPLong2String(uint32(dIp.GetUintValue())) + dPortUint := dPort.GetUintValue() + connKey := ConnKey{ + SrcIP: sIpString, + SrcPort: uint32(sPortUint), + DstIP: dIpString, + DstPort: uint32(dPortUint), + } + return connKey, nil +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go new file mode 100644 index 000000000..e582e5aa7 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go @@ -0,0 +1,132 @@ +package internal + +import ( + "fmt" + + "github.com/Kindling-project/kindling/collector/model" +) + +type connCode int + +const ( + noError connCode = iota + // See in Linux + einprogress = -115 + ealready = -114 + eisconn = -106 + eintr = -4 +) + +const ( + established = "01" + synSent = "02" + synRecv = "03" + finWait1 = "04" + finWait2 = "05" + timeWait = "06" + close = "07" + closeWait = "08" + lastAck = "09" + listen = "0A" + closing = "0B" +) + +type ConnectionStats struct { + pid uint32 + ConnKey ConnKey + StateMachine *StateMachine + InitialTimestamp uint64 + EndTimestamp uint64 + Code connCode + + ConnectSyscall *model.KindlingEvent + TcpConnect *model.KindlingEvent + TcpSetState *model.KindlingEvent +} + +func (c *ConnectionStats) GetConnectDuration() int64 { + return int64(c.EndTimestamp - c.InitialTimestamp) +} + +type ConnKey struct { + SrcIP string + SrcPort uint32 + DstIP string + DstPort uint32 +} + +func (k *ConnKey) toSocketKey() SocketKey { + return SocketKey{ + LocalAddr: k.SrcIP, + LocalPort: uint64(k.SrcPort), + RemAddr: k.DstIP, + RemPort: uint64(k.DstPort), + } +} + +func (k *ConnKey) String() string { + return fmt.Sprintf("src: %s:%d, dst: %s:%d", k.SrcIP, k.SrcPort, k.DstIP, k.DstPort) +} + +const ( + Inprogress StateType = "inprogress" + Success StateType = "success" + Failure StateType = "failure" + Closed StateType = "closed" + + tcpConnectError EventType = "tcp_connect_negative" + tcpConnectNoError EventType = "tcp_connect_zero" + tcpSetStateToEstablished EventType = "tcp_set_state_to_established" + tcpSetStateFromEstablished EventType = "tcp_set_state_from_established" + connectExitSyscallSuccess EventType = "connect_exit_syscall_zero" + connectExitSyscallFailure EventType = "connect_exit_syscall_failure" + connectExitSyscallNotConcern EventType = "connect_exit_syscall_not_concern" + expiredEvent 
EventType = "expired_event" +) + +func createStatesResource() StatesResource { + return StatesResource{ + Inprogress: State{ + eventsMap: map[EventType]StateType{ + tcpConnectNoError: Inprogress, + tcpConnectError: Failure, + tcpSetStateToEstablished: Success, + tcpSetStateFromEstablished: Success, + connectExitSyscallSuccess: Success, + connectExitSyscallFailure: Failure, + connectExitSyscallNotConcern: Inprogress, + expiredEvent: Failure, + }, + callback: nil, + }, + Success: { + eventsMap: map[EventType]StateType{ + tcpSetStateToEstablished: Success, + tcpSetStateFromEstablished: Closed, + connectExitSyscallSuccess: Success, + connectExitSyscallNotConcern: Success, + expiredEvent: Closed, + }, + callback: func(connStats *ConnectionStats, connMap map[ConnKey]*ConnectionStats) *ConnectionStats { + return connStats + }, + }, + Failure: { + eventsMap: map[EventType]StateType{ + connectExitSyscallFailure: Failure, + connectExitSyscallNotConcern: Failure, + expiredEvent: Closed, + }, + callback: func(connStats *ConnectionStats, connMap map[ConnKey]*ConnectionStats) *ConnectionStats { + return connStats + }, + }, + Closed: { + eventsMap: map[EventType]StateType{}, + callback: func(connStats *ConnectionStats, connMap map[ConnKey]*ConnectionStats) *ConnectionStats { + delete(connMap, connStats.ConnKey) + return nil + }, + }, + } +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket.go b/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket.go new file mode 100644 index 000000000..dd66763dc --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket.go @@ -0,0 +1,163 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Modification: +// 1. Change the return parameter of newNetIPSocket to map. +// 2. Remove unused fields parsed. + +package internal + +import ( + "bufio" + "encoding/hex" + "fmt" + "io" + "net" + "os" + "strconv" + "strings" +) + +const ( + // readLimit is used by io.LimitReader while reading the content of the + // /proc/net/udp{,6} files. The number of lines inside such a file is dynamic + // as each line represents a single used socket. + // In theory, the number of available sockets is 65535 (2^16 - 1) per IP. + // With e.g. 150 Byte per line and the maximum number of 65535, + // the reader needs to handle 150 Byte * 65535 =~ 10 MB for a single IP. + readLimit = 4294967296 // Byte -> 4 GiB +) + +// This contains generic data structures for both udp and tcp sockets. +type ( + SocketKey struct { + LocalAddr string + LocalPort uint64 + RemAddr string + RemPort uint64 + } + + // NetSocketStateMap represents the map between socket and its state. + NetSocketStateMap map[SocketKey]string + + // netIPSocketLine represents the fields parsed from a single line + // in /proc/net/{t,u}dp{,6}. Fields which are not used by IPSocket are skipped. + // For the proc file format details, see https://linux.die.net/man/5/proc. 
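+ // Only the local/remote address and port columns plus the connection
+ // state (St) are retained here; the other columns are dropped while parsing.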
+ netIPSocketLine struct { + LocalAddr net.IP + LocalPort uint64 + RemAddr net.IP + RemPort uint64 + St string + } +) + +func newNetIPSocket(file string) (NetSocketStateMap, error) { + f, err := os.Open(file) + if err != nil { + return nil, err + } + defer func() { + _ = f.Close() + }() + + netIPSocket := make(NetSocketStateMap) + + lr := io.LimitReader(f, readLimit) + s := bufio.NewScanner(lr) + s.Scan() // skip first line with headers + for s.Scan() { + fields := strings.Fields(s.Text()) + line, err := parseNetIPSocketLine(fields) + if err != nil { + return nil, err + } + socketKey := SocketKey{ + LocalAddr: line.LocalAddr.String(), + LocalPort: line.LocalPort, + RemAddr: line.RemAddr.String(), + RemPort: line.RemPort, + } + netIPSocket[socketKey] = line.St + } + if err := s.Err(); err != nil { + return nil, err + } + return netIPSocket, nil +} + +// the /proc/net/{t,u}dp{,6} files are network byte order for ipv4 and for ipv6 the address +// is four words consisting of four bytes each. In each of those four words the four bytes +// are written in reverse order. +func parseIP(hexIP string) (net.IP, error) { + var byteIP []byte + byteIP, err := hex.DecodeString(hexIP) + if err != nil { + return nil, fmt.Errorf("cannot parse address field in socket line %q", hexIP) + } + switch len(byteIP) { + case 4: + return net.IP{byteIP[3], byteIP[2], byteIP[1], byteIP[0]}, nil + case 16: + i := net.IP{ + byteIP[3], byteIP[2], byteIP[1], byteIP[0], + byteIP[7], byteIP[6], byteIP[5], byteIP[4], + byteIP[11], byteIP[10], byteIP[9], byteIP[8], + byteIP[15], byteIP[14], byteIP[13], byteIP[12], + } + return i, nil + default: + return nil, fmt.Errorf("unable to parse IP %s", hexIP) + } +} + +// parseNetIPSocketLine parses a single line, represented by a list of fields. +func parseNetIPSocketLine(fields []string) (*netIPSocketLine, error) { + line := &netIPSocketLine{} + if len(fields) < 10 { + return nil, fmt.Errorf( + "cannot parse net socket line as it has less then 10 columns %q", + strings.Join(fields, " "), + ) + } + var err error // parse error + + // local_address + l := strings.Split(fields[1], ":") + if len(l) != 2 { + return nil, fmt.Errorf("cannot parse local_address field in socket line %q", fields[1]) + } + if line.LocalAddr, err = parseIP(l[0]); err != nil { + return nil, err + } + if line.LocalPort, err = strconv.ParseUint(l[1], 16, 64); err != nil { + return nil, fmt.Errorf("cannot parse local_address port value in socket line: %w", err) + } + + // remote_address + r := strings.Split(fields[2], ":") + if len(r) != 2 { + return nil, fmt.Errorf("cannot parse rem_address field in socket line %q", fields[1]) + } + if line.RemAddr, err = parseIP(r[0]); err != nil { + return nil, err + } + if line.RemPort, err = strconv.ParseUint(r[1], 16, 64); err != nil { + return nil, fmt.Errorf("cannot parse rem_address port value in socket line: %w", err) + } + + // st + line.St = fields[3] + + return line, nil +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket_test.go b/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket_test.go new file mode 100644 index 000000000..1ef71923c --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/net_ip_socket_test.go @@ -0,0 +1,74 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "net" + "reflect" + "testing" +) + +func Test_parseNetIPSocketLine(t *testing.T) { + tests := []struct { + fields []string + name string + want *netIPSocketLine + wantErr bool + }{ + { + name: "reading valid lines, no issue should happened", + fields: []string{"11:", "00000000:0000", "00000000:0000", "0A", "00000017:0000002A", "0:0", "0", "1000", "0", "39309"}, + want: &netIPSocketLine{ + LocalAddr: net.IP{0, 0, 0, 0}, + LocalPort: 0, + RemAddr: net.IP{0, 0, 0, 0}, + RemPort: 0, + St: "0A", + }, + }, + { + name: "error case - invalid line - number of fields/columns < 10", + fields: []string{"1:", "00000000:0000", "00000000:0000", "07", "0:0", "0", "0"}, + want: nil, + wantErr: true, + }, + { + name: "error case - parse local_address - not a valid hex", + fields: []string{"1:", "0000000O:0000", "00000000:0000", "07", "00000000:00000001", "0:0", "0", "0", "0", "39309"}, + want: nil, + wantErr: true, + }, + { + name: "error case - parse rem_address - not a valid hex", + fields: []string{"1:", "00000000:0000", "0000000O:0000", "07", "00000000:00000001", "0:0", "0", "0", "0", "39309"}, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseNetIPSocketLine(tt.fields) + if (err != nil) != tt.wantErr { + t.Errorf("parseNetIPSocketLine() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.want == nil && got != nil { + t.Errorf("parseNetIPSocketLine() = %v, want %v", got, tt.want) + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseNetIPSocketLine() = %#v, want %#v", got, tt.want) + } + }) + } +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/state_machine.go b/collector/analyzer/tcpconnectanalyzer/internal/state_machine.go new file mode 100644 index 000000000..9ff0f5245 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/state_machine.go @@ -0,0 +1,67 @@ +package internal + +import ( + "fmt" +) + +type StateType string + +type StateMachine struct { + connStats *ConnectionStats + lastStateType StateType + currentStateType StateType + states StatesResource +} + +func NewStateMachine(initialState StateType, statesResource StatesResource, connStats *ConnectionStats) *StateMachine { + return &StateMachine{ + connStats: connStats, + lastStateType: initialState, + currentStateType: initialState, + states: statesResource, + } +} + +func (s *StateMachine) GetCurrentState() StateType { + return s.currentStateType +} + +func (s *StateMachine) GetLastState() StateType { + return s.lastStateType +} + +func (s *StateMachine) ReceiveEvent(event EventType, connMap map[ConnKey]*ConnectionStats) (*ConnectionStats, error) { + currentState, ok := s.states[s.currentStateType] + if !ok { + return nil, fmt.Errorf("no current state [%v]", s.currentStateType) + } + nextStateType, ok := currentState.eventsMap[event] + if !ok { + return nil, fmt.Errorf("receive not supported event [%v] in state [%v], ConnKey: [%s]", + event, s.currentStateType, s.connStats.ConnKey.String()) + } + nextState, ok := s.states[nextStateType] + if !ok { + return nil, fmt.Errorf("no 
next state [%v]", nextStateType) + } + // Only trigger the callback when the state changes + if nextStateType != s.currentStateType && nextState.callback != nil { + s.lastStateType = s.currentStateType + s.currentStateType = nextStateType + return nextState.callback(s.connStats, connMap), nil + } + s.lastStateType = s.currentStateType + s.currentStateType = nextStateType + return nil, nil +} + +type Callback func(connStats *ConnectionStats, connMap map[ConnKey]*ConnectionStats) *ConnectionStats + +type EventType string + +type State struct { + eventsMap map[EventType]StateType + callback Callback +} + +type StatesResource map[StateType]State diff --git a/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go b/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go new file mode 100644 index 000000000..d8ed290a1 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go @@ -0,0 +1,57 @@ +package internal + +import "testing" + +func TestCallback(t *testing.T) { + connMap := make(map[ConnKey]*ConnectionStats) + connKey := ConnKey{ + SrcIP: "10.10.10.10", + SrcPort: 40040, + DstIP: "10.10.10.23", + DstPort: 80, + } + statesResource := createStatesResource() + connStats := &ConnectionStats{ + pid: 0, + ConnKey: connKey, + InitialTimestamp: 0, + EndTimestamp: 0, + Code: 0, + ConnectSyscall: nil, + TcpConnect: nil, + TcpSetState: nil, + } + connStats.StateMachine = NewStateMachine(Inprogress, statesResource, connStats) + connMap[connKey] = connStats + + stats, err := connStats.StateMachine.ReceiveEvent(tcpConnectNoError, connMap) + if err != nil { + t.Fatal(err) + } + if connStats.StateMachine.currentStateType != Inprogress { + t.Errorf("expected inprogress, got %v", connStats.StateMachine.currentStateType) + } + + stats, err = connStats.StateMachine.ReceiveEvent(tcpSetStateToEstablished, connMap) + if err != nil { + t.Fatal(err) + } + if stats == nil { + t.Errorf("expected stats emitted, but none got") + } + if connStats.StateMachine.currentStateType != Success { + t.Errorf("expected success, got %v", connStats.StateMachine.currentStateType) + } + + stats, err = connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, connMap) + if err != nil { + t.Fatal(err) + } + if connStats.StateMachine.currentStateType != Closed { + t.Errorf("expected closed, got %v", connStats.StateMachine.currentStateType) + } + + if len(connMap) != 0 { + t.Errorf("expected empty map, but current map is %v", connMap) + } +} diff --git a/collector/analyzer/tcpconnectanalyzer/internal/tcp_stat.go b/collector/analyzer/tcpconnectanalyzer/internal/tcp_stat.go new file mode 100644 index 000000000..32fdb3b70 --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/internal/tcp_stat.go @@ -0,0 +1,11 @@ +package internal + +import ( + "path" + "strconv" +) + +func NewPidTcpStat(hostProc string, pid int) (NetSocketStateMap, error) { + tcpFilePath := path.Join(hostProc, strconv.Itoa(pid), "net/tcp") + return newNetIPSocket(tcpFilePath) +} diff --git a/collector/analyzer/tcpconnectanalyzer/metrics.go b/collector/analyzer/tcpconnectanalyzer/metrics.go new file mode 100644 index 000000000..1daa82e5a --- /dev/null +++ b/collector/analyzer/tcpconnectanalyzer/metrics.go @@ -0,0 +1,23 @@ +package tcpconnectanalyzer + +import ( + "context" + "sync" + + "github.com/Kindling-project/kindling/collector/analyzer/tcpconnectanalyzer/internal" + "go.opentelemetry.io/otel/metric" +) + +var once sync.Once + +const mapSizeMetric = "kindling_telemetry_tcpconnectanalyzer_map_size" + 
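+// newSelfMetrics registers an asynchronous gauge that observes the current size of the
+// ConnectMonitor's connection map; the sync.Once guard keeps the observer from being
+// registered more than once.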
+func newSelfMetrics(meterProvider metric.MeterProvider, monitor *internal.ConnectMonitor) { + once.Do(func() { + meter := metric.Must(meterProvider.Meter("kindling")) + meter.NewInt64GaugeObserver(mapSizeMetric, + func(ctx context.Context, result metric.Int64ObserverResult) { + result.Observe(int64(monitor.GetMapSize())) + }) + }) +} diff --git a/collector/application/application.go b/collector/application/application.go index 018265b4d..860b41e59 100644 --- a/collector/application/application.go +++ b/collector/application/application.go @@ -3,9 +3,11 @@ package application import ( "flag" "fmt" + "github.com/Kindling-project/kindling/collector/analyzer" "github.com/Kindling-project/kindling/collector/analyzer/loganalyzer" "github.com/Kindling-project/kindling/collector/analyzer/network" + "github.com/Kindling-project/kindling/collector/analyzer/tcpconnectanalyzer" "github.com/Kindling-project/kindling/collector/analyzer/tcpmetricanalyzer" "github.com/Kindling-project/kindling/collector/component" "github.com/Kindling-project/kindling/collector/consumer" @@ -79,6 +81,7 @@ func (a *Application) registerFactory() { a.componentsFactory.RegisterExporter(logexporter.Type, logexporter.New, &logexporter.Config{}) a.componentsFactory.RegisterAnalyzer(loganalyzer.Type.String(), loganalyzer.New, &loganalyzer.Config{}) a.componentsFactory.RegisterProcessor(aggregateprocessor.Type, aggregateprocessor.New, aggregateprocessor.NewDefaultConfig()) + a.componentsFactory.RegisterAnalyzer(tcpconnectanalyzer.Type.String(), tcpconnectanalyzer.New, tcpconnectanalyzer.NewDefaultConfig()) } func (a *Application) readInConfig(path string) error { @@ -119,8 +122,10 @@ func (a *Application) buildPipeline() error { k8sMetadataProcessor2 := k8sProcessorFactory.NewFunc(k8sProcessorFactory.Config, a.telemetry.Telemetry, aggregateProcessorForTcp) tcpAnalyzerFactory := a.componentsFactory.Analyzers[tcpmetricanalyzer.TcpMetric.String()] tcpAnalyzer := tcpAnalyzerFactory.NewFunc(tcpAnalyzerFactory.Config, a.telemetry.Telemetry, []consumer.Consumer{k8sMetadataProcessor2}) + tcpConnectAnalyzerFactory := a.componentsFactory.Analyzers[tcpconnectanalyzer.Type.String()] + tcpConnectAnalyzer := tcpConnectAnalyzerFactory.NewFunc(tcpConnectAnalyzerFactory.Config, a.telemetry.Telemetry, []consumer.Consumer{k8sMetadataProcessor}) // Initialize receiver packaged with multiple analyzers - analyzerManager, err := analyzer.NewManager(networkAnalyzer, tcpAnalyzer) + analyzerManager, err := analyzer.NewManager(networkAnalyzer, tcpAnalyzer, tcpConnectAnalyzer) if err != nil { return fmt.Errorf("error happened while creating analyzer manager: %w", err) } diff --git a/collector/consumer/exporter/otelexporter/otelexporter.go b/collector/consumer/exporter/otelexporter/otelexporter.go index c691771d0..ae9c60b2c 100644 --- a/collector/consumer/exporter/otelexporter/otelexporter.go +++ b/collector/consumer/exporter/otelexporter/otelexporter.go @@ -4,14 +4,12 @@ import ( "context" "errors" "fmt" - - "github.com/Kindling-project/kindling/collector/component" - "github.com/Kindling-project/kindling/collector/consumer/exporter" - "github.com/Kindling-project/kindling/collector/consumer/exporter/tools/adapter" - "os" "time" + "github.com/Kindling-project/kindling/collector/component" + "github.com/Kindling-project/kindling/collector/consumer/exporter" + "github.com/Kindling-project/kindling/collector/consumer/exporter/otelexporter/defaultadapter" "github.com/Kindling-project/kindling/collector/model/constnames" "go.opentelemetry.io/otel/attribute" 
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" @@ -153,7 +151,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export StorePodDetail: cfg.AdapterConfig.NeedPodDetail, StoreExternalSrcIP: cfg.AdapterConfig.StoreExternalSrcIP, }), - adapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName}, customLabels), + defaultadapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName, constnames.TcpConnectGaugeGroupName}, customLabels), }, } go func() { @@ -220,7 +218,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export StorePodDetail: cfg.AdapterConfig.NeedPodDetail, StoreExternalSrcIP: cfg.AdapterConfig.StoreExternalSrcIP, }), - adapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName}, customLabels), + defaultadapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName, , constnames.TcpConnectGaugeGroupName}, customLabels), }, } diff --git a/collector/consumer/processor/aggregateprocessor/processor.go b/collector/consumer/processor/aggregateprocessor/processor.go index d33948d4c..4c0f9ef8b 100644 --- a/collector/consumer/processor/aggregateprocessor/processor.go +++ b/collector/consumer/processor/aggregateprocessor/processor.go @@ -1,6 +1,9 @@ package aggregateprocessor import ( + "math/rand" + "time" + "github.com/Kindling-project/kindling/collector/component" "github.com/Kindling-project/kindling/collector/consumer" "github.com/Kindling-project/kindling/collector/consumer/processor" @@ -10,8 +13,6 @@ import ( "github.com/Kindling-project/kindling/collector/pkg/aggregator" "github.com/Kindling-project/kindling/collector/pkg/aggregator/defaultaggregator" "go.uber.org/zap" - "math/rand" - "time" ) const Type = "aggregateprocessor" @@ -19,6 +20,8 @@ const Type = "aggregateprocessor" var exponentialInt64Boundaries = []int64{10e6, 20e6, 50e6, 80e6, 130e6, 200e6, 300e6, 400e6, 500e6, 700e6, 1e9, 2e9, 5e9, 30e9} +var tcpConnectLabelSelectors = newTcpConnectLabelSelectors() + type AggregateProcessor struct { cfg *Config telemetry *component.TelemetryTools @@ -122,6 +125,9 @@ func (p *AggregateProcessor) Consume(dataGroup *model.DataGroup) error { case constnames.TcpMetricGroupName: p.aggregator.Aggregate(dataGroup, p.tcpLabelSelectors) return nil + case constnames.TcpConnectGaugeGroupName: + p.aggregator.Aggregate(dataGroup, tcpConnectLabelSelectors) + return nil default: p.aggregator.Aggregate(dataGroup, p.netRequestLabelSelectors) return nil @@ -197,6 +203,34 @@ func newTcpLabelSelectors() *aggregator.LabelSelectors { ) } +func newTcpConnectLabelSelectors() *aggregator.LabelSelectors { + return aggregator.NewLabelSelectors( + aggregator.LabelSelector{Name: constlabels.SrcNode, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcNodeIp, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcNamespace, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcPod, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcWorkloadName, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcWorkloadKind, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcService, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcIp, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcContainerId, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.SrcContainer, VType: aggregator.StringType}, + 
aggregator.LabelSelector{Name: constlabels.DstNode, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstNodeIp, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstNamespace, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstPod, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstWorkloadName, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstWorkloadKind, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstService, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstIp, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstPort, VType: aggregator.IntType}, + aggregator.LabelSelector{Name: constlabels.DstContainerId, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.DstContainer, VType: aggregator.StringType}, + aggregator.LabelSelector{Name: constlabels.Errno, VType: aggregator.IntType}, + aggregator.LabelSelector{Name: constlabels.Success, VType: aggregator.BooleanType}, + ) +} + func (p *AggregateProcessor) isSampled(dataGroup *model.DataGroup) bool { randSeed := rand.Intn(100) if isAbnormal(dataGroup) { diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index 619239ce9..098707290 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -27,7 +27,13 @@ receivers: - name: kprobe-tcp_rcv_established - name: kprobe-tcp_drop - name: kprobe-tcp_retransmit_skb + - name: syscall_exit-connect + - name: kretprobe-tcp_connect + - name: kprobe-tcp_set_state analyzers: + tcpconnectanalyzer: + channel_size: 10000 + timeout_second: 60 tcpmetricanalyzer: networkanalyzer: connect_timeout: 100 @@ -95,6 +101,10 @@ processors: - kind: sum kindling_tcp_packet_loss_total: - kind: sum + kindling_tcp_connect_total: + - kind: sum + kindling_tcp_connect_duration_nanoseconds_total: + - kind: sum sampling_rate: normal_data: 0 slow_data: 100 @@ -124,6 +134,8 @@ exporters: kindling_tcp_srtt_microseconds: gauge kindling_tcp_retransmit_total: counter kindling_tcp_packet_loss_total: counter + kindling_tcp_connect_total: counter + kindling_tcp_connect_duration_nanoseconds_total: counter # Export data in the following ways: ["prometheus", "otlp", "stdout"] # Note: configure the corresponding section to make everything ok export_kind: prometheus diff --git a/collector/model/constlabels/const.go b/collector/model/constlabels/const.go index 9fcb46c5c..107d419af 100644 --- a/collector/model/constlabels/const.go +++ b/collector/model/constlabels/const.go @@ -49,6 +49,8 @@ const ( Ip = "ip" Port = "port" + Errno = "errno" + Success = "success" RequestContent = "request_content" ResponseContent = "response_content" StatusCode = "status_code" diff --git a/collector/model/constnames/const.go b/collector/model/constnames/const.go index 225935319..899dcc883 100644 --- a/collector/model/constnames/const.go +++ b/collector/model/constnames/const.go @@ -9,11 +9,14 @@ const ( RecvFromEvent = "recvfrom" SendMsgEvent = "sendmsg" RecvMsgEvent = "recvmsg" + ConnectEvent = "connect" TcpCloseEvent = "tcp_close" TcpRcvEstablishedEvent = "tcp_rcv_established" TcpDropEvent = "tcp_drop" TcpRetransmitSkbEvent = "tcp_retransmit_skb" + TcpConnectEvent = "tcp_connect" + TcpSetStateEvent = "tcp_set_state" OtherEvent = "other" GrpcUprobeEvent = "grpc_uprobe" @@ -26,4 +29,5 @@ const ( 
TcpMetricGroupName = "tcp_metric_metric_group" NodeMetricGroupName = "node_metric_metric_group" + TcpConnectGaugeGroupName = "tcp_connect_gauge_group" ) diff --git a/collector/model/constnames/relabel_metric_const.go b/collector/model/constnames/relabel_metric_const.go index cf9bd3235..eb12d6a9a 100644 --- a/collector/model/constnames/relabel_metric_const.go +++ b/collector/model/constnames/relabel_metric_const.go @@ -34,6 +34,9 @@ const ( TcpRttMetricName = "kindling_tcp_srtt_microseconds" TcpRetransmitMetricName = "kindling_tcp_retransmit_total" TcpDropMetricName = "kindling_tcp_packet_loss_total" + + TcpConnectTotalMetric = "kindling_tcp_connect_total" + TcpConnectDurationMetric = "kindling_tcp_connect_duration_nanoseconds_total" ) const ( diff --git a/collector/model/kindling_event_helper.go b/collector/model/kindling_event_helper.go index 09026e704..c320de501 100644 --- a/collector/model/kindling_event_helper.go +++ b/collector/model/kindling_event_helper.go @@ -222,6 +222,18 @@ func (x *KindlingEvent) IsUdp() uint32 { return 0 } +func (x *KindlingEvent) IsTcp() bool { + context := x.GetCtx() + if context == nil { + return false + } + fd := context.GetFdInfo() + if fd == nil { + return false + } + return fd.GetProtocol() == L4Proto_TCP +} + func (x *KindlingEvent) IsConnect() bool { return x.Name == "connect" } diff --git a/collector/pkg/aggregator/label_key.go b/collector/pkg/aggregator/label_key.go index 0d26461f6..3d213e0fa 100644 --- a/collector/pkg/aggregator/label_key.go +++ b/collector/pkg/aggregator/label_key.go @@ -45,6 +45,10 @@ func (s *LabelSelectors) GetLabelKeys(labels *model.AttributeMap) *LabelKeys { return keys } +func (s *LabelSelectors) AppendSelectors(selectors ...LabelSelector) { + s.selectors = append(s.selectors, selectors...) 
+} + const maxLabelKeySize = 35 type LabelKeys struct { diff --git a/collector/receiver/udsreceiver/metrics.go b/collector/receiver/udsreceiver/metrics.go index a3ce748df..f727bc5bb 100644 --- a/collector/receiver/udsreceiver/metrics.go +++ b/collector/receiver/udsreceiver/metrics.go @@ -2,11 +2,12 @@ package udsreceiver import ( "context" + "sync" + "sync/atomic" + "github.com/Kindling-project/kindling/collector/model/constnames" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - "sync" - "sync/atomic" ) var once sync.Once @@ -39,11 +40,14 @@ type stats struct { recvFrom int64 sendMsg int64 recvMsg int64 + connect int64 grpcUprobe int64 tcpClose int64 tcpRcvEstablished int64 tcpDrop int64 tcpRetransmitSkb int64 + tcpConnect int64 + tcpSetState int64 other int64 } @@ -75,6 +79,12 @@ func (i *stats) add(name string, value int64) { atomic.AddInt64(&i.tcpDrop, value) case constnames.TcpRetransmitSkbEvent: atomic.AddInt64(&i.tcpRetransmitSkb, value) + case constnames.ConnectEvent: + atomic.AddInt64(&i.connect, value) + case constnames.TcpConnectEvent: + atomic.AddInt64(&i.tcpConnect, value) + case constnames.TcpSetStateEvent: + atomic.AddInt64(&i.tcpSetState, value) default: atomic.AddInt64(&i.other, value) } @@ -95,6 +105,9 @@ func (i *stats) getStats() map[string]int64 { ret[constnames.TcpRcvEstablishedEvent] = atomic.LoadInt64(&i.tcpRcvEstablished) ret[constnames.TcpCloseEvent] = atomic.LoadInt64(&i.tcpClose) ret[constnames.TcpRetransmitSkbEvent] = atomic.LoadInt64(&i.tcpRetransmitSkb) + ret[constnames.ConnectEvent] = atomic.LoadInt64(&i.connect) + ret[constnames.TcpConnectEvent] = atomic.LoadInt64(&i.tcpConnect) + ret[constnames.TcpSetStateEvent] = atomic.LoadInt64(&i.tcpSetState) ret[constnames.OtherEvent] = atomic.LoadInt64(&i.other) return ret } diff --git a/collector/receiver/udsreceiver/udsreceiver.go b/collector/receiver/udsreceiver/udsreceiver.go index 511b819d8..f0c867b55 100644 --- a/collector/receiver/udsreceiver/udsreceiver.go +++ b/collector/receiver/udsreceiver/udsreceiver.go @@ -1,8 +1,13 @@ package udsreceiver import ( + "os" + "sync" + "time" + analyzerpackage "github.com/Kindling-project/kindling/collector/analyzer" "github.com/Kindling-project/kindling/collector/analyzer/network" + "github.com/Kindling-project/kindling/collector/analyzer/tcpconnectanalyzer" "github.com/Kindling-project/kindling/collector/analyzer/tcpmetricanalyzer" "github.com/Kindling-project/kindling/collector/component" "github.com/Kindling-project/kindling/collector/model" @@ -12,9 +17,6 @@ import ( zmq "github.com/pebbe/zmq4" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "os" - "sync" - "time" ) const ( @@ -239,6 +241,12 @@ func (r *UdsReceiver) SendToNextConsumer(events *model.KindlingEventList) error fallthrough case constnames.TcpRetransmitSkbEvent: analyzer, isFound = r.analyzerManager.GetAnalyzer(tcpmetricanalyzer.TcpMetric) + case constnames.ConnectEvent: + fallthrough + case constnames.TcpConnectEvent: + fallthrough + case constnames.TcpSetStateEvent: + analyzer, isFound = r.analyzerManager.GetAnalyzer(tcpconnectanalyzer.Type) default: analyzer, isFound = r.analyzerManager.GetAnalyzer(network.Network) } diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index b08d2ff35..b85c428c2 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -27,7 +27,13 @@ receivers: - name: kprobe-tcp_rcv_established - name: kprobe-tcp_drop - name: kprobe-tcp_retransmit_skb + - name: 
syscall_exit-connect + - name: kretprobe-tcp_connect + - name: kprobe-tcp_set_state analyzers: + tcpconnectanalyzer: + channel_size: 10000 + timeout_second: 60 tcpmetricanalyzer: networkanalyzer: connect_timeout: 100 @@ -95,6 +101,10 @@ processors: - kind: sum kindling_tcp_packet_loss_total: - kind: sum + kindling_tcp_connect_total: + - kind: sum + kindling_tcp_connect_duration_nanoseconds_total: + - kind: sum sampling_rate: normal_data: 0 slow_data: 100 @@ -124,6 +134,8 @@ exporters: kindling_tcp_srtt_microseconds: gauge kindling_tcp_retransmit_total: counter kindling_tcp_packet_loss_total: counter + kindling_tcp_connect_total: counter + kindling_tcp_connect_duration_nanoseconds_total: counter # Export data in the following ways: ["prometheus", "otlp", "stdout"] # Note: configure the corresponding section to make everything ok export_kind: prometheus From 024c7aeaf7315986e0572cca5d2bd34989b5d043 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Tue, 31 May 2022 19:43:50 +0800 Subject: [PATCH 02/17] Add interval checking before scanning /proc/tcp Signed-off-by: Daxin Wang --- .../analyzer/tcpconnectanalyzer/analyzer.go | 10 ++----- .../analyzer/tcpconnectanalyzer/config.go | 8 ++--- .../internal/connect_monitor.go | 29 +++++++++---------- .../docker/kindling-collector-config.yml | 2 +- deploy/agent/kindling-collector-config.yml | 2 +- 5 files changed, 23 insertions(+), 28 deletions(-) diff --git a/collector/analyzer/tcpconnectanalyzer/analyzer.go b/collector/analyzer/tcpconnectanalyzer/analyzer.go index 6b85c923d..2641d7550 100644 --- a/collector/analyzer/tcpconnectanalyzer/analyzer.go +++ b/collector/analyzer/tcpconnectanalyzer/analyzer.go @@ -59,12 +59,9 @@ func New(cfg interface{}, telemetry *component.TelemetryTools, consumers []consu // Start initializes the analyzer func (a *TcpConnectAnalyzer) Start() error { go func() { - timeoutTicker := time.NewTicker(time.Duration(a.config.TimeoutSecond/5) * time.Second) - scanTcpStateTicker := time.NewTicker(5 * time.Second) + scanTcpStateTicker := time.NewTicker(time.Duration(a.config.WaitEventSecond/3) * time.Second) for { select { - case <-timeoutTicker.C: - a.trimExpiredConnStats() case <-scanTcpStateTicker.C: a.trimConnectionsWithTcpStat() case event := <-a.eventChannel: @@ -73,7 +70,6 @@ func (a *TcpConnectAnalyzer) Start() error { // Only trim the connections expired. For those unfinished, we leave them // unchanged and just shutdown this goroutine. 
a.trimConnectionsWithTcpStat() - a.trimExpiredConnStats() return } } @@ -123,7 +119,7 @@ func (a *TcpConnectAnalyzer) consumeChannelEvent(event *model.KindlingEvent) { } func (a *TcpConnectAnalyzer) trimExpiredConnStats() { - connStats := a.connectMonitor.TrimExpiredConnections(a.config.TimeoutSecond) + connStats := a.connectMonitor.TrimExpiredConnections(a.config.WaitEventSecond * 3) for _, connStat := range connStats { gaugeGroup := a.generateGaugeGroup(connStat) a.passThroughConsumers(gaugeGroup) @@ -131,7 +127,7 @@ func (a *TcpConnectAnalyzer) trimExpiredConnStats() { } func (a *TcpConnectAnalyzer) trimConnectionsWithTcpStat() { - connStats := a.connectMonitor.TrimConnectionsWithTcpStat() + connStats := a.connectMonitor.TrimConnectionsWithTcpStat(a.config.WaitEventSecond) for _, connStat := range connStats { gaugeGroup := a.generateGaugeGroup(connStat) a.passThroughConsumers(gaugeGroup) diff --git a/collector/analyzer/tcpconnectanalyzer/config.go b/collector/analyzer/tcpconnectanalyzer/config.go index a9d945a21..0ac90e526 100644 --- a/collector/analyzer/tcpconnectanalyzer/config.go +++ b/collector/analyzer/tcpconnectanalyzer/config.go @@ -1,13 +1,13 @@ package tcpconnectanalyzer type Config struct { - ChannelSize int `mapstructure:"channel_size"` - TimeoutSecond int `mapstructure:"timeout_second"` + ChannelSize int `mapstructure:"channel_size"` + WaitEventSecond int `mapstructure:"wait_event_second"` } func NewDefaultConfig() *Config { return &Config{ - ChannelSize: 2000, - TimeoutSecond: 60, + ChannelSize: 2000, + WaitEventSecond: 10, } } diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go index 627691079..34a0088ad 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -3,7 +3,6 @@ package internal import ( "fmt" "os" - "syscall" "time" "github.com/Kindling-project/kindling/collector/model" @@ -229,31 +228,31 @@ func (c *ConnectMonitor) TrimExpiredConnections(timeout int) []*ConnectionStats return ret } -func (c *ConnectMonitor) TrimConnectionsWithTcpStat() []*ConnectionStats { +func (c *ConnectMonitor) TrimConnectionsWithTcpStat(waitForEventSecond int) []*ConnectionStats { ret := make([]*ConnectionStats, 0, len(c.connMap)) // Only scan once for each pid pidTcpStateMap := make(map[uint32]NetSocketStateMap) + waitForEventNano := int64(waitForEventSecond) * 1000000000 for key, connStat := range c.connMap { if connStat.pid == 0 { continue } + if time.Now().UnixNano()-int64(connStat.InitialTimestamp) < waitForEventNano { + // Still waiting for other events + continue + } tcpStateMap, ok := pidTcpStateMap[connStat.pid] if !ok { tcpState, err := NewPidTcpStat(c.hostProcPath, int(connStat.pid)) if err != nil { - if err == syscall.ENOENT { - // No such file or directory, which means the process has been purged. - // We consider the connection failed to be established. - stats, err := connStat.StateMachine.ReceiveEvent(expiredEvent, c.connMap) - if err != nil { - c.logger.Warn("error happened when receiving event:", zap.Error(err)) - } - if stats != nil { - ret = append(ret, stats) - } - } else { - c.logger.Info("error happened when scanning net/tcp", - zap.Uint32("pid", connStat.pid), zap.Error(err)) + // No such file or directory, which means the process has been purged. + // We consider the connection failed to be established. 
+ stats, err := connStat.StateMachine.ReceiveEvent(expiredEvent, c.connMap) + if err != nil { + c.logger.Warn("error happened when receiving event:", zap.Error(err)) + } + if stats != nil { + ret = append(ret, stats) } continue } diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index 098707290..3abc21bbe 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -33,7 +33,7 @@ receivers: analyzers: tcpconnectanalyzer: channel_size: 10000 - timeout_second: 60 + wait_event_second: 10 tcpmetricanalyzer: networkanalyzer: connect_timeout: 100 diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index b85c428c2..259968550 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -33,7 +33,7 @@ receivers: analyzers: tcpconnectanalyzer: channel_size: 10000 - timeout_second: 60 + wait_event_second: 10 tcpmetricanalyzer: networkanalyzer: connect_timeout: 100 From cd2fadd9e6da03d25b863e0e5585306b32023d77 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Tue, 31 May 2022 19:49:22 +0800 Subject: [PATCH 03/17] add logs when tcp_connect has nil fields Signed-off-by: Daxin Wang --- .../internal/connect_monitor.go | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go index 34a0088ad..846e77d27 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -311,18 +311,32 @@ func (c *ConnectMonitor) GetMapSize() int { } func getConnKeyForTcpConnect(event *model.KindlingEvent) (ConnKey, error) { + var sIpString string + var sPortUint uint64 + var dIpString string + var dPortUint uint64 sIp := event.GetUserAttribute("sip") + if sIp != nil { + sIpString = model.IPLong2String(uint32(sIp.GetUintValue())) + } sPort := event.GetUserAttribute("sport") + if sPort != nil { + sPortUint = sPort.GetUintValue() + } dIp := event.GetUserAttribute("dip") + if dIp != nil { + dIpString = model.IPLong2String(uint32(dIp.GetUintValue())) + } dPort := event.GetUserAttribute("dport") + if dPort != nil { + dPortUint = dPort.GetUintValue() + } if sIp == nil || sPort == nil || dIp == nil || dPort == nil { - return ConnKey{}, fmt.Errorf("one of sip or dip or dport is nil for event %s", event.Name) + return ConnKey{}, fmt.Errorf("some fields are nil for event %s. 
srcIp=%v, srcPort=%v, "+ + "dstIp=%v, dstPort=%v", event.Name, sIpString, sPortUint, dIpString, dPortUint) } - sIpString := model.IPLong2String(uint32(sIp.GetUintValue())) - sPortUint := sPort.GetUintValue() - dIpString := model.IPLong2String(uint32(dIp.GetUintValue())) - dPortUint := dPort.GetUintValue() + connKey := ConnKey{ SrcIP: sIpString, SrcPort: uint32(sPortUint), From 2932f3d40b6c14536c37b50febd237210d459a80 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Wed, 1 Jun 2022 10:02:15 +0800 Subject: [PATCH 04/17] add log of the error when scanning tcp states Signed-off-by: Daxin Wang --- .../analyzer/tcpconnectanalyzer/internal/connect_monitor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go index 846e77d27..420aa6c6f 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -245,6 +245,7 @@ func (c *ConnectMonitor) TrimConnectionsWithTcpStat(waitForEventSecond int) []*C if !ok { tcpState, err := NewPidTcpStat(c.hostProcPath, int(connStat.pid)) if err != nil { + c.logger.Debug("error happened when scanning tcp state:", zap.Error(err)) // No such file or directory, which means the process has been purged. // We consider the connection failed to be established. stats, err := connStat.StateMachine.ReceiveEvent(expiredEvent, c.connMap) From 58c49e2b1ca1961b18bd01d83eb03becd7688fe2 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Wed, 1 Jun 2022 10:06:38 +0800 Subject: [PATCH 05/17] add log of the error when scanning tcp states Signed-off-by: Daxin Wang --- .../analyzer/tcpconnectanalyzer/internal/connect_monitor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go index 420aa6c6f..abae23557 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -245,7 +245,8 @@ func (c *ConnectMonitor) TrimConnectionsWithTcpStat(waitForEventSecond int) []*C if !ok { tcpState, err := NewPidTcpStat(c.hostProcPath, int(connStat.pid)) if err != nil { - c.logger.Debug("error happened when scanning tcp state:", zap.Error(err)) + c.logger.Debug("error happened when scanning net/tcp", + zap.Uint32("pid", connStat.pid), zap.Error(err)) // No such file or directory, which means the process has been purged. // We consider the connection failed to be established. 
stats, err := connStat.StateMachine.ReceiveEvent(expiredEvent, c.connMap) From ad56b18fed1d1c2226d9c5f5e81954eb5123d91b Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Wed, 1 Jun 2022 17:56:21 +0800 Subject: [PATCH 06/17] add sendRequestEvent to help transmit the state Signed-off-by: Daxin Wang --- .../analyzer/tcpconnectanalyzer/analyzer.go | 90 ++++++++++++------- .../internal/connect_monitor.go | 81 ++++++++++------- .../internal/connection_stats.go | 17 +++- .../exporter/otelexporter/otelexporter.go | 6 +- .../processor/aggregateprocessor/processor.go | 2 +- collector/model/constnames/const.go | 6 +- 6 files changed, 130 insertions(+), 72 deletions(-) diff --git a/collector/analyzer/tcpconnectanalyzer/analyzer.go b/collector/analyzer/tcpconnectanalyzer/analyzer.go index 2641d7550..435d1ad44 100644 --- a/collector/analyzer/tcpconnectanalyzer/analyzer.go +++ b/collector/analyzer/tcpconnectanalyzer/analyzer.go @@ -17,12 +17,6 @@ import ( const Type analyzer.Type = "tcpconnectanalyzer" -var consumableEvents = map[string]bool{ - constnames.ConnectEvent: true, - constnames.TcpConnectEvent: true, - constnames.TcpSetStateEvent: true, -} - type TcpConnectAnalyzer struct { config *Config nextConsumers []consumer.Consumer @@ -56,6 +50,18 @@ func New(cfg interface{}, telemetry *component.TelemetryTools, consumers []consu return ret } +func (a *TcpConnectAnalyzer) ConsumableEvents() []string { + return []string{ + constnames.ConnectEvent, + constnames.TcpConnectEvent, + constnames.TcpSetStateEvent, + constnames.WriteEvent, + constnames.WritevEvent, + constnames.SendMsgEvent, + constnames.SendToEvent, + } +} + // Start initializes the analyzer func (a *TcpConnectAnalyzer) Start() error { go func() { @@ -79,10 +85,6 @@ func (a *TcpConnectAnalyzer) Start() error { // ConsumeEvent gets the event from the previous component func (a *TcpConnectAnalyzer) ConsumeEvent(event *model.KindlingEvent) error { - eventName := event.Name - if ok := consumableEvents[eventName]; !ok { - return nil - } a.eventChannel <- event return nil } @@ -103,6 +105,17 @@ func (a *TcpConnectAnalyzer) consumeChannelEvent(event *model.KindlingEvent) { connectStats, err = a.connectMonitor.ReadInTcpConnect(event) case constnames.TcpSetStateEvent: connectStats, err = a.connectMonitor.ReadInTcpSetState(event) + case constnames.WriteEvent: + fallthrough + case constnames.WritevEvent: + fallthrough + case constnames.SendToEvent: + fallthrough + case constnames.SendMsgEvent: + if filterRequestEvent(event) { + return + } + connectStats, err = a.connectMonitor.ReadSendRequestSyscall(event) } if err != nil { @@ -114,30 +127,53 @@ func (a *TcpConnectAnalyzer) consumeChannelEvent(event *model.KindlingEvent) { return } - gaugeGroup := a.generateGaugeGroup(connectStats) - a.passThroughConsumers(gaugeGroup) + dataGroup := a.generateMetricGroup(connectStats) + a.passThroughConsumers(dataGroup) +} + +func filterRequestEvent(event *model.KindlingEvent) bool { + if event.Category != model.Category_CAT_NET { + return true + } + + ctx := event.GetCtx() + if ctx == nil || ctx.GetThreadInfo() == nil { + return true + } + fd := ctx.GetFdInfo() + if fd == nil { + return true + } + if fd.GetProtocol() != model.L4Proto_TCP { + return true + } + if fd.GetSip() == nil || fd.GetDip() == nil { + return true + } + + return false } func (a *TcpConnectAnalyzer) trimExpiredConnStats() { connStats := a.connectMonitor.TrimExpiredConnections(a.config.WaitEventSecond * 3) for _, connStat := range connStats { - gaugeGroup := a.generateGaugeGroup(connStat) - 
a.passThroughConsumers(gaugeGroup) + dataGroup := a.generateMetricGroup(connStat) + a.passThroughConsumers(dataGroup) } } func (a *TcpConnectAnalyzer) trimConnectionsWithTcpStat() { connStats := a.connectMonitor.TrimConnectionsWithTcpStat(a.config.WaitEventSecond) for _, connStat := range connStats { - gaugeGroup := a.generateGaugeGroup(connStat) - a.passThroughConsumers(gaugeGroup) + dataGroup := a.generateMetricGroup(connStat) + a.passThroughConsumers(dataGroup) } } -func (a *TcpConnectAnalyzer) passThroughConsumers(gaugeGroup *model.GaugeGroup) { +func (a *TcpConnectAnalyzer) passThroughConsumers(dataGroup *model.DataGroup) { var retError error for _, nextConsumer := range a.nextConsumers { - err := nextConsumer.Consume(gaugeGroup) + err := nextConsumer.Consume(dataGroup) if err != nil { retError = multierror.Append(retError, err) } @@ -147,7 +183,7 @@ func (a *TcpConnectAnalyzer) passThroughConsumers(gaugeGroup *model.GaugeGroup) } } -func (a *TcpConnectAnalyzer) generateGaugeGroup(connectStats *internal.ConnectionStats) *model.GaugeGroup { +func (a *TcpConnectAnalyzer) generateMetricGroup(connectStats *internal.ConnectionStats) *model.DataGroup { labels := model.NewAttributeMap() // The connect events always come from the client-side labels.AddBoolValue(constlabels.IsServer, false) @@ -179,22 +215,16 @@ func (a *TcpConnectAnalyzer) generateGaugeGroup(connectStats *internal.Connectio labels.AddStringValue(constlabels.DnatIp, dNatIp) labels.AddIntValue(constlabels.DnatPort, dNatPort) - countValue := &model.Gauge{ - Name: constnames.TcpConnectTotalMetric, - Value: 1, - } - durationValue := &model.Gauge{ - Name: constnames.TcpConnectDurationMetric, - Value: connectStats.GetConnectDuration(), - } + countValue := model.NewIntMetric(constnames.TcpConnectTotalMetric, 1) + durationValue := model.NewIntMetric(constnames.TcpConnectDurationMetric, connectStats.GetConnectDuration()) - retGaugeGroup := model.NewGaugeGroup( - constnames.TcpConnectGaugeGroupName, + retDataGroup := model.NewDataGroup( + constnames.TcpConnectMetricGroupName, labels, connectStats.EndTimestamp, countValue, durationValue) - return retGaugeGroup + return retDataGroup } func (a *TcpConnectAnalyzer) findDNatTuple(sIp string, sPort uint64, dIp string, dPort uint64) (string, int64) { diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go index abae23557..876f826c2 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -60,22 +60,44 @@ func (c *ConnectMonitor) ReadInConnectExitSyscall(event *model.KindlingEvent) (* // Maybe the connStats have been closed by tcp_set_state_from_established event. // We don't care about it. 
return nil, nil + } + // "connect_exit" comes to analyzer after "tcp_connect" + connStats.EndTimestamp = event.Timestamp + connStats.ConnectSyscall = event + connStats.pid = event.GetPid() + var eventType EventType + if retValueInt == 0 { + eventType = connectExitSyscallSuccess + } else if isNotErrorReturnCode(connCode(retValueInt)) { + eventType = connectExitSyscallNotConcern } else { - // "connect_exit" comes to analyzer after "tcp_connect" - connStats.EndTimestamp = event.Timestamp - connStats.ConnectSyscall = event - connStats.pid = event.GetPid() - var eventType EventType - if retValueInt == 0 { - eventType = connectExitSyscallSuccess - } else if isNotErrorReturnCode(connCode(retValueInt)) { - eventType = connectExitSyscallNotConcern - } else { - eventType = connectExitSyscallFailure - connStats.Code = connCode(retValueInt) - } - return connStats.StateMachine.ReceiveEvent(eventType, c.connMap) + eventType = connectExitSyscallFailure + connStats.Code = connCode(retValueInt) } + return connStats.StateMachine.ReceiveEvent(eventType, c.connMap) +} + +func (c *ConnectMonitor) ReadSendRequestSyscall(event *model.KindlingEvent) (*ConnectionStats, error) { + // The events without sip/sport/dip/dport have been filtered outside this method. + connKey := ConnKey{ + SrcIP: event.GetSip(), + SrcPort: event.GetSport(), + DstIP: event.GetDip(), + DstPort: event.GetDport(), + } + if ce := c.logger.Check(zapcore.DebugLevel, "Receive sendRequestSyscall event:"); ce != nil { + ce.Write( + zap.String("ConnKey", connKey.String()), + zap.String("eventName", event.Name), + ) + } + + connStats, ok := c.connMap[connKey] + if !ok { + return nil, nil + } + connStats.pid = event.GetPid() + return connStats.StateMachine.ReceiveEvent(sendRequestSyscall, c.connMap) } func isNotErrorReturnCode(code connCode) bool { @@ -170,14 +192,13 @@ func (c *ConnectMonitor) readInTcpSetStateToEstablished(connKey ConnKey, event * // This is the events from server-side. c.logger.Debug("No tcp_connect or connect_exit, but receive tcp_set_state_to_established") return nil, nil - } else { - connStats.TcpSetState = event - connStats.EndTimestamp = event.Timestamp - if connStats.TcpConnect == nil { - c.logger.Debug("No tcp_connect event, but receive tcp_set_state_to_established") - } - return connStats.StateMachine.ReceiveEvent(tcpSetStateToEstablished, c.connMap) } + connStats.TcpSetState = event + connStats.EndTimestamp = event.Timestamp + if connStats.TcpConnect == nil { + c.logger.Debug("No tcp_connect event, but receive tcp_set_state_to_established") + } + return connStats.StateMachine.ReceiveEvent(tcpSetStateToEstablished, c.connMap) } func (c *ConnectMonitor) readInTcpSetStateFromEstablished(connKey ConnKey, event *model.KindlingEvent) (*ConnectionStats, error) { @@ -190,17 +211,15 @@ func (c *ConnectMonitor) readInTcpSetStateFromEstablished(connKey ConnKey, event if !ok { // Connection has been established and the connStats have been emitted. return nil, nil - } else { - connStats.TcpSetState = event - connStats.EndTimestamp = event.Timestamp - // There should be multiple transmission happened. - if connStats.StateMachine.currentStateType == Inprogress { - stats, err := connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) - _, _ = connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) - return stats, err - } - return connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) } + connStats.TcpSetState = event + // There should be multiple transmission happened. 
+ if connStats.StateMachine.currentStateType == Inprogress { + stats, err := connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) + _, _ = connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) + return stats, err + } + return connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) } // TrimExpiredConnections traverses the map, remove the expired entries based on timeout, diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go index e582e5aa7..c37ddb639 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go @@ -82,15 +82,21 @@ const ( connectExitSyscallFailure EventType = "connect_exit_syscall_failure" connectExitSyscallNotConcern EventType = "connect_exit_syscall_not_concern" expiredEvent EventType = "expired_event" + sendRequestSyscall EventType = "send_request_syscall" ) func createStatesResource() StatesResource { return StatesResource{ Inprogress: State{ eventsMap: map[EventType]StateType{ - tcpConnectNoError: Inprogress, - tcpConnectError: Failure, - tcpSetStateToEstablished: Success, + tcpConnectNoError: Inprogress, + tcpConnectError: Failure, + tcpSetStateToEstablished: Success, + // Sometimes tcpSetStateToEstablished and tcpSetStateFromEstablished are both missing, + // so sendRequestSyscall is used to mark the state as Success from Inprogress. + sendRequestSyscall: Success, + // Sometimes tcpSetStateToEstablished is missing and sendRequestSyscall is not triggered, + // so tcpSetStateFromEstablished is used to mark the state as Success from Inprogress. tcpSetStateFromEstablished: Success, connectExitSyscallSuccess: Success, connectExitSyscallFailure: Failure, @@ -102,10 +108,13 @@ func createStatesResource() StatesResource { Success: { eventsMap: map[EventType]StateType{ tcpSetStateToEstablished: Success, + sendRequestSyscall: Success, tcpSetStateFromEstablished: Closed, connectExitSyscallSuccess: Success, connectExitSyscallNotConcern: Success, - expiredEvent: Closed, + // Sometimes tcpSetStateFromEstablished is missing, so expiredEvent is used to + // close the connection. 
+ expiredEvent: Closed, }, callback: func(connStats *ConnectionStats, connMap map[ConnKey]*ConnectionStats) *ConnectionStats { return connStats diff --git a/collector/consumer/exporter/otelexporter/otelexporter.go b/collector/consumer/exporter/otelexporter/otelexporter.go index ae9c60b2c..e3c9d2d20 100644 --- a/collector/consumer/exporter/otelexporter/otelexporter.go +++ b/collector/consumer/exporter/otelexporter/otelexporter.go @@ -9,7 +9,7 @@ import ( "github.com/Kindling-project/kindling/collector/component" "github.com/Kindling-project/kindling/collector/consumer/exporter" - "github.com/Kindling-project/kindling/collector/consumer/exporter/otelexporter/defaultadapter" + "github.com/Kindling-project/kindling/collector/consumer/exporter/tools/adapter" "github.com/Kindling-project/kindling/collector/model/constnames" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" @@ -151,7 +151,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export StorePodDetail: cfg.AdapterConfig.NeedPodDetail, StoreExternalSrcIP: cfg.AdapterConfig.StoreExternalSrcIP, }), - defaultadapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName, constnames.TcpConnectGaugeGroupName}, customLabels), + adapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName, constnames.TcpConnectMetricGroupName}, customLabels), }, } go func() { @@ -218,7 +218,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export StorePodDetail: cfg.AdapterConfig.NeedPodDetail, StoreExternalSrcIP: cfg.AdapterConfig.StoreExternalSrcIP, }), - defaultadapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName, , constnames.TcpConnectGaugeGroupName}, customLabels), + adapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName, constnames.TcpConnectMetricGroupName}, customLabels), }, } diff --git a/collector/consumer/processor/aggregateprocessor/processor.go b/collector/consumer/processor/aggregateprocessor/processor.go index 4c0f9ef8b..67892290b 100644 --- a/collector/consumer/processor/aggregateprocessor/processor.go +++ b/collector/consumer/processor/aggregateprocessor/processor.go @@ -125,7 +125,7 @@ func (p *AggregateProcessor) Consume(dataGroup *model.DataGroup) error { case constnames.TcpMetricGroupName: p.aggregator.Aggregate(dataGroup, p.tcpLabelSelectors) return nil - case constnames.TcpConnectGaugeGroupName: + case constnames.TcpConnectMetricGroupName: p.aggregator.Aggregate(dataGroup, tcpConnectLabelSelectors) return nil default: diff --git a/collector/model/constnames/const.go b/collector/model/constnames/const.go index 899dcc883..4c0cbee5a 100644 --- a/collector/model/constnames/const.go +++ b/collector/model/constnames/const.go @@ -27,7 +27,7 @@ const ( // AggregatedNetRequestMetricGroup stands for the dataGroup after aggregation. 
AggregatedNetRequestMetricGroup = "aggregated_net_request_metric_group" - TcpMetricGroupName = "tcp_metric_metric_group" - NodeMetricGroupName = "node_metric_metric_group" - TcpConnectGaugeGroupName = "tcp_connect_gauge_group" + TcpMetricGroupName = "tcp_metric_metric_group" + NodeMetricGroupName = "node_metric_metric_group" + TcpConnectMetricGroupName = "tcp_connect_metric_group" ) From 79b5195b7482f1bf3139c7d937c160a12778500a Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Wed, 1 Jun 2022 18:54:47 +0800 Subject: [PATCH 07/17] add container_id to connStats Signed-off-by: Daxin Wang --- collector/analyzer/tcpconnectanalyzer/analyzer.go | 4 +--- .../analyzer/tcpconnectanalyzer/internal/connect_monitor.go | 2 ++ .../analyzer/tcpconnectanalyzer/internal/connection_stats.go | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/collector/analyzer/tcpconnectanalyzer/analyzer.go b/collector/analyzer/tcpconnectanalyzer/analyzer.go index 435d1ad44..0ea0aa5a3 100644 --- a/collector/analyzer/tcpconnectanalyzer/analyzer.go +++ b/collector/analyzer/tcpconnectanalyzer/analyzer.go @@ -187,9 +187,7 @@ func (a *TcpConnectAnalyzer) generateMetricGroup(connectStats *internal.Connecti labels := model.NewAttributeMap() // The connect events always come from the client-side labels.AddBoolValue(constlabels.IsServer, false) - if connectStats.ConnectSyscall != nil { - labels.AddStringValue(constlabels.ContainerId, connectStats.ConnectSyscall.GetContainerId()) - } + labels.AddStringValue(constlabels.ContainerId, connectStats.ContainerId) labels.AddIntValue(constlabels.Errno, int64(connectStats.Code)) if connectStats.StateMachine.GetCurrentState() == internal.Closed { lastState := connectStats.StateMachine.GetLastState() diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go index 876f826c2..22eda1e88 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -65,6 +65,7 @@ func (c *ConnectMonitor) ReadInConnectExitSyscall(event *model.KindlingEvent) (* connStats.EndTimestamp = event.Timestamp connStats.ConnectSyscall = event connStats.pid = event.GetPid() + connStats.ContainerId = event.GetContainerId() var eventType EventType if retValueInt == 0 { eventType = connectExitSyscallSuccess @@ -97,6 +98,7 @@ func (c *ConnectMonitor) ReadSendRequestSyscall(event *model.KindlingEvent) (*Co return nil, nil } connStats.pid = event.GetPid() + connStats.ContainerId = event.GetContainerId() return connStats.StateMachine.ReceiveEvent(sendRequestSyscall, c.connMap) } diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go index c37ddb639..a7d4961b4 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go @@ -33,6 +33,7 @@ const ( type ConnectionStats struct { pid uint32 + ContainerId string ConnKey ConnKey StateMachine *StateMachine InitialTimestamp uint64 From aaecc09a77636f34b5b646d56d2b3b6e71e91154 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Wed, 1 Jun 2022 19:01:48 +0800 Subject: [PATCH 08/17] remove the connStats from the map when the state is success or failure Signed-off-by: Daxin Wang --- .../internal/connect_monitor.go | 16 +++---------- .../internal/connection_stats.go | 24 +++---------------- 2 files changed, 6 insertions(+), 34 
deletions(-) diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go index 22eda1e88..65b5f9f42 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -146,7 +146,7 @@ func (c *ConnectMonitor) ReadInTcpConnect(event *model.KindlingEvent) (*Connecti connStats.StateMachine = NewStateMachine(Inprogress, c.statesResource, connStats) c.connMap[connKey] = connStats } else { - // "tcp_connect" comes to analyzer after "connect_exit" + // Not possible to enter this branch connStats.TcpConnect = event connStats.EndTimestamp = event.Timestamp connStats.Code = connCode(retValueInt) @@ -215,12 +215,6 @@ func (c *ConnectMonitor) readInTcpSetStateFromEstablished(connKey ConnKey, event return nil, nil } connStats.TcpSetState = event - // There should be multiple transmission happened. - if connStats.StateMachine.currentStateType == Inprogress { - stats, err := connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) - _, _ = connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) - return stats, err - } return connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) } @@ -309,9 +303,8 @@ func (c *ConnectMonitor) TrimConnectionsWithTcpStat(waitForEventSecond int) []*C } else if state == synSent || state == synRecv { continue } else { - // These states are behind the ESTABLISHED state. As we believe that - // tcp_set_state event that is from established to other states always comes, - // the codes should not run into this branch. + // These states are behind the ESTABLISHED state. + // The codes could run into this branch if tcpSetStateToEstablished not received. c.logger.Debug("See sockets whose state is behind ESTABLISHED, which means no "+ "tcp_set_state_from_established received.", zap.String("state", state)) stats, err := connStat.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) @@ -321,9 +314,6 @@ func (c *ConnectMonitor) TrimConnectionsWithTcpStat(waitForEventSecond int) []*C if stats != nil { ret = append(ret, stats) } - if connStat.StateMachine.currentStateType == Success { - _, _ = connStat.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) - } } } return ret diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go index a7d4961b4..3036820c6 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go @@ -107,35 +107,17 @@ func createStatesResource() StatesResource { callback: nil, }, Success: { - eventsMap: map[EventType]StateType{ - tcpSetStateToEstablished: Success, - sendRequestSyscall: Success, - tcpSetStateFromEstablished: Closed, - connectExitSyscallSuccess: Success, - connectExitSyscallNotConcern: Success, - // Sometimes tcpSetStateFromEstablished is missing, so expiredEvent is used to - // close the connection. 
- expiredEvent: Closed, - }, + eventsMap: map[EventType]StateType{}, callback: func(connStats *ConnectionStats, connMap map[ConnKey]*ConnectionStats) *ConnectionStats { + delete(connMap, connStats.ConnKey) return connStats }, }, Failure: { - eventsMap: map[EventType]StateType{ - connectExitSyscallFailure: Failure, - connectExitSyscallNotConcern: Failure, - expiredEvent: Closed, - }, - callback: func(connStats *ConnectionStats, connMap map[ConnKey]*ConnectionStats) *ConnectionStats { - return connStats - }, - }, - Closed: { eventsMap: map[EventType]StateType{}, callback: func(connStats *ConnectionStats, connMap map[ConnKey]*ConnectionStats) *ConnectionStats { delete(connMap, connStats.ConnKey) - return nil + return connStats }, }, } From 92a6a733ab2b3b8dfdb01583fb6a3d1947c2e4b3 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Wed, 1 Jun 2022 19:07:44 +0800 Subject: [PATCH 09/17] remove unnecessary type variables Signed-off-by: Daxin Wang --- .../internal/connect_monitor.go | 22 +++++-------------- .../internal/connection_stats.go | 11 +--------- .../internal/state_machine_test.go | 3 --- 3 files changed, 7 insertions(+), 29 deletions(-) diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go index 65b5f9f42..c8c6120a0 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -63,17 +63,16 @@ func (c *ConnectMonitor) ReadInConnectExitSyscall(event *model.KindlingEvent) (* } // "connect_exit" comes to analyzer after "tcp_connect" connStats.EndTimestamp = event.Timestamp - connStats.ConnectSyscall = event connStats.pid = event.GetPid() connStats.ContainerId = event.GetContainerId() var eventType EventType if retValueInt == 0 { eventType = connectExitSyscallSuccess - } else if isNotErrorReturnCode(connCode(retValueInt)) { + } else if isNotErrorReturnCode(retValueInt) { eventType = connectExitSyscallNotConcern } else { eventType = connectExitSyscallFailure - connStats.Code = connCode(retValueInt) + connStats.Code = int(retValueInt) } return connStats.StateMachine.ReceiveEvent(eventType, c.connMap) } @@ -102,7 +101,7 @@ func (c *ConnectMonitor) ReadSendRequestSyscall(event *model.KindlingEvent) (*Co return connStats.StateMachine.ReceiveEvent(sendRequestSyscall, c.connMap) } -func isNotErrorReturnCode(code connCode) bool { +func isNotErrorReturnCode(code int64) bool { return code == einprogress || code == eintr || code == eisconn || code == ealready } @@ -138,18 +137,14 @@ func (c *ConnectMonitor) ReadInTcpConnect(event *model.KindlingEvent) (*Connecti ConnKey: connKey, InitialTimestamp: event.Timestamp, EndTimestamp: event.Timestamp, - Code: connCode(retValueInt), - ConnectSyscall: nil, - TcpConnect: event, - TcpSetState: nil, + Code: int(retValueInt), } connStats.StateMachine = NewStateMachine(Inprogress, c.statesResource, connStats) c.connMap[connKey] = connStats } else { // Not possible to enter this branch - connStats.TcpConnect = event connStats.EndTimestamp = event.Timestamp - connStats.Code = connCode(retValueInt) + connStats.Code = int(retValueInt) } return connStats.StateMachine.ReceiveEvent(eventType, c.connMap) } @@ -192,14 +187,10 @@ func (c *ConnectMonitor) readInTcpSetStateToEstablished(connKey ConnKey, event * if !ok { // No tcp_connect or connect_exit received. // This is the events from server-side. 
- c.logger.Debug("No tcp_connect or connect_exit, but receive tcp_set_state_to_established") + c.logger.Debug("No tcp_connect received, but receive tcp_set_state_to_established") return nil, nil } - connStats.TcpSetState = event connStats.EndTimestamp = event.Timestamp - if connStats.TcpConnect == nil { - c.logger.Debug("No tcp_connect event, but receive tcp_set_state_to_established") - } return connStats.StateMachine.ReceiveEvent(tcpSetStateToEstablished, c.connMap) } @@ -214,7 +205,6 @@ func (c *ConnectMonitor) readInTcpSetStateFromEstablished(connKey ConnKey, event // Connection has been established and the connStats have been emitted. return nil, nil } - connStats.TcpSetState = event return connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) } diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go index 3036820c6..f62e48f26 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connection_stats.go @@ -2,14 +2,9 @@ package internal import ( "fmt" - - "github.com/Kindling-project/kindling/collector/model" ) -type connCode int - const ( - noError connCode = iota // See in Linux einprogress = -115 ealready = -114 @@ -38,11 +33,7 @@ type ConnectionStats struct { StateMachine *StateMachine InitialTimestamp uint64 EndTimestamp uint64 - Code connCode - - ConnectSyscall *model.KindlingEvent - TcpConnect *model.KindlingEvent - TcpSetState *model.KindlingEvent + Code int } func (c *ConnectionStats) GetConnectDuration() int64 { diff --git a/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go b/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go index d8ed290a1..8ee4cec24 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go @@ -17,9 +17,6 @@ func TestCallback(t *testing.T) { InitialTimestamp: 0, EndTimestamp: 0, Code: 0, - ConnectSyscall: nil, - TcpConnect: nil, - TcpSetState: nil, } connStats.StateMachine = NewStateMachine(Inprogress, statesResource, connStats) connMap[connKey] = connStats From 6ef4295135ecbb03620863590cd42a3ab2d82319 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Wed, 1 Jun 2022 19:18:19 +0800 Subject: [PATCH 10/17] refactor the codes of generating dataGroup Signed-off-by: Daxin Wang --- .../analyzer/tcpconnectanalyzer/analyzer.go | 49 ++++++++----------- .../internal/connect_monitor.go | 26 +--------- 2 files changed, 21 insertions(+), 54 deletions(-) diff --git a/collector/analyzer/tcpconnectanalyzer/analyzer.go b/collector/analyzer/tcpconnectanalyzer/analyzer.go index 0ea0aa5a3..6eacf8dbd 100644 --- a/collector/analyzer/tcpconnectanalyzer/analyzer.go +++ b/collector/analyzer/tcpconnectanalyzer/analyzer.go @@ -127,7 +127,7 @@ func (a *TcpConnectAnalyzer) consumeChannelEvent(event *model.KindlingEvent) { return } - dataGroup := a.generateMetricGroup(connectStats) + dataGroup := a.generateDataGroup(connectStats) a.passThroughConsumers(dataGroup) } @@ -154,18 +154,10 @@ func filterRequestEvent(event *model.KindlingEvent) bool { return false } -func (a *TcpConnectAnalyzer) trimExpiredConnStats() { - connStats := a.connectMonitor.TrimExpiredConnections(a.config.WaitEventSecond * 3) - for _, connStat := range connStats { - dataGroup := a.generateMetricGroup(connStat) - a.passThroughConsumers(dataGroup) - } -} - func (a *TcpConnectAnalyzer) 
trimConnectionsWithTcpStat() { connStats := a.connectMonitor.TrimConnectionsWithTcpStat(a.config.WaitEventSecond) for _, connStat := range connStats { - dataGroup := a.generateMetricGroup(connStat) + dataGroup := a.generateDataGroup(connStat) a.passThroughConsumers(dataGroup) } } @@ -183,23 +175,32 @@ func (a *TcpConnectAnalyzer) passThroughConsumers(dataGroup *model.DataGroup) { } } -func (a *TcpConnectAnalyzer) generateMetricGroup(connectStats *internal.ConnectionStats) *model.DataGroup { +func (a *TcpConnectAnalyzer) generateDataGroup(connectStats *internal.ConnectionStats) *model.DataGroup { + labels := a.generateLabels(connectStats, true) + countValue := model.NewIntMetric(constnames.TcpConnectTotalMetric, 1) + durationValue := model.NewIntMetric(constnames.TcpConnectDurationMetric, connectStats.GetConnectDuration()) + + retDataGroup := model.NewDataGroup( + constnames.TcpConnectMetricGroupName, + labels, + connectStats.EndTimestamp, + countValue, durationValue) + + return retDataGroup +} + +func (a *TcpConnectAnalyzer) generateLabels(connectStats *internal.ConnectionStats, includeState bool) *model.AttributeMap { labels := model.NewAttributeMap() // The connect events always come from the client-side labels.AddBoolValue(constlabels.IsServer, false) labels.AddStringValue(constlabels.ContainerId, connectStats.ContainerId) labels.AddIntValue(constlabels.Errno, int64(connectStats.Code)) - if connectStats.StateMachine.GetCurrentState() == internal.Closed { - lastState := connectStats.StateMachine.GetLastState() - if lastState == internal.Success { + if includeState { + if connectStats.StateMachine.GetCurrentState() == internal.Success { labels.AddBoolValue(constlabels.Success, true) } else { labels.AddBoolValue(constlabels.Success, false) } - } else if connectStats.StateMachine.GetCurrentState() == internal.Success { - labels.AddBoolValue(constlabels.Success, true) - } else { - labels.AddBoolValue(constlabels.Success, false) } srcIp := connectStats.ConnKey.SrcIP dstIp := connectStats.ConnKey.DstIP @@ -212,17 +213,7 @@ func (a *TcpConnectAnalyzer) generateMetricGroup(connectStats *internal.Connecti dNatIp, dNatPort := a.findDNatTuple(srcIp, uint64(srcPort), dstIp, uint64(dstPort)) labels.AddStringValue(constlabels.DnatIp, dNatIp) labels.AddIntValue(constlabels.DnatPort, dNatPort) - - countValue := model.NewIntMetric(constnames.TcpConnectTotalMetric, 1) - durationValue := model.NewIntMetric(constnames.TcpConnectDurationMetric, connectStats.GetConnectDuration()) - - retDataGroup := model.NewDataGroup( - constnames.TcpConnectMetricGroupName, - labels, - connectStats.EndTimestamp, - countValue, durationValue) - - return retDataGroup + return labels } func (a *TcpConnectAnalyzer) findDNatTuple(sIp string, sPort uint64, dIp string, dPort uint64) (string, int64) { diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go index c8c6120a0..4fd481512 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -205,34 +205,10 @@ func (c *ConnectMonitor) readInTcpSetStateFromEstablished(connKey ConnKey, event // Connection has been established and the connStats have been emitted. 
return nil, nil } + connStats.EndTimestamp = event.Timestamp return connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, c.connMap) } -// TrimExpiredConnections traverses the map, remove the expired entries based on timeout, -// and return them. -// The unit of timeout is second. -func (c *ConnectMonitor) TrimExpiredConnections(timeout int) []*ConnectionStats { - ret := make([]*ConnectionStats, 0) - if timeout <= 0 { - return ret - } - timeoutNano := int64(timeout) * 1000000000 - for _, connStat := range c.connMap { - if time.Now().UnixNano()-int64(connStat.InitialTimestamp) >= timeoutNano { - stats, err := connStat.StateMachine.ReceiveEvent(expiredEvent, c.connMap) - if err != nil { - c.logger.Warn("error happened when receiving event:", zap.Error(err)) - continue - } - if stats != nil { - ret = append(ret, stats) - } - } - } - - return ret -} - func (c *ConnectMonitor) TrimConnectionsWithTcpStat(waitForEventSecond int) []*ConnectionStats { ret := make([]*ConnectionStats, 0, len(c.connMap)) // Only scan once for each pid From a09f49a75afce7193429d7538dead728d3909914 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Wed, 1 Jun 2022 19:48:43 +0800 Subject: [PATCH 11/17] fix testcase Signed-off-by: Daxin Wang --- .../internal/state_machine_test.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go b/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go index 8ee4cec24..f2ce5c9c0 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/state_machine_test.go @@ -39,16 +39,4 @@ func TestCallback(t *testing.T) { if connStats.StateMachine.currentStateType != Success { t.Errorf("expected success, got %v", connStats.StateMachine.currentStateType) } - - stats, err = connStats.StateMachine.ReceiveEvent(tcpSetStateFromEstablished, connMap) - if err != nil { - t.Fatal(err) - } - if connStats.StateMachine.currentStateType != Closed { - t.Errorf("expected closed, got %v", connStats.StateMachine.currentStateType) - } - - if len(connMap) != 0 { - t.Errorf("expected empty map, but current map is %v", connMap) - } } From 1a56e26a21de35eee1f489dd4cdcfb0c1647b1a3 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Wed, 1 Jun 2022 20:44:44 +0800 Subject: [PATCH 12/17] Only record the connection's duration when it is successfully established Signed-off-by: Daxin Wang --- .../analyzer/tcpconnectanalyzer/analyzer.go | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/collector/analyzer/tcpconnectanalyzer/analyzer.go b/collector/analyzer/tcpconnectanalyzer/analyzer.go index 6eacf8dbd..1c456e4e4 100644 --- a/collector/analyzer/tcpconnectanalyzer/analyzer.go +++ b/collector/analyzer/tcpconnectanalyzer/analyzer.go @@ -176,32 +176,35 @@ func (a *TcpConnectAnalyzer) passThroughConsumers(dataGroup *model.DataGroup) { } func (a *TcpConnectAnalyzer) generateDataGroup(connectStats *internal.ConnectionStats) *model.DataGroup { - labels := a.generateLabels(connectStats, true) - countValue := model.NewIntMetric(constnames.TcpConnectTotalMetric, 1) - durationValue := model.NewIntMetric(constnames.TcpConnectDurationMetric, connectStats.GetConnectDuration()) + labels := a.generateLabels(connectStats) + metrics := make([]*model.Metric, 0, 2) + metrics = append(metrics, model.NewIntMetric(constnames.TcpConnectTotalMetric, 1)) + // Only record the connection's duration when it is successfully established + if 
connectStats.StateMachine.GetCurrentState() == internal.Success { + metrics = append(metrics, model.NewIntMetric(constnames.TcpConnectDurationMetric, connectStats.GetConnectDuration())) + } retDataGroup := model.NewDataGroup( constnames.TcpConnectMetricGroupName, labels, connectStats.EndTimestamp, - countValue, durationValue) + metrics...) return retDataGroup } -func (a *TcpConnectAnalyzer) generateLabels(connectStats *internal.ConnectionStats, includeState bool) *model.AttributeMap { +func (a *TcpConnectAnalyzer) generateLabels(connectStats *internal.ConnectionStats) *model.AttributeMap { labels := model.NewAttributeMap() // The connect events always come from the client-side labels.AddBoolValue(constlabels.IsServer, false) labels.AddStringValue(constlabels.ContainerId, connectStats.ContainerId) labels.AddIntValue(constlabels.Errno, int64(connectStats.Code)) - if includeState { - if connectStats.StateMachine.GetCurrentState() == internal.Success { - labels.AddBoolValue(constlabels.Success, true) - } else { - labels.AddBoolValue(constlabels.Success, false) - } + if connectStats.StateMachine.GetCurrentState() == internal.Success { + labels.AddBoolValue(constlabels.Success, true) + } else { + labels.AddBoolValue(constlabels.Success, false) } + srcIp := connectStats.ConnKey.SrcIP dstIp := connectStats.ConnKey.DstIP srcPort := connectStats.ConnKey.SrcPort From 0338e8389796a9bce283799a17643f7ef22fb893 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Thu, 2 Jun 2022 09:39:54 +0800 Subject: [PATCH 13/17] use time.Now as a variable Signed-off-by: Daxin Wang --- .../analyzer/tcpconnectanalyzer/internal/connect_monitor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go index 4fd481512..562d925be 100644 --- a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go +++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go @@ -214,11 +214,12 @@ func (c *ConnectMonitor) TrimConnectionsWithTcpStat(waitForEventSecond int) []*C // Only scan once for each pid pidTcpStateMap := make(map[uint32]NetSocketStateMap) waitForEventNano := int64(waitForEventSecond) * 1000000000 + timeNow := time.Now().UnixNano() for key, connStat := range c.connMap { if connStat.pid == 0 { continue } - if time.Now().UnixNano()-int64(connStat.InitialTimestamp) < waitForEventNano { + if timeNow-int64(connStat.InitialTimestamp) < waitForEventNano { // Still waiting for other events continue } From 8aa4c6fb43dfa006c91332a12cace14e8951b301 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Thu, 2 Jun 2022 10:28:30 +0800 Subject: [PATCH 14/17] add tcp connect metrics description Signed-off-by: Daxin Wang --- docs/prometheus_metrics.md | 42 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/docs/prometheus_metrics.md b/docs/prometheus_metrics.md index 5e7ee62f1..008d0a122 100644 --- a/docs/prometheus_metrics.md +++ b/docs/prometheus_metrics.md @@ -165,12 +165,12 @@ We made some rules for considering whether a request is abnormal. For the abnorm | `request_processing_status` | 3 | Processing indicates the duration until receiving the first byte.
1(green): latency <= 200ms<br>2(yellow): 200ms < latency < 1000ms<br>3(red): latency >= 1000 |
| `response_rspxfer_status` | 1 | RspXfer indicates the duration for transferring the response payload.<br>1(green): latency <= 200ms<br>2(yellow): 200ms < latency < 1000ms<br>3(red): latency >= 1000 |

-## TCP (Layer 4) Metrics
+## TCP Status Metrics

### Metrics List
| **Metric Name** | **Type** | **Description** |
| --- | --- | --- |
-| `kindling_tcp_srtt_microseconds` | Gauge | Smoothed round trip time of the tcp socket |
+| `kindling_tcp_srtt_microseconds` | Gauge | Smoothed round trip time of the TCP socket |
| `kindling_tcp_packet_loss_total` | Counter | Total number of dropped packets |
| `kindling_tcp_retransmit_total` | Counter | Total times of retransmitting happens (not packets count) |

@@ -196,6 +196,44 @@ We made some rules for considering whether a request is abnormal. For the abnorm
| `dst_ip` | 10.1.11.24 | Pod's IP by default. If the destination is not a pod in Kubernetes, this is the IP address of an external entity |
| `dst_port` | 80 | The listening port of the destination container, if applicable |

+## TCP Connection Establish Metrics
+
+### Metrics List
+| **Metric Name** | **Type** | **Description** |
+| --- | --- | --- |
+| `kindling_tcp_connect_total` | Counter | Total number of successfully and unsuccessfully established TCP connections |
+| `kindling_tcp_connect_duration_nanoseconds_total` | Counter | Total duration of the successfully established TCP connections |
+
+### Labels List
+| **Label Name** | **Example** | **Notes** |
+| --- | --- | --- |
+| `src_node` | slave-node1 | Which node the source pod is on |
+| `src_namespace` | default | Namespace of the source pod |
+| `src_workload_kind` | deployment | Workload kind of the source pod |
+| `src_workload_name` | business1 | Workload name of the source pod |
+| `src_service` | business1-svc | One of the services that target the source pod |
+| `src_pod` | business1-0 | The name of the source pod |
+| `src_container` | business-container | The name of the source container |
+| `src_ip` | 10.1.11.23 | Pod's IP by default. If the source is not a pod in Kubernetes, this is the IP address of an external entity |
+| `src_port` | 80 | The listening port of the source container, if applicable |
+| `dst_node` | slave-node2 | Which node the destination pod is on |
+| `dst_namespace` | default | Namespace of the destination pod |
+| `dst_workload_kind` | deployment | Workload kind of the destination pod |
+| `dst_workload_name` | business2 | Workload name of the destination pod |
+| `dst_service` | business2-svc | One of the services that target the destination pod |
+| `dst_pod` | business2-0 | The name of the destination pod |
+| `dst_container` | business-container | The name of the destination container |
+| `dst_ip` | 10.1.11.24 | Pod's IP by default. If the destination is not a pod in Kubernetes, this is the IP address of an external entity |
+| `dst_port` | 80 | The listening port of the destination container, if applicable |
+| `success` | true | Whether the TCP connection is successfully established |
+| `errno` | 0 | The error number of the TCP connection. 0 if no error. Note it could also be 0 even if there is an error. |
+
+### Notes
+**Note 1**: The field `success` for `kindling_tcp_connect_duration_nanoseconds_total` is always `true`.
+
+**Note 2**: The field `errno` is not `0` only if the TCP socket is blocking and an error happened. There are multiple possible values it could contain. See the `ERRORS` section of the [connect(2) manual](https://man7.org/linux/man-pages/man2/connect.2.html) for more details.
+
 ## PromQL Example
 Here are some examples of how to use these metrics in Prometheus, which can help you understand them faster.
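[Editor's note on Note 2 above: the analyzer treats several non-zero connect(2) return values as benign, which is why `errno` can stay 0 for a connection that later fails. A minimal Go sketch of the classification, assuming the errno constants shown in connection_stats.go; eintr and eisconn use the standard Linux values, and the helper name classifyConnectRetValue is hypothetical.]

```go
package main

import "fmt"

// Negated Linux errno values as seen on the connect syscall exit event.
// einprogress and ealready appear verbatim in connection_stats.go; eintr
// and eisconn are assumed from the standard errno table (4 and 106).
const (
	einprogress int64 = -115
	eintr       int64 = -4
	eisconn     int64 = -106
	ealready    int64 = -114
)

type eventType string

const (
	connectExitSyscallSuccess    eventType = "connect_exit_syscall_success"
	connectExitSyscallNotConcern eventType = "connect_exit_syscall_not_concern"
	connectExitSyscallFailure    eventType = "connect_exit_syscall_failure"
)

// classifyConnectRetValue mirrors the branching in ReadInConnectExitSyscall:
// 0 means the handshake already succeeded; the four "not concern" codes are
// normal for non-blocking sockets, so the state machine keeps waiting for a
// later tcp_set_state or send event; anything else is a real failure whose
// errno ends up as a label on kindling_tcp_connect_total.
func classifyConnectRetValue(ret int64) eventType {
	switch {
	case ret == 0:
		return connectExitSyscallSuccess
	case ret == einprogress || ret == eintr || ret == eisconn || ret == ealready:
		return connectExitSyscallNotConcern
	default:
		return connectExitSyscallFailure
	}
}

func main() {
	fmt.Println(classifyConnectRetValue(-115)) // non-blocking connect in progress: not an error
	fmt.Println(classifyConnectRetValue(-111)) // ECONNREFUSED: a real failure
}
```

[The "not concern" branch is the reason behind Note 2: for a non-blocking socket the syscall's return code does not decide the outcome, so no errno is recorded and the verdict comes from later events.]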
From e45a8c4ddb52986555ec84cc3f0e654215c13427 Mon Sep 17 00:00:00 2001
From: Daxin Wang
Date: Thu, 2 Jun 2022 10:39:12 +0800
Subject: [PATCH 15/17] fix metrics description

Signed-off-by: Daxin Wang
---
 docs/prometheus_metrics.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/docs/prometheus_metrics.md b/docs/prometheus_metrics.md
index 008d0a122..28f03c9c7 100644
--- a/docs/prometheus_metrics.md
+++ b/docs/prometheus_metrics.md
@@ -196,7 +196,7 @@ We made some rules for considering whether a request is abnormal. For the abnorm
| `dst_ip` | 10.1.11.24 | Pod's IP by default. If the destination is not a pod in Kubernetes, this is the IP address of an external entity |
| `dst_port` | 80 | The listening port of the destination container, if applicable |

-## TCP Connection Establish Metrics
+## TCP Socket Connects Metrics

### Metrics List
| **Metric Name** | **Type** | **Description** |
@@ -215,7 +215,6 @@ We made some rules for considering whether a request is abnormal. For the abnorm
| `src_pod` | business1-0 | The name of the source pod |
| `src_container` | business-container | The name of the source container |
| `src_ip` | 10.1.11.23 | Pod's IP by default. If the source is not a pod in Kubernetes, this is the IP address of an external entity |
-| `src_port` | 80 | The listening port of the source container, if applicable |
| `dst_node` | slave-node2 | Which node the destination pod is on |
| `dst_namespace` | default | Namespace of the destination pod |
| `dst_workload_kind` | deployment | Workload kind of the destination pod |

From f57780ec60ee645f16e3b7f77c114b12e324b296 Mon Sep 17 00:00:00 2001
From: Daxin Wang
Date: Thu, 2 Jun 2022 10:42:11 +0800
Subject: [PATCH 16/17] add field src_container_id to metrics description

Signed-off-by: Daxin Wang
---
 docs/prometheus_metrics.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/prometheus_metrics.md b/docs/prometheus_metrics.md
index 28f03c9c7..8e36c8bd6 100644
--- a/docs/prometheus_metrics.md
+++ b/docs/prometheus_metrics.md
@@ -214,6 +214,7 @@ We made some rules for considering whether a request is abnormal. For the abnorm
| `src_service` | business1-svc | One of the services that target the source pod |
| `src_pod` | business1-0 | The name of the source pod |
| `src_container` | business-container | The name of the source container |
+| `src_container_id` | 1a2b3c4d5e6f | The shortened container id, which contains 12 characters |
| `src_ip` | 10.1.11.23 | Pod's IP by default. If the source is not a pod in Kubernetes, this is the IP address of an external entity |
| `dst_node` | slave-node2 | Which node the destination pod is on |
| `dst_namespace` | default | Namespace of the destination pod |
| `dst_workload_kind` | deployment | Workload kind of the destination pod |

From 86cc6c7b521b77146f0712fe2c9b5e6aacb0e1ac Mon Sep 17 00:00:00 2001
From: Daxin Wang
Date: Thu, 2 Jun 2022 10:49:01 +0800
Subject: [PATCH 17/17] add log when two tcp_connect events come for one connection

Signed-off-by: Daxin Wang
---
 .../analyzer/tcpconnectanalyzer/internal/connect_monitor.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go
index 562d925be..9e1cd56cd 100644
--- a/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go
+++ b/collector/analyzer/tcpconnectanalyzer/internal/connect_monitor.go
@@ -143,6 +143,7 @@ func (c *ConnectMonitor) ReadInTcpConnect(event *model.KindlingEvent) (*Connecti
 c.connMap[connKey] = connStats
 } else {
 // Not possible to enter this branch
+ c.logger.Info("Receive another unexpected tcp_connect event", zap.String("connKey", connKey.String()))
 connStats.EndTimestamp = event.Timestamp
 connStats.Code = int(retValueInt)
 }
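
[Editor's note: taken together, the series converges on a small table-driven state machine. Below is a minimal, self-contained sketch of its shape under the transitions visible in the diffs above; the types are simplified stand-ins, not the verbatim state_machine.go implementation.]

```go
package main

import "fmt"

type eventType string
type stateType string

const (
	inprogress stateType = "Inprogress"
	success    stateType = "Success"
	failure    stateType = "Failure"
)

const (
	tcpConnectNoError         eventType = "tcp_connect_no_error"
	sendRequestSyscall        eventType = "send_request_syscall"
	connectExitSyscallFailure eventType = "connect_exit_syscall_failure"
)

type connKey struct{ srcIP, dstIP string }

type connStats struct {
	key   connKey
	state stateType
}

// transitions maps the current state to (event -> next state). After
// patch 08, Success and Failure are terminal: reaching either removes the
// entry from the connection map and the stats are emitted exactly once.
var transitions = map[stateType]map[eventType]stateType{
	inprogress: {
		tcpConnectNoError:         inprogress,
		sendRequestSyscall:        success,
		connectExitSyscallFailure: failure,
	},
	success: {},
	failure: {},
}

// receiveEvent returns the stats to emit, or nil when the connection is
// still in progress or the event is irrelevant in the current state.
func receiveEvent(cs *connStats, ev eventType, connMap map[connKey]*connStats) *connStats {
	next, ok := transitions[cs.state][ev]
	if !ok {
		return nil
	}
	cs.state = next
	if next == success || next == failure {
		delete(connMap, cs.key)
		return cs
	}
	return nil
}

func main() {
	connMap := map[connKey]*connStats{}
	cs := &connStats{key: connKey{"10.1.11.23", "10.1.11.24"}, state: inprogress}
	connMap[cs.key] = cs

	// A send on the socket proves the handshake finished even when both
	// tcp_set_state events were missed (the motivation for patch 06).
	if emitted := receiveEvent(cs, sendRequestSyscall, connMap); emitted != nil {
		fmt.Printf("emit tcp_connect metrics for %v (state=%s), entries left: %d\n",
			emitted.key, emitted.state, len(connMap))
	}
}
```

[Making Success and Failure terminal is what lets the tcp-stat scanner and the send-request path share one mechanism: whichever event drives the machine into a terminal state triggers the metric emission, and the map entry disappears with it.]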