Skip to content

Commit

Permalink
scheduled cleaning in javatraces, and add null pointer checking (#514)
Browse files Browse the repository at this point in the history
Signed-off-by: Hui <anthonyhui@126.com>
Signed-off-by: AnthonyHui <48346142+hwz779866221@users.noreply.github.com>
Signed-off-by: anthonyhui <anthonyhui@126.com>
Co-authored-by: Hui <anthonyhui@126.com>
  • Loading branch information
hwz779866221 and Hui authored Sep 1, 2023
1 parent 42e5ad5 commit c9d4476
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 21 deletions.
8 changes: 7 additions & 1 deletion collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ analyzers:
# edge_events_window_size is the size of the duration window that seats the edge events.
# The unit is seconds. The greater it is, the more data will be stored.
edge_events_window_size: 2
# java_trace_delete_interval is the interval for cleaning up expired data in javatraces.
# The unit is seconds.
java_trace_delete_interval: 20
# java_trace_expiration_time is the expiration time for data in javatraces.
# The unit is seconds.
java_trace_expiration_time: 120
tcpconnectanalyzer:
channel_size: 10000
wait_event_second: 10
Expand Down Expand Up @@ -254,4 +260,4 @@ observability:
# Note: DO NOT add the prefix "http://"
endpoint: 10.10.10.10:8080
stdout:
collect_period: 15s
collect_period: 15s
18 changes: 13 additions & 5 deletions collector/pkg/component/analyzer/cpuanalyzer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@ type Config struct {
// EdgeEventsWindowSize is the size of the duration window that seats the edge events.
// The unit is seconds. The greater it is, the more data will be stored.
EdgeEventsWindowSize int `mapstructure:"edge_events_window_size"`
// JavaTraceDeleteInterval is the interval for cleaning up expired data in javatraces.
// The unit is seconds.
JavaTraceDeleteInterval int `mapstructure:"java_trace_delete_interval"`
// JavaTraceExpirationTime is the expiration time for data in javatraces.
// The unit is seconds.
JavaTraceExpirationTime int `mapstructure:"java_trace_expiration_time"`
}

func NewDefaultConfig() *Config {
return &Config{
SamplingInterval: 5,
OpenJavaTraceSampling: false,
JavaTraceSlowTime: 500,
SegmentSize: 40,
EdgeEventsWindowSize: 2,
SamplingInterval: 5,
OpenJavaTraceSampling: false,
JavaTraceSlowTime: 500,
SegmentSize: 40,
EdgeEventsWindowSize: 2,
JavaTraceDeleteInterval: 20,
JavaTraceExpirationTime: 120,
}
}
40 changes: 26 additions & 14 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ const (
)

type CpuAnalyzer struct {
cfg *Config
cpuPidEvents map[uint32]map[uint32]*TimeSegments
routineSize *atomic.Int32
lock sync.RWMutex
telemetry *component.TelemetryTools
tidExpiredQueue *tidDeleteQueue
javaTraces map[string]*TransactionIdEvent
nextConsumers []consumer.Consumer
metadata *kubernetes.K8sMetaDataCache

stopProfileChan chan struct{}
cfg *Config
cpuPidEvents map[uint32]map[uint32]*TimeSegments
routineSize *atomic.Int32
lock sync.RWMutex
jtlock sync.RWMutex
telemetry *component.TelemetryTools
tidExpiredQueue *tidDeleteQueue
javaTraces map[string]*TransactionIdEvent
javaTraceExpiredQueue *javaTraceDeleteQueue
nextConsumers []consumer.Consumer
metadata *kubernetes.K8sMetaDataCache
stopProfileChan chan struct{}
}

func (ca *CpuAnalyzer) Type() analyzer.Type {
Expand All @@ -59,13 +60,16 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
}
ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
ca.tidExpiredQueue = newTidDeleteQueue()
ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue()
ca.javaTraces = make(map[string]*TransactionIdEvent, 100000)
newSelfMetrics(telemetry.MeterProvider, ca)
return ca
}

// Start launches the background goroutine that periodically removes expired
// entries from javaTraces and always returns nil.
//
// Both JavaTraceDeleteInterval and JavaTraceExpirationTime are read from the
// analyzer configuration and interpreted as a number of seconds.
func (ca *CpuAnalyzer) Start() error {
	// Receiving and sending the profiling data stays disabled by default;
	// only the javatrace cleaner is started here.
	var (
		cleanEvery = time.Second * time.Duration(ca.cfg.JavaTraceDeleteInterval)
		traceTTL   = time.Second * time.Duration(ca.cfg.JavaTraceExpirationTime)
	)
	go ca.JavaTraceDelete(cleanEvery, traceTTL)
	return nil
}

Expand Down Expand Up @@ -116,10 +120,18 @@ func (ca *CpuAnalyzer) ConsumeTransactionIdEvent(event *model.KindlingEvent) {
}

func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) {
ca.jtlock.Lock()
defer ca.jtlock.Unlock()
key := ev.TraceId + ev.PidString
ca.javaTraceExpiredQueue.Push(deleteVal{key: key, enterTime: time.Now()})
if ev.IsEntry == 1 {
ca.javaTraces[ev.TraceId+ev.PidString] = ev
ca.javaTraces[key] = ev
} else {
oldEvent := ca.javaTraces[ev.TraceId+ev.PidString]
oldEvent, ok := ca.javaTraces[key]
if !ok {
ca.telemetry.Logger.Warnf("No javaTraces traceid=%s, pid=%s", ev.TraceId, ev.PidString)
return
}
pid, _ := strconv.ParseInt(ev.PidString, 10, 64)
spendTime := ev.Timestamp - oldEvent.Timestamp
contentKey := oldEvent.Url
Expand Down
75 changes: 75 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package cpuanalyzer

import (
"sync"
"time"
)

// javaTraceDeleteQueue is a FIFO queue of deleteVal entries backed by a slice.
// Its GetFront, Push and Pop methods do not acquire queueMutex themselves;
// JavaTraceDelete locks queueMutex while it walks the queue.
type javaTraceDeleteQueue struct {
// queueMutex is locked by JavaTraceDelete around its sweep of queue.
queueMutex sync.Mutex
// queue holds the pending entries, oldest first.
queue []deleteVal
}

// deleteVal is one element of javaTraceDeleteQueue.
type deleteVal struct {
// key is the javaTraces map key this entry refers to.
key string
// enterTime is the time recorded for the entry; JavaTraceDelete treats the
// entry as expired once enterTime plus the expiration duration is not after now.
enterTime time.Time
}

// newJavaTraceDeleteQueue returns a queue that holds no entries.
// The underlying slice is initialized to an empty, non-nil slice.
func newJavaTraceDeleteQueue() *javaTraceDeleteQueue {
	dq := new(javaTraceDeleteQueue)
	dq.queue = []deleteVal{}
	return dq
}

// GetFront returns a pointer to the oldest entry in the queue, or nil when the
// queue is empty. The entry is not removed, and the returned pointer refers to
// the element inside the queue's slice rather than to a copy.
// It does not lock queueMutex.
func (dq *javaTraceDeleteQueue) GetFront() *deleteVal {
	if len(dq.queue) == 0 {
		return nil
	}
	front := &dq.queue[0]
	return front
}

// Push appends elem to the tail of the queue.
// It does not lock queueMutex, so the caller has to prevent concurrent access
// to the queue.
func (dq *javaTraceDeleteQueue) Push(elem deleteVal) {
dq.queue = append(dq.queue, elem)
}

// Pop removes the oldest entry from the queue. It is a no-op on an empty queue.
//
// The vacated slot is zeroed before the slice is advanced so that the popped
// entry's key string is no longer referenced from the backing array and can be
// garbage collected. As a consequence, a pointer previously obtained from
// GetFront must not be read after Pop.
// It does not lock queueMutex.
func (dq *javaTraceDeleteQueue) Pop() {
	if len(dq.queue) == 0 {
		return
	}
	dq.queue[0] = deleteVal{}
	dq.queue = dq.queue[1:]
}

// JavaTraceDelete periodically removes expired entries from ca.javaTraces.
//
// After every interval it walks ca.javaTraceExpiredQueue from the front and,
// for each entry whose enterTime plus expiredDuration is not after the current
// time, deletes the entry's key from ca.javaTraces and pops the entry. The walk
// stops at the first entry that has not expired yet.
//
// The method blocks the calling goroutine and returns once a receive from
// ca.stopProfileChan succeeds (a value is sent or the channel is closed).
func (ca *CpuAnalyzer) JavaTraceDelete(interval time.Duration, expiredDuration time.Duration) {
	for {
		select {
		case <-ca.stopProfileChan:
			return
		case <-time.After(interval):
			ca.telemetry.Logger.Debug("Start regular cleaning of javatrace...")
			ca.deleteExpiredJavaTraces(time.Now(), expiredDuration)
		}
	}
}

// deleteExpiredJavaTraces performs one cleaning sweep: it drops every queue
// entry that is expired relative to now together with its javaTraces entry.
func (ca *CpuAnalyzer) deleteExpiredJavaTraces(now time.Time, expiredDuration time.Duration) {
	// jtlock is taken first and held for the whole sweep. analyzerJavaTraceTime
	// pushes to the queue while holding jtlock only, so reading the queue
	// without jtlock would race with that append.
	ca.jtlock.Lock()
	defer ca.jtlock.Unlock()
	ca.javaTraceExpiredQueue.queueMutex.Lock()
	defer ca.javaTraceExpiredQueue.queueMutex.Unlock()

	for {
		val := ca.javaTraceExpiredQueue.GetFront()
		if val == nil || val.enterTime.Add(expiredDuration).After(now) {
			// The queue is empty, or its oldest entry has not expired yet.
			return
		}
		if event := ca.javaTraces[val.key]; event != nil {
			ca.telemetry.Logger.Debugf("Delete expired javatrace... pid=%s, tid=%s", event.PidString, event.TraceId)
		}
		// delete is a no-op when the key is absent, so no existence check is needed.
		delete(ca.javaTraces, val.key)
		ca.javaTraceExpiredQueue.Pop()
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package cpuanalyzer

import (
"math/rand"
"strconv"
"testing"
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
)

var cnt int

// TestJavaTraceDeleteQueue checks that JavaTraceDelete removes every entry
// that was registered in javaTraces and in the expiration queue.
//
// The cleaner goroutine runs concurrently with the test body, so the map is
// accessed under jtlock and the queue under queueMutex. The two locks are never
// held at the same time here, so the test cannot deadlock with the cleaner.
func TestJavaTraceDeleteQueue(t *testing.T) {
	jt := make(map[string]*TransactionIdEvent, 100000)
	testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools()
	mycfg := &Config{SegmentSize: 40, JavaTraceDeleteInterval: 15, JavaTraceExpirationTime: 10}
	stop := make(chan struct{})
	ca = &CpuAnalyzer{javaTraces: jt, telemetry: testTelemetry, cfg: mycfg, stopProfileChan: stop}
	ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue()
	go ca.JavaTraceDelete(1*time.Second, 1*time.Second)
	// Closing the channel makes JavaTraceDelete return, so the goroutine does
	// not outlive the test.
	defer close(stop)

	for i := 0; i < 20; i++ {
		ev := new(TransactionIdEvent)
		ev.TraceId = strconv.Itoa(rand.Intn(10000))
		ev.PidString = strconv.Itoa(rand.Intn(10000))
		ev.IsEntry = 1
		key := ev.TraceId + ev.PidString

		ca.jtlock.Lock()
		ca.javaTraces[key] = ev
		ca.jtlock.Unlock()

		val := deleteVal{key: key, enterTime: time.Now()}
		ca.javaTraceExpiredQueue.queueMutex.Lock()
		ca.javaTraceExpiredQueue.Push(val)
		ca.javaTraceExpiredQueue.queueMutex.Unlock()

		t.Logf("pid=%s, tid=%s enter time=%s\n", ev.PidString, ev.TraceId, val.enterTime.Format("2006-01-02 15:04:05.000"))
		cnt++
	}

	// Poll instead of sleeping for a fixed time so the test finishes as soon
	// as the cleaner has emptied the map; give up after five seconds.
	deadline := time.Now().Add(5 * time.Second)
	remaining := -1
	for {
		ca.jtlock.RLock()
		remaining = len(ca.javaTraces)
		ca.jtlock.RUnlock()
		if remaining == 0 || time.Now().After(deadline) {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	if remaining != 0 {
		t.Fatalf("The number of javatraces that entering and exiting the queue is not equal! "+
			"enterCount=%d , len of javatrace is : %d\n", cnt, remaining)
	}
	t.Logf("All javatraces have cleaned normally. enterCount=%d\n", cnt)
}
8 changes: 7 additions & 1 deletion deploy/agent/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ analyzers:
# edge_events_window_size is the size of the duration window that seats the edge events.
# The unit is seconds. The greater it is, the more data will be stored.
edge_events_window_size: 2
# java_trace_delete_interval is the interval for cleaning up expired data in javatraces.
# The unit is seconds.
java_trace_delete_interval: 20
# java_trace_expiration_time is the expiration time for data in javatraces.
# The unit is seconds.
java_trace_expiration_time: 120
tcpconnectanalyzer:
channel_size: 10000
wait_event_second: 10
Expand Down Expand Up @@ -253,4 +259,4 @@ observability:
# Note: DO NOT add the prefix "http://"
endpoint: 10.10.10.10:8080
stdout:
collect_period: 15s
collect_period: 15s

0 comments on commit c9d4476

Please sign in to comment.