Skip to content

Commit

Permalink
Implement the delay queue for exited thread (#365)
Browse files Browse the repository at this point in the history
Signed-off-by: yaofighting <siyao@zju.edu.cn>
  • Loading branch information
yaofighting authored Nov 24, 2022
1 parent d4e2652 commit d4e8e64
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
-
- Fix the bug that cpuEvent cache size continuously increases even if trace profiling is not enabled.([#362](https://github.com/CloudDectective-Harmonycloud/kindling/pull/362))
- Fix the bug that duplicate CPU events are indexed into Elasticsearch. ([#359](https://github.com/KindlingProject/kindling/pull/359))
- Implement the delay queue for exited thread, so as to avoid losing the data in the period before the thread exits. ([#365](https://github.com/CloudDectective-Harmonycloud/kindling/pull/365))
- Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time. ([#364](https://github.com/CloudDectective-Harmonycloud/kindling/pull/364))

## v0.5.0 - 2022-11-02
Expand Down
11 changes: 7 additions & 4 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
Expand All @@ -24,8 +25,8 @@ type CpuAnalyzer struct {
sendEventsRoutineMap sync.Map
lock sync.Mutex
telemetry *component.TelemetryTools

nextConsumers []consumer.Consumer
tidExpiredQueue *tidDeleteQueue
nextConsumers []consumer.Consumer
}

func (ca *CpuAnalyzer) Type() analyzer.Type {
Expand All @@ -44,6 +45,8 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
nextConsumers: consumers,
}
ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
ca.tidExpiredQueue = newTidDeleteQueue()
go ca.TidDelete(30*time.Second, 10*time.Second)
return ca
}

Expand Down Expand Up @@ -243,6 +246,6 @@ func (ca *CpuAnalyzer) trimExitedThread(pid uint32, tid uint32) {
if tidEventsMap == nil {
return
}
ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map", pid, tid)
delete(tidEventsMap, tid)
ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map after 10 seconds. ", pid, tid)
ca.AddTidToDeleteCache(time.Now(), pid, tid)
}
84 changes: 84 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cpuanalyzer

import (
"sync"
"time"
)

type tidDeleteQueue struct {
queueMutex sync.Mutex
queue []deleteTid
}

type deleteTid struct {
pid uint32
tid uint32
exitTime time.Time
}

func newTidDeleteQueue() *tidDeleteQueue {
return &tidDeleteQueue{queue: make([]deleteTid, 0)}
}

func (dq *tidDeleteQueue) GetFront() *deleteTid {
if len(dq.queue) > 0 {
return &dq.queue[0]
}
return nil
}

func (dq *tidDeleteQueue) Push(elem deleteTid) {
dq.queue = append(dq.queue, elem)
}

func (dq *tidDeleteQueue) Pop() {
if len(dq.queue) > 0 {
dq.queue = dq.queue[1:len(dq.queue)]
}
}

//Add procexit tid
func (ca *CpuAnalyzer) AddTidToDeleteCache(curTime time.Time, pid uint32, tid uint32) {
cacheElem := deleteTid{pid: pid, tid: tid, exitTime: curTime}
ca.tidExpiredQueue.queueMutex.Lock()
defer ca.tidExpiredQueue.queueMutex.Unlock()
ca.tidExpiredQueue.Push(cacheElem)
}

func (ca *CpuAnalyzer) DeleteTid(tidEventsMap map[uint32]*TimeSegments, tid uint32) {
ca.lock.Lock()
defer ca.lock.Unlock()
delete(tidEventsMap, tid)
}

func (ca *CpuAnalyzer) TidDelete(interval time.Duration, expiredDuration time.Duration) {
for {
select {
case <-time.After(interval):
now := time.Now()
func() {
ca.tidExpiredQueue.queueMutex.Lock()
defer ca.tidExpiredQueue.queueMutex.Unlock()
for {
elem := ca.tidExpiredQueue.GetFront()
if elem == nil {
break
}
if elem.exitTime.Add(expiredDuration).Before(now) {
tidEventsMap := ca.cpuPidEvents[elem.pid]
if tidEventsMap == nil {
ca.tidExpiredQueue.Pop()
continue
}
ca.telemetry.Logger.Debugf("Delete expired thread... pid=%d, tid=%d", elem.pid, elem.tid)
//fmt.Printf("Go Test: Delete expired thread... pid=%d, tid=%d\n", elem.pid, elem.tid)
ca.DeleteTid(tidEventsMap, elem.tid)
ca.tidExpiredQueue.Pop()
} else {
break
}
}
}()
}
}
}
107 changes: 107 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package cpuanalyzer

import (
"strconv"
"testing"
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
)

var (
visit []deleteTid
ca *CpuAnalyzer
exitTid map[uint32]int
enterCnt int
exitCnt int
)

const timeDuration time.Duration = 100 * time.Millisecond

func TestDeleteQueue(t *testing.T) {

cpupidEvents := make(map[uint32]map[uint32]*TimeSegments, 100000)
testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools()
mycfg := &Config{SegmentSize: 40}
ca = &CpuAnalyzer{cpuPidEvents: cpupidEvents, telemetry: testTelemetry, cfg: mycfg}

ca.tidExpiredQueue = newTidDeleteQueue()

visit = make([]deleteTid, 0)
exitTid = make(map[uint32]int, 0)

go ca.TidDelete(5*timeDuration, 4*timeDuration)
go CheckQueueLoop(t)
for i := 0; i < 20; i++ {

ev := new(CpuEvent)
curTime := time.Now()
ev.EndTime = uint64(curTime.Add(timeDuration).Nanosecond())
ev.StartTime = uint64(curTime.Nanosecond())

//check tid which exist in queue but not in the map
if i%4 != 0 {
ca.PutEventToSegments(uint32(i), uint32(i)+5, "threadname"+strconv.Itoa(i+100), ev)
}

var queueLen int

func() {
ca.tidExpiredQueue.queueMutex.Lock()
defer ca.tidExpiredQueue.queueMutex.Unlock()
queueLen = len(ca.tidExpiredQueue.queue)

cacheElem := deleteTid{uint32(i), uint32(i) + 5, curTime.Add(timeDuration)}
ca.tidExpiredQueue.Push(cacheElem)
visit = append(visit, cacheElem)
if len(ca.tidExpiredQueue.queue) != queueLen+1 {
t.Errorf("the length of queue is not added, expection: %d but: %d\n", queueLen+1, len(ca.tidExpiredQueue.queue))
}
}()

t.Logf("pid=%d, tid=%d enter time=%s\n", uint32(i), uint32(i)+5, curTime.Format("2006-01-02 15:04:05.000"))
enterCnt++
time.Sleep(3 * timeDuration)
}
time.Sleep(10 * timeDuration)

if enterCnt != exitCnt {
t.Fatalf("The number of threads that entering and exiting the queue is not equal! enterCount=%d, exitCount=%d\n", enterCnt, exitCnt)
} else {
t.Logf("All threads have exited normally. enterCount=%d, exitCount=%d\n", enterCnt, exitCnt)
}

}

func CheckQueueLoop(t *testing.T) {
for {
select {
case <-time.After(timeDuration * 3):
func() {
ca.tidExpiredQueue.queueMutex.Lock()
defer ca.tidExpiredQueue.queueMutex.Unlock()
queueLen := len(ca.tidExpiredQueue.queue)
curTime := time.Now()
for i := 0; i < len(visit); i++ {
tmpv := visit[i]
var j int
for j = 0; j < queueLen; j++ {
tmpq := ca.tidExpiredQueue.queue[j]
if tmpv.tid == tmpq.tid {
if curTime.After(tmpq.exitTime.Add(12 * timeDuration)) {
t.Errorf("there is a expired threads that is not deleted. pid=%d, tid=%d, exitTime=%s\n", tmpv.pid, tmpv.tid, tmpv.exitTime.Format("2006-01-02 15:04:05.000"))
}
break
}
}

if _, exist := exitTid[tmpv.tid]; j >= queueLen && !exist {
exitTid[tmpv.tid] = 1
exitCnt++
t.Logf("pid=%d, tid=%d exit time=%s\n", tmpv.pid, tmpv.tid, curTime.Format("2006-01-02 15:04:05.000"))
}
}
}()
}
}
}

0 comments on commit d4e8e64

Please sign in to comment.