Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a switch whether to use Java-Trace to trigger sampling #462

Merged
merged 5 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

## Unreleased
### New features
- Support trace-profiling sampling to reduce data output. ([#446](https://github.com/KindlingProject/kindling/pull/446))
- Support trace-profiling sampling to reduce data output. One trace is sampled every five seconds for each endpoint by default. ([#446](https://github.com/KindlingProject/kindling/pull/446), [#462](https://github.com/KindlingProject/kindling/pull/462))
-

### Enhancements
Expand Down
12 changes: 8 additions & 4 deletions collector/pkg/component/analyzer/cpuanalyzer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ type Config struct {
// SamplingInterval is the sampling interval for the same url.
// The unit is seconds.
SamplingInterval int `mapstructure:"sampling_interval"`
// OpenJavaTraceSampling a switch for whether to use Java-Trace to trigger sampling.
// The default is false.
OpenJavaTraceSampling bool `mapstructure:"open_java_trace_sampling"`
//JavaTraceSlowTime is used to identify the threshold of slow requests recognized by the apm side
// The unit is seconds.
JavaTraceSlowTime int `mapstructure:"java_trace_slow_time"`
Expand All @@ -21,9 +24,10 @@ type Config struct {

// NewDefaultConfig returns a Config populated with the default values:
// sampling every 5 seconds per endpoint, Java-Trace-triggered sampling
// disabled, a 500 ms (per the field's doc, unit to confirm) slow-request
// threshold, 40 segments, and a 2-second edge-events window.
func NewDefaultConfig() *Config {
	return &Config{
		SamplingInterval:      5,
		OpenJavaTraceSampling: false,
		JavaTraceSlowTime:     500,
		SegmentSize:           40,
		EdgeEventsWindowSize:  2,
	}
}
31 changes: 20 additions & 11 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package cpuanalyzer

import (
"fmt"
"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
"github.com/Kindling-project/kindling/collector/pkg/model/constvalues"
"strconv"
"sync"
"time"

"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
"github.com/Kindling-project/kindling/collector/pkg/model/constvalues"

"go.uber.org/atomic"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -107,7 +108,9 @@ func (ca *CpuAnalyzer) ConsumeTransactionIdEvent(event *model.KindlingEvent) {
}
//ca.sendEventDirectly(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev)
ca.PutEventToSegments(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev)
ca.analyzerJavaTraceTime(ev)
if ca.cfg.OpenJavaTraceSampling {
ca.analyzerJavaTraceTime(ev)
}
}

func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) {
Expand Down Expand Up @@ -180,15 +183,21 @@ func (ca *CpuAnalyzer) ConsumeSpanEvent(event *model.KindlingEvent) {
ca.PutEventToSegments(event.GetPid(), event.Ctx.ThreadInfo.GetTid(), event.Ctx.ThreadInfo.Comm, ev)
}

func (ca *CpuAnalyzer) ConsumeTraces(trace SendTriggerEvent) {
tid := trace.OriginalData.Labels.GetIntValue(constlabels.RequestTid)
threadName := trace.OriginalData.Labels.GetStringValue(constlabels.Comm)
func (ca *CpuAnalyzer) ConsumeTraces(trace *model.DataGroup) {
pid := trace.Labels.GetIntValue("pid")
tid := trace.Labels.GetIntValue(constlabels.RequestTid)
threadName := trace.Labels.GetStringValue(constlabels.Comm)
duration, ok := trace.GetMetric(constvalues.RequestTotalTime)
if !ok {
ca.telemetry.Logger.Warnf("No request_total_time in the trace, pid=%d, threadName=%s", pid, threadName)
return
}
event := &InnerCall{
StartTime: trace.StartTime,
EndTime: trace.StartTime + trace.SpendTime,
Trace: trace.OriginalData,
StartTime: trace.Timestamp,
EndTime: trace.Timestamp + uint64(duration.GetInt().Value),
Trace: trace,
}
ca.PutEventToSegments(trace.Pid, uint32(tid), threadName, event)
ca.PutEventToSegments(uint32(pid), uint32(tid), threadName, event)
}

func (ca *CpuAnalyzer) ConsumeCpuEvent(event *model.KindlingEvent) {
Expand Down
12 changes: 9 additions & 3 deletions collector/pkg/component/analyzer/cpuanalyzer/module_control.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package cpuanalyzer

import "sync"
import (
"sync"

"github.com/Kindling-project/kindling/collector/pkg/model"
)

func (ca *CpuAnalyzer) ProfileModule() (submodule string, start func() error, stop func() error) {
return "cpuanalyzer", ca.StartProfile, ca.StopProfile
Expand All @@ -9,10 +13,12 @@ func (ca *CpuAnalyzer) ProfileModule() (submodule string, start func() error, st
// StartProfile (re-)initializes the package-level channels and flags and
// launches the two consumer goroutines: one for trigger events and one for
// client-side traces. It always returns nil.
func (ca *CpuAnalyzer) StartProfile() error {
	// control flow changed
	// Note that these variables belong to the package, not to the receiver,
	// so starting the profile resets the shared state for the whole package.
	triggerEventChan = make(chan SendTriggerEvent, 3e5)
	traceChan = make(chan *model.DataGroup, 1e4)
	enableProfile = true
	// Reset the Once so the channels can be closed again on the next stop.
	once = sync.Once{}
	go ca.ReadTriggerEventChan()
	go ca.ReadTraceChan()
	return nil
}

Expand Down
74 changes: 47 additions & 27 deletions collector/pkg/component/analyzer/cpuanalyzer/send_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
)

var (
enableProfile bool
once sync.Once
sendChannel chan SendTriggerEvent
sampleMap sync.Map
isInstallApm map[uint64]bool
enableProfile bool
once sync.Once
triggerEventChan chan SendTriggerEvent
traceChan chan *model.DataGroup
sampleMap sync.Map
isInstallApm map[uint64]bool
)

func init() {
Expand All @@ -30,23 +31,43 @@ func ReceiveDataGroupAsSignal(data *model.DataGroup) {
once.Do(func() {
// We must close the channel at the sender-side.
// Otherwise, we need complex codes to handle it.
if sendChannel != nil {
close(sendChannel)
if triggerEventChan != nil {
close(triggerEventChan)
}
if traceChan != nil {
close(traceChan)
}
})
return
}
if data.Labels.GetBoolValue("isInstallApm") {
isFromApm := data.Labels.GetBoolValue("isInstallApm")
if isFromApm {
isInstallApm[uint64(data.Labels.GetIntValue("pid"))] = true
if !data.Labels.GetBoolValue(constlabels.IsSlow) {
return
}
// We save the trace to sampleMap to make it as the sending trigger event.
pidString := strconv.FormatInt(data.Labels.GetIntValue("pid"), 10)
// The data is unnecessary to be cloned as it won't be reused.
sampleMap.LoadOrStore(data.Labels.GetStringValue(constlabels.ContentKey)+pidString, data)
} else {
if isInstallApm[uint64(data.Labels.GetIntValue("pid"))] && data.Labels.GetBoolValue(constlabels.IsServer) {
if !data.Labels.GetBoolValue(constlabels.IsSlow) {
return
}
}
if data.Labels.GetBoolValue(constlabels.IsSlow) {
_, ok := sampleMap.Load(data.Labels.GetStringValue(constlabels.ContentKey) + strconv.FormatInt(data.Labels.GetIntValue("pid"), 10))
if !ok {
sampleMap.Store(data.Labels.GetStringValue(constlabels.ContentKey)+strconv.FormatInt(data.Labels.GetIntValue("pid"), 10), data)
// Clone the data for further usage. Otherwise, it will be reused and loss fields.
trace := data.Clone()
// CpuAnalyzer consumes all traces from the client-side to add them to TimeSegments for data enrichment.
// Now we don't store the trace from the server-side due to the storage concern.
if !trace.Labels.GetBoolValue(constlabels.IsServer) {
traceChan <- trace
// The trace sent from the client-side won't be treated as trigger event, so we just return here.
return
}
// If the data is not from APM while there have been APM traces received, we don't make it as a signal.
if !isInstallApm[uint64(data.Labels.GetIntValue("pid"))] {
// We save the trace to sampleMap to make it as the sending trigger event.
pidString := strconv.FormatInt(data.Labels.GetIntValue("pid"), 10)
sampleMap.LoadOrStore(data.Labels.GetStringValue(constlabels.ContentKey)+pidString, trace)
}
}
}
Expand All @@ -58,17 +79,18 @@ type SendTriggerEvent struct {
OriginalData *model.DataGroup `json:"originalData"`
}

// ReceiveSendSignal todo: Modify the sampling algorithm to ensure that the data sampled by each multi-trace system is uniform. Now this is written because it is easier to implement
func (ca *CpuAnalyzer) ReceiveSendSignal() {
// ReadTraceChan reads the trace channel and make cpuanalyzer consume them as general events.
func (ca *CpuAnalyzer) ReadTraceChan() {
// Break the for loop if the channel is closed
for sendContent := range sendChannel {
// CpuAnalyzer consumes all traces from the client-side to add them to TimeSegments
// These traces are not considered as signals, so we skip them here. Note they won't
// be consumed by the following consumers.
if !sendContent.OriginalData.Labels.GetBoolValue(constlabels.IsServer) {
ca.ConsumeTraces(sendContent)
continue
}
for trace := range traceChan {
ca.ConsumeTraces(trace)
}
}

// ReadTriggerEventChan reads the triggerEvent channel and creates tasks to send cpuEvents.
func (ca *CpuAnalyzer) ReadTriggerEventChan() {
// Break the for loop if the channel is closed
for sendContent := range triggerEventChan {
// Only send the slow traces as the signals
if !sendContent.OriginalData.Labels.GetBoolValue(constlabels.IsSlow) {
continue
Expand Down Expand Up @@ -132,14 +154,12 @@ func (ca *CpuAnalyzer) sampleSend() {
SpendTime: uint64(duration.GetInt().Value),
OriginalData: data,
}
sendChannel <- event
triggerEventChan <- event
sampleMap.Delete(k)
return true
})

}
}

}

func (ca *CpuAnalyzer) sendEvents(keyElements *model.AttributeMap, pid uint32, startTime uint64, endTime uint64) {
Expand Down