diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index 6f376cbc6..00add4165 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -51,6 +51,12 @@ analyzers: # edge_events_window_size is the size of the duration window that seats the edge events. # The unit is second. The greater it is, the more data will be stored. edge_events_window_size: 2 + # java_trace_delete_interval is the interval for cleaning up expired data in javatraces. + # The unit is seconds. + java_trace_delete_interval: 20 + # java_trace_expiration_time is the expiration time for data in javatraces. + # The unit is seconds. + java_trace_expiration_time: 120 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10 @@ -254,4 +260,4 @@ observability: # Note: DO NOT add the prefix "http://" endpoint: 10.10.10.10:8080 stdout: - collect_period: 15s \ No newline at end of file + collect_period: 15s diff --git a/collector/pkg/component/analyzer/cpuanalyzer/config.go b/collector/pkg/component/analyzer/cpuanalyzer/config.go index 18ee61eb3..64e395d42 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/config.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/config.go @@ -20,14 +20,22 @@ type Config struct { // EdgeEventsWindowSize is the size of the duration window that seats the edge events. // The unit is seconds. The greater it is, the more data will be stored. EdgeEventsWindowSize int `mapstructure:"edge_events_window_size"` + // JavaTraceDeleteInterval is the interval for cleaning up expired data in javatraces. + // The unit is seconds. + JavaTraceDeleteInterval int `mapstructure:"java_trace_delete_interval"` + // JavaTraceExpirationTime is the expiration time for data in javatraces. + // The unit is seconds. + JavaTraceExpirationTime int `mapstructure:"java_trace_expiration_time"` } func NewDefaultConfig() *Config { return &Config{ - SamplingInterval: 5, - OpenJavaTraceSampling: false, - JavaTraceSlowTime: 500, - SegmentSize: 40, - EdgeEventsWindowSize: 2, + SamplingInterval: 5, + OpenJavaTraceSampling: false, + JavaTraceSlowTime: 500, + SegmentSize: 40, + EdgeEventsWindowSize: 2, + JavaTraceDeleteInterval: 20, + JavaTraceExpirationTime: 120, } } diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go index 8b3263815..f37ff65ac 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go @@ -27,17 +27,18 @@ const ( ) type CpuAnalyzer struct { - cfg *Config - cpuPidEvents map[uint32]map[uint32]*TimeSegments - routineSize *atomic.Int32 - lock sync.RWMutex - telemetry *component.TelemetryTools - tidExpiredQueue *tidDeleteQueue - javaTraces map[string]*TransactionIdEvent - nextConsumers []consumer.Consumer - metadata *kubernetes.K8sMetaDataCache - - stopProfileChan chan struct{} + cfg *Config + cpuPidEvents map[uint32]map[uint32]*TimeSegments + routineSize *atomic.Int32 + lock sync.RWMutex + jtlock sync.RWMutex + telemetry *component.TelemetryTools + tidExpiredQueue *tidDeleteQueue + javaTraces map[string]*TransactionIdEvent + javaTraceExpiredQueue *javaTraceDeleteQueue + nextConsumers []consumer.Consumer + metadata *kubernetes.K8sMetaDataCache + stopProfileChan chan struct{} } func (ca *CpuAnalyzer) Type() analyzer.Type { @@ -59,13 +60,16 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum } ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000) ca.tidExpiredQueue = newTidDeleteQueue() + ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue() ca.javaTraces = make(map[string]*TransactionIdEvent, 100000) newSelfMetrics(telemetry.MeterProvider, ca) return ca } func (ca *CpuAnalyzer) Start() error { - // Disable receiving and sending the profiling data by default. + interval := time.Duration(ca.cfg.JavaTraceDeleteInterval) * time.Second + expiredDuration := time.Duration(ca.cfg.JavaTraceExpirationTime) * time.Second + go ca.JavaTraceDelete(interval, expiredDuration) return nil } @@ -116,10 +120,18 @@ func (ca *CpuAnalyzer) ConsumeTransactionIdEvent(event *model.KindlingEvent) { } func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) { + ca.jtlock.Lock() + defer ca.jtlock.Unlock() + key := ev.TraceId + ev.PidString + ca.javaTraceExpiredQueue.Push(deleteVal{key: key, enterTime: time.Now()}) if ev.IsEntry == 1 { - ca.javaTraces[ev.TraceId+ev.PidString] = ev + ca.javaTraces[key] = ev } else { - oldEvent := ca.javaTraces[ev.TraceId+ev.PidString] + oldEvent, ok := ca.javaTraces[key] + if !ok { + ca.telemetry.Logger.Warnf("No javaTraces traceid=%s, pid=%s", ev.TraceId, ev.PidString) + return + } pid, _ := strconv.ParseInt(ev.PidString, 10, 64) spendTime := ev.Timestamp - oldEvent.Timestamp contentKey := oldEvent.Url diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go new file mode 100644 index 000000000..8028d26b3 --- /dev/null +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go @@ -0,0 +1,75 @@ +package cpuanalyzer + +import ( + "sync" + "time" +) + +type javaTraceDeleteQueue struct { + queueMutex sync.Mutex + queue []deleteVal +} + +type deleteVal struct { + key string + enterTime time.Time +} + +func newJavaTraceDeleteQueue() *javaTraceDeleteQueue { + return &javaTraceDeleteQueue{queue: make([]deleteVal, 0)} +} + +func (dq *javaTraceDeleteQueue) GetFront() *deleteVal { + if len(dq.queue) > 0 { + return &dq.queue[0] + } + return nil +} + +func (dq *javaTraceDeleteQueue) Push(elem deleteVal) { + dq.queue = append(dq.queue, elem) +} + +func (dq *javaTraceDeleteQueue) Pop() { + if len(dq.queue) > 0 { + dq.queue = dq.queue[1:len(dq.queue)] + } +} + +func (ca *CpuAnalyzer) JavaTraceDelete(interval time.Duration, expiredDuration time.Duration) { + for { + select { + case <-ca.stopProfileChan: + return + case <-time.After(interval): + ca.telemetry.Logger.Debug("Start regular cleaning of javatrace...") + now := time.Now() + func() { + ca.javaTraceExpiredQueue.queueMutex.Lock() + defer ca.javaTraceExpiredQueue.queueMutex.Unlock() + for { + val := ca.javaTraceExpiredQueue.GetFront() + if val == nil { + break + } + if val.enterTime.Add(expiredDuration).After(now) { + break + } + + func() { + ca.jtlock.Lock() + defer ca.jtlock.Unlock() + event := ca.javaTraces[val.key] + if event == nil { + ca.javaTraceExpiredQueue.Pop() + } else { + ca.telemetry.Logger.Debugf("Delete expired javatrace... pid=%s, tid=%s", event.PidString, event.TraceId) + delete(ca.javaTraces, val.key) + ca.javaTraceExpiredQueue.Pop() + } + }() + } + }() + } + } +} diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go new file mode 100644 index 000000000..027801342 --- /dev/null +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go @@ -0,0 +1,45 @@ +package cpuanalyzer + +import ( + "math/rand" + "strconv" + "testing" + "time" + + "github.com/Kindling-project/kindling/collector/pkg/component" +) + +var cnt int + +func TestJavaTraceDeleteQueue(t *testing.T) { + + jt := make(map[string]*TransactionIdEvent, 100000) + testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools() + mycfg := &Config{SegmentSize: 40, JavaTraceDeleteInterval: 15, JavaTraceExpirationTime: 10} + ca = &CpuAnalyzer{javaTraces: jt, telemetry: testTelemetry, cfg: mycfg} + ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue() + go ca.JavaTraceDelete(1*time.Second, 1*time.Second) + + for i := 0; i < 20; i++ { + ev := new(TransactionIdEvent) + ev.TraceId = strconv.Itoa(rand.Intn(10000)) + ev.PidString = strconv.Itoa(rand.Intn(10000)) + ev.IsEntry = 1 + key := ev.TraceId + ev.PidString + ca.javaTraces[key] = ev + val := new(deleteVal) + val.key = ev.TraceId + ev.PidString + val.enterTime = time.Now() + ca.javaTraceExpiredQueue.Push(*val) + t.Logf("pid=%s, tid=%s enter time=%s\n", ev.PidString, ev.TraceId, val.enterTime.Format("2006-01-02 15:04:05.000")) + cnt++ + } + time.Sleep(5 * time.Second) + + if len(ca.javaTraces) != 0 { + t.Fatalf("The number of javatraces that entering and exiting the queue is not equal! "+ + "enterCount=%d , len of javatrace is : %d\n", cnt, len(ca.javaTraces)) + } else { + t.Logf("All javatraces have cleaned normally. enterCount=%d\n", cnt) + } +} diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index 9289c1f65..8fc6daec7 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -51,6 +51,12 @@ analyzers: # edge_events_window_size is the size of the duration window that seats the edge events. # The unit is second. The greater it is, the more data will be stored. edge_events_window_size: 2 + # java_trace_delete_interval is the interval for cleaning up expired data in javatraces. + # The unit is seconds. + java_trace_delete_interval: 20 + # java_trace_expiration_time is the expiration time for data in javatraces. + # The unit is seconds. + java_trace_expiration_time: 120 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10 @@ -253,4 +259,4 @@ observability: # Note: DO NOT add the prefix "http://" endpoint: 10.10.10.10:8080 stdout: - collect_period: 15s \ No newline at end of file + collect_period: 15s