diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2952f516a..8eaba7a97 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,7 +5,7 @@
 
 ## Unreleased
 ### New features
-- Support trace-profiling sampling to reduce data output. ([#446](https://github.com/KindlingProject/kindling/pull/446))
+- Support trace-profiling sampling to reduce data output. One trace is sampled every five seconds for each endpoint by default. ([#446](https://github.com/KindlingProject/kindling/pull/446) [#462](https://github.com/KindlingProject/kindling/pull/462))
 -
 
 ### Enhancements
diff --git a/collector/pkg/component/analyzer/cpuanalyzer/config.go b/collector/pkg/component/analyzer/cpuanalyzer/config.go
index 9ae39dde7..18ee61eb3 100644
--- a/collector/pkg/component/analyzer/cpuanalyzer/config.go
+++ b/collector/pkg/component/analyzer/cpuanalyzer/config.go
@@ -8,6 +8,9 @@ type Config struct {
 	// SamplingInterval is the sampling interval for the same url.
 	// The unit is seconds.
 	SamplingInterval int `mapstructure:"sampling_interval"`
+	// OpenJavaTraceSampling is a switch for whether to use Java-Trace to trigger sampling.
+	// The default is false.
+	OpenJavaTraceSampling bool `mapstructure:"open_java_trace_sampling"`
 	//JavaTraceSlowTime is used to identify the threshold of slow requests recognized by the apm side
 	// The unit is seconds.
 	JavaTraceSlowTime int `mapstructure:"java_trace_slow_time"`
@@ -21,9 +24,10 @@
 
 func NewDefaultConfig() *Config {
 	return &Config{
-		SamplingInterval:     5,
-		JavaTraceSlowTime:    500,
-		SegmentSize:          40,
-		EdgeEventsWindowSize: 2,
+		SamplingInterval:      5,
+		OpenJavaTraceSampling: false,
+		JavaTraceSlowTime:     500,
+		SegmentSize:           40,
+		EdgeEventsWindowSize:  2,
 	}
 }
diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
index 42206c4db..7d5d07f56 100644
--- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
+++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
@@ -2,13 +2,14 @@ package cpuanalyzer
 
 import (
 	"fmt"
-	"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
-	"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
-	"github.com/Kindling-project/kindling/collector/pkg/model/constvalues"
 	"strconv"
 	"sync"
 	"time"
 
+	"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
+	"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
+	"github.com/Kindling-project/kindling/collector/pkg/model/constvalues"
+
 	"go.uber.org/atomic"
 	"go.uber.org/zap/zapcore"
 
@@ -107,7 +108,9 @@ func (ca *CpuAnalyzer) ConsumeTransactionIdEvent(event *model.KindlingEvent) {
 	}
 	//ca.sendEventDirectly(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev)
 	ca.PutEventToSegments(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev)
-	ca.analyzerJavaTraceTime(ev)
+	if ca.cfg.OpenJavaTraceSampling {
+		ca.analyzerJavaTraceTime(ev)
+	}
 }
 
 func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) {
@@ -180,15 +183,21 @@ func (ca *CpuAnalyzer) ConsumeSpanEvent(event *model.KindlingEvent) {
 	ca.PutEventToSegments(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev)
 }
 
-func (ca *CpuAnalyzer) ConsumeTraces(trace SendTriggerEvent) {
-	tid := trace.OriginalData.Labels.GetIntValue(constlabels.RequestTid)
-	threadName := trace.OriginalData.Labels.GetStringValue(constlabels.Comm)
+func (ca *CpuAnalyzer) ConsumeTraces(trace *model.DataGroup) {
+	pid := trace.Labels.GetIntValue("pid")
+	tid := trace.Labels.GetIntValue(constlabels.RequestTid)
+	threadName := trace.Labels.GetStringValue(constlabels.Comm)
+	duration, ok := trace.GetMetric(constvalues.RequestTotalTime)
+	if !ok {
+		ca.telemetry.Logger.Warnf("No request_total_time in the trace, pid=%d, threadName=%s", pid, threadName)
+		return
+	}
 	event := &InnerCall{
-		StartTime: trace.StartTime,
-		EndTime:   trace.StartTime + trace.SpendTime,
-		Trace:     trace.OriginalData,
+		StartTime: trace.Timestamp,
+		EndTime:   trace.Timestamp + uint64(duration.GetInt().Value),
+		Trace:     trace,
 	}
-	ca.PutEventToSegments(trace.Pid, uint32(tid), threadName, event)
+	ca.PutEventToSegments(uint32(pid), uint32(tid), threadName, event)
 }
 
 func (ca *CpuAnalyzer) ConsumeCpuEvent(event *model.KindlingEvent) {
diff --git a/collector/pkg/component/analyzer/cpuanalyzer/module_control.go b/collector/pkg/component/analyzer/cpuanalyzer/module_control.go
index 5b798cfc4..81ff70ad1 100644
--- a/collector/pkg/component/analyzer/cpuanalyzer/module_control.go
+++ b/collector/pkg/component/analyzer/cpuanalyzer/module_control.go
@@ -1,6 +1,10 @@
 package cpuanalyzer
 
-import "sync"
+import (
+	"sync"
+
+	"github.com/Kindling-project/kindling/collector/pkg/model"
+)
 
 func (ca *CpuAnalyzer) ProfileModule() (submodule string, start func() error, stop func() error) {
 	return "cpuanalyzer", ca.StartProfile, ca.StopProfile
@@ -9,10 +13,12 @@ func (ca *CpuAnalyzer) ProfileModule() (submodule string, start func() error, st
 
 func (ca *CpuAnalyzer) StartProfile() error {
 	// control flow changed
 	// Note that these two variables belongs to the package
-	sendChannel = make(chan SendTriggerEvent, 3e5)
+	triggerEventChan = make(chan SendTriggerEvent, 3e5)
+	traceChan = make(chan *model.DataGroup, 1e4)
 	enableProfile = true
 	once = sync.Once{}
-	go ca.ReceiveSendSignal()
+	go ca.ReadTriggerEventChan()
+	go ca.ReadTraceChan()
 	return nil
 }
diff --git a/collector/pkg/component/analyzer/cpuanalyzer/send_trigger.go b/collector/pkg/component/analyzer/cpuanalyzer/send_trigger.go
index 089144499..42462f0bc 100644
--- a/collector/pkg/component/analyzer/cpuanalyzer/send_trigger.go
+++ b/collector/pkg/component/analyzer/cpuanalyzer/send_trigger.go
@@ -12,11 +12,12 @@ import (
 )
 
 var (
-	enableProfile bool
-	once          sync.Once
-	sendChannel   chan SendTriggerEvent
-	sampleMap     sync.Map
-	isInstallApm  map[uint64]bool
+	enableProfile    bool
+	once             sync.Once
+	triggerEventChan chan SendTriggerEvent
+	traceChan        chan *model.DataGroup
+	sampleMap        sync.Map
+	isInstallApm     map[uint64]bool
 )
 
 func init() {
@@ -30,23 +31,43 @@ func ReceiveDataGroupAsSignal(data *model.DataGroup) {
 		once.Do(func() {
 			// We must close the channel at the sender-side.
 			// Otherwise, we need complex codes to handle it.
-			if sendChannel != nil {
-				close(sendChannel)
+			if triggerEventChan != nil {
+				close(triggerEventChan)
+			}
+			if traceChan != nil {
+				close(traceChan)
 			}
 		})
 		return
 	}
-	if data.Labels.GetBoolValue("isInstallApm") {
+	isFromApm := data.Labels.GetBoolValue("isInstallApm")
+	if isFromApm {
 		isInstallApm[uint64(data.Labels.GetIntValue("pid"))] = true
+		if !data.Labels.GetBoolValue(constlabels.IsSlow) {
+			return
+		}
+		// We save the trace to sampleMap to make it the sending trigger event.
+		pidString := strconv.FormatInt(data.Labels.GetIntValue("pid"), 10)
+		// The data doesn't need to be cloned as it won't be reused.
+		sampleMap.LoadOrStore(data.Labels.GetStringValue(constlabels.ContentKey)+pidString, data)
 	} else {
-		if isInstallApm[uint64(data.Labels.GetIntValue("pid"))] && data.Labels.GetBoolValue(constlabels.IsServer) {
+		if !data.Labels.GetBoolValue(constlabels.IsSlow) {
 			return
 		}
-	}
-	if data.Labels.GetBoolValue(constlabels.IsSlow) {
-		_, ok := sampleMap.Load(data.Labels.GetStringValue(constlabels.ContentKey) + strconv.FormatInt(data.Labels.GetIntValue("pid"), 10))
-		if !ok {
-			sampleMap.Store(data.Labels.GetStringValue(constlabels.ContentKey)+strconv.FormatInt(data.Labels.GetIntValue("pid"), 10), data)
+		// Clone the data for further usage. Otherwise, it will be reused and lose its fields.
+		trace := data.Clone()
+		// CpuAnalyzer consumes all traces from the client-side to add them to TimeSegments for data enrichment.
+		// Now we don't store the trace from the server-side due to the storage concern.
+		if !trace.Labels.GetBoolValue(constlabels.IsServer) {
+			traceChan <- trace
+			// The trace sent from the client-side won't be treated as a trigger event, so we just return here.
+			return
+		}
+		// If the data is not from APM but APM traces have been received for this pid, we don't treat it as a signal.
+		if !isInstallApm[uint64(data.Labels.GetIntValue("pid"))] {
+			// We save the trace to sampleMap to make it the sending trigger event.
+			pidString := strconv.FormatInt(data.Labels.GetIntValue("pid"), 10)
+			sampleMap.LoadOrStore(data.Labels.GetStringValue(constlabels.ContentKey)+pidString, trace)
 		}
 	}
 }
@@ -58,17 +79,18 @@ type SendTriggerEvent struct {
 	OriginalData *model.DataGroup `json:"originalData"`
 }
 
-// ReceiveSendSignal todo: Modify the sampling algorithm to ensure that the data sampled by each multi-trace system is uniform. Now this is written because it is easier to implement
-func (ca *CpuAnalyzer) ReceiveSendSignal() {
+// ReadTraceChan reads the trace channel and makes cpuanalyzer consume the traces as general events.
+func (ca *CpuAnalyzer) ReadTraceChan() {
 	// Break the for loop if the channel is closed
-	for sendContent := range sendChannel {
-		// CpuAnalyzer consumes all traces from the client-side to add them to TimeSegments
-		// These traces are not considered as signals, so we skip them here. Note they won't
-		// be consumed by the following consumers.
-		if !sendContent.OriginalData.Labels.GetBoolValue(constlabels.IsServer) {
-			ca.ConsumeTraces(sendContent)
-			continue
-		}
+	for trace := range traceChan {
+		ca.ConsumeTraces(trace)
+	}
+}
+
+// ReadTriggerEventChan reads the triggerEvent channel and creates tasks to send cpuEvents.
+func (ca *CpuAnalyzer) ReadTriggerEventChan() {
+	// Break the for loop if the channel is closed
+	for sendContent := range triggerEventChan {
 		// Only send the slow traces as the signals
 		if !sendContent.OriginalData.Labels.GetBoolValue(constlabels.IsSlow) {
 			continue
@@ -132,14 +154,12 @@ func (ca *CpuAnalyzer) sampleSend() {
 				SpendTime:    uint64(duration.GetInt().Value),
 				OriginalData: data,
 			}
-			sendChannel <- event
+			triggerEventChan <- event
 			sampleMap.Delete(k)
 			return true
 		})
-
 		}
 	}
-
 }
 
 func (ca *CpuAnalyzer) sendEvents(keyElements *model.AttributeMap, pid uint32, startTime uint64, endTime uint64) {
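The patch leaves the new Java-trace sampling switch off by default, so the gate in ConsumeTransactionIdEvent skips analyzerJavaTraceTime unless a user opts in. A minimal sketch of flipping the switch programmatically, assuming only the import path and the exported Config/NewDefaultConfig shown above (the standalone main package is illustrative, not part of the change); in a deployed collector the same field is set through the open_java_trace_sampling key defined by its mapstructure tag:

package main

import (
	"fmt"

	"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/cpuanalyzer"
)

func main() {
	// Defaults from NewDefaultConfig: SamplingInterval=5 (seconds),
	// OpenJavaTraceSampling=false, JavaTraceSlowTime=500, SegmentSize=40,
	// EdgeEventsWindowSize=2.
	cfg := cpuanalyzer.NewDefaultConfig()

	// Opt in to Java-trace-triggered sampling; it is disabled by default.
	cfg.OpenJavaTraceSampling = true

	fmt.Printf("cpuanalyzer config: %+v\n", *cfg)
}

Keeping the APM-driven path opt-in means existing deployments retain the default behavior described in the changelog entry: one trace sampled every five seconds per endpoint.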