diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4584a61d0..8cd5799c4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@
 - 
 - Fix the bug that cpuEvent cache size continuously increases even if trace profiling is not enabled.([#362](https://github.com/CloudDectective-Harmonycloud/kindling/pull/362))
 - Fix the bug that duplicate CPU events are indexed into Elasticsearch. ([#359](https://github.com/KindlingProject/kindling/pull/359))
+- Implement a delay queue for exited threads, so that the data produced shortly before a thread exits is no longer lost. ([#365](https://github.com/CloudDectective-Harmonycloud/kindling/pull/365))
 - Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time. ([#364](https://github.com/CloudDectective-Harmonycloud/kindling/pull/364))
 
 ## v0.5.0 - 2022-11-02
diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
index 4a780b436..f808240a8 100644
--- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
+++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"strconv"
 	"sync"
+	"time"
 
 	"github.com/Kindling-project/kindling/collector/pkg/component"
 	"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
@@ -24,8 +25,8 @@ type CpuAnalyzer struct {
 	sendEventsRoutineMap sync.Map
 	lock                 sync.Mutex
 	telemetry            *component.TelemetryTools
-
-	nextConsumers []consumer.Consumer
+	tidExpiredQueue      *tidDeleteQueue
+	nextConsumers        []consumer.Consumer
 }
 
 func (ca *CpuAnalyzer) Type() analyzer.Type {
@@ -44,6 +45,8 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
 		nextConsumers: consumers,
 	}
 	ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
+	ca.tidExpiredQueue = newTidDeleteQueue()
+	go ca.TidDelete(30*time.Second, 10*time.Second)
 	return ca
 }
 
@@ -243,6 +246,6 @@ func (ca *CpuAnalyzer) trimExitedThread(pid uint32, tid uint32) {
 	if tidEventsMap == nil {
 		return
 	}
-	ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map", pid, tid)
-	delete(tidEventsMap, tid)
+	ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from the map after at least 10 seconds", pid, tid)
", pid, tid) + ca.AddTidToDeleteCache(time.Now(), pid, tid) } diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go new file mode 100644 index 000000000..25919772d --- /dev/null +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go @@ -0,0 +1,84 @@ +package cpuanalyzer + +import ( + "sync" + "time" +) + +type tidDeleteQueue struct { + queueMutex sync.Mutex + queue []deleteTid +} + +type deleteTid struct { + pid uint32 + tid uint32 + exitTime time.Time +} + +func newTidDeleteQueue() *tidDeleteQueue { + return &tidDeleteQueue{queue: make([]deleteTid, 0)} +} + +func (dq *tidDeleteQueue) GetFront() *deleteTid { + if len(dq.queue) > 0 { + return &dq.queue[0] + } + return nil +} + +func (dq *tidDeleteQueue) Push(elem deleteTid) { + dq.queue = append(dq.queue, elem) +} + +func (dq *tidDeleteQueue) Pop() { + if len(dq.queue) > 0 { + dq.queue = dq.queue[1:len(dq.queue)] + } +} + +//Add procexit tid +func (ca *CpuAnalyzer) AddTidToDeleteCache(curTime time.Time, pid uint32, tid uint32) { + cacheElem := deleteTid{pid: pid, tid: tid, exitTime: curTime} + ca.tidExpiredQueue.queueMutex.Lock() + defer ca.tidExpiredQueue.queueMutex.Unlock() + ca.tidExpiredQueue.Push(cacheElem) +} + +func (ca *CpuAnalyzer) DeleteTid(tidEventsMap map[uint32]*TimeSegments, tid uint32) { + ca.lock.Lock() + defer ca.lock.Unlock() + delete(tidEventsMap, tid) +} + +func (ca *CpuAnalyzer) TidDelete(interval time.Duration, expiredDuration time.Duration) { + for { + select { + case <-time.After(interval): + now := time.Now() + func() { + ca.tidExpiredQueue.queueMutex.Lock() + defer ca.tidExpiredQueue.queueMutex.Unlock() + for { + elem := ca.tidExpiredQueue.GetFront() + if elem == nil { + break + } + if elem.exitTime.Add(expiredDuration).Before(now) { + tidEventsMap := ca.cpuPidEvents[elem.pid] + if tidEventsMap == nil { + ca.tidExpiredQueue.Pop() + continue + } + ca.telemetry.Logger.Debugf("Delete expired thread... pid=%d, tid=%d", elem.pid, elem.tid) + //fmt.Printf("Go Test: Delete expired thread... 
pid=%d, tid=%d\n", elem.pid, elem.tid) + ca.DeleteTid(tidEventsMap, elem.tid) + ca.tidExpiredQueue.Pop() + } else { + break + } + } + }() + } + } +} diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go new file mode 100644 index 000000000..f7bfbd312 --- /dev/null +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go @@ -0,0 +1,107 @@ +package cpuanalyzer + +import ( + "strconv" + "testing" + "time" + + "github.com/Kindling-project/kindling/collector/pkg/component" +) + +var ( + visit []deleteTid + ca *CpuAnalyzer + exitTid map[uint32]int + enterCnt int + exitCnt int +) + +const timeDuration time.Duration = 100 * time.Millisecond + +func TestDeleteQueue(t *testing.T) { + + cpupidEvents := make(map[uint32]map[uint32]*TimeSegments, 100000) + testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools() + mycfg := &Config{SegmentSize: 40} + ca = &CpuAnalyzer{cpuPidEvents: cpupidEvents, telemetry: testTelemetry, cfg: mycfg} + + ca.tidExpiredQueue = newTidDeleteQueue() + + visit = make([]deleteTid, 0) + exitTid = make(map[uint32]int, 0) + + go ca.TidDelete(5*timeDuration, 4*timeDuration) + go CheckQueueLoop(t) + for i := 0; i < 20; i++ { + + ev := new(CpuEvent) + curTime := time.Now() + ev.EndTime = uint64(curTime.Add(timeDuration).Nanosecond()) + ev.StartTime = uint64(curTime.Nanosecond()) + + //check tid which exist in queue but not in the map + if i%4 != 0 { + ca.PutEventToSegments(uint32(i), uint32(i)+5, "threadname"+strconv.Itoa(i+100), ev) + } + + var queueLen int + + func() { + ca.tidExpiredQueue.queueMutex.Lock() + defer ca.tidExpiredQueue.queueMutex.Unlock() + queueLen = len(ca.tidExpiredQueue.queue) + + cacheElem := deleteTid{uint32(i), uint32(i) + 5, curTime.Add(timeDuration)} + ca.tidExpiredQueue.Push(cacheElem) + visit = append(visit, cacheElem) + if len(ca.tidExpiredQueue.queue) != queueLen+1 { + t.Errorf("the length of queue is not added, expection: %d but: %d\n", queueLen+1, len(ca.tidExpiredQueue.queue)) + } + }() + + t.Logf("pid=%d, tid=%d enter time=%s\n", uint32(i), uint32(i)+5, curTime.Format("2006-01-02 15:04:05.000")) + enterCnt++ + time.Sleep(3 * timeDuration) + } + time.Sleep(10 * timeDuration) + + if enterCnt != exitCnt { + t.Fatalf("The number of threads that entering and exiting the queue is not equal! enterCount=%d, exitCount=%d\n", enterCnt, exitCnt) + } else { + t.Logf("All threads have exited normally. enterCount=%d, exitCount=%d\n", enterCnt, exitCnt) + } + +} + +func CheckQueueLoop(t *testing.T) { + for { + select { + case <-time.After(timeDuration * 3): + func() { + ca.tidExpiredQueue.queueMutex.Lock() + defer ca.tidExpiredQueue.queueMutex.Unlock() + queueLen := len(ca.tidExpiredQueue.queue) + curTime := time.Now() + for i := 0; i < len(visit); i++ { + tmpv := visit[i] + var j int + for j = 0; j < queueLen; j++ { + tmpq := ca.tidExpiredQueue.queue[j] + if tmpv.tid == tmpq.tid { + if curTime.After(tmpq.exitTime.Add(12 * timeDuration)) { + t.Errorf("there is a expired threads that is not deleted. pid=%d, tid=%d, exitTime=%s\n", tmpv.pid, tmpv.tid, tmpv.exitTime.Format("2006-01-02 15:04:05.000")) + } + break + } + } + + if _, exist := exitTid[tmpv.tid]; j >= queueLen && !exist { + exitTid[tmpv.tid] = 1 + exitCnt++ + t.Logf("pid=%d, tid=%d exit time=%s\n", tmpv.pid, tmpv.tid, curTime.Format("2006-01-02 15:04:05.000")) + } + } + }() + } + } +}