Implement the delay queue for exited thread #365

Merged · 7 commits · Nov 24, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@
-
- Fix the bug that cpuEvent cache size continuously increases even if trace profiling is not enabled. ([#362](https://github.com/CloudDectective-Harmonycloud/kindling/pull/362))
- Fix the bug that duplicate CPU events are indexed into Elasticsearch. ([#359](https://github.com/KindlingProject/kindling/pull/359))
- Implement a delay queue for exited threads, to avoid losing data generated in the period just before a thread exits. ([#365](https://github.com/CloudDectective-Harmonycloud/kindling/pull/365))
- Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time. ([#364](https://github.com/CloudDectective-Harmonycloud/kindling/pull/364))

## v0.5.0 - 2022-11-02
11 changes: 7 additions & 4 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
@@ -4,6 +4,7 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
@@ -24,8 +25,8 @@ type CpuAnalyzer struct {
sendEventsRoutineMap sync.Map
lock sync.Mutex
telemetry *component.TelemetryTools

tidExpiredQueue *tidDeleteQueue
nextConsumers   []consumer.Consumer
}

func (ca *CpuAnalyzer) Type() analyzer.Type {
@@ -44,6 +45,8 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
nextConsumers: consumers,
}
ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
ca.tidExpiredQueue = newTidDeleteQueue()
go ca.TidDelete(30*time.Second, 10*time.Second)
return ca
}

@@ -243,6 +246,6 @@ func (ca *CpuAnalyzer) trimExitedThread(pid uint32, tid uint32) {
if tidEventsMap == nil {
return
}
ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from the map after 10 seconds", pid, tid)
ca.AddTidToDeleteCache(time.Now(), pid, tid)
}
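
A note on the "after 10 seconds" in the log message above: deletion is performed by the periodic sweep, so an exited thread's data is retained for at least expiredDuration and removed at most one sweep interval later. A self-contained illustration of the bounds, using the values wired up in NewCpuAnalyzer (hypothetical snippet, not part of the PR):

package main

import (
	"fmt"
	"time"
)

func main() {
	interval := 30 * time.Second        // sweep period passed to TidDelete
	expiredDuration := 10 * time.Second // grace period before deletion

	fmt.Println("min retention after procexit:", expiredDuration)          // 10s
	fmt.Println("max retention after procexit:", expiredDuration+interval) // 40s
}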
84 changes: 84 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go
@@ -0,0 +1,84 @@
package cpuanalyzer

import (
"sync"
"time"
)

type tidDeleteQueue struct {
queueMutex sync.Mutex
queue []deleteTid
}

type deleteTid struct {
pid uint32
tid uint32
exitTime time.Time
}

func newTidDeleteQueue() *tidDeleteQueue {
return &tidDeleteQueue{queue: make([]deleteTid, 0)}
}

// GetFront returns the front element, or nil if the queue is empty.
// The caller must hold queueMutex.
func (dq *tidDeleteQueue) GetFront() *deleteTid {
if len(dq.queue) > 0 {
return &dq.queue[0]
}
return nil
}

// Push appends an element to the back of the queue. The caller must hold queueMutex.
func (dq *tidDeleteQueue) Push(elem deleteTid) {
dq.queue = append(dq.queue, elem)
}

// Pop removes the front element, if any. The caller must hold queueMutex.
func (dq *tidDeleteQueue) Pop() {
if len(dq.queue) > 0 {
dq.queue = dq.queue[1:]
}
}
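
Editor's note: the queue type does no locking of its own; callers are expected to hold queueMutex, as AddTidToDeleteCache and TidDelete below both do. A hedged sketch of that calling convention (drainBefore is a hypothetical helper, not part of the PR):

// drainBefore removes and returns all entries whose exit time is before
// cutoff, front first. queueMutex is held for the whole loop because
// GetFront returns a pointer into the backing slice.
func drainBefore(dq *tidDeleteQueue, cutoff time.Time) []deleteTid {
	dq.queueMutex.Lock()
	defer dq.queueMutex.Unlock()
	var drained []deleteTid
	for front := dq.GetFront(); front != nil && front.exitTime.Before(cutoff); front = dq.GetFront() {
		drained = append(drained, *front)
		dq.Pop()
	}
	return drained
}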

// AddTidToDeleteCache enqueues a thread that has received a procexit event;
// the actual deletion is deferred to TidDelete.
func (ca *CpuAnalyzer) AddTidToDeleteCache(curTime time.Time, pid uint32, tid uint32) {
cacheElem := deleteTid{pid: pid, tid: tid, exitTime: curTime}
ca.tidExpiredQueue.queueMutex.Lock()
defer ca.tidExpiredQueue.queueMutex.Unlock()
ca.tidExpiredQueue.Push(cacheElem)
}

// DeleteTid removes a tid's events from the given map while holding the
// analyzer lock.
func (ca *CpuAnalyzer) DeleteTid(tidEventsMap map[uint32]*TimeSegments, tid uint32) {
ca.lock.Lock()
defer ca.lock.Unlock()
delete(tidEventsMap, tid)
}

// TidDelete runs in a dedicated goroutine. Every interval it scans the
// delay queue and deletes the events of threads that exited more than
// expiredDuration ago.
func (ca *CpuAnalyzer) TidDelete(interval time.Duration, expiredDuration time.Duration) {
for {
select {
case <-time.After(interval):
now := time.Now()
func() {
ca.tidExpiredQueue.queueMutex.Lock()
defer ca.tidExpiredQueue.queueMutex.Unlock()
for {
elem := ca.tidExpiredQueue.GetFront()
if elem == nil {
break
}
if elem.exitTime.Add(expiredDuration).Before(now) {
tidEventsMap := ca.cpuPidEvents[elem.pid]
if tidEventsMap == nil {
ca.tidExpiredQueue.Pop()
continue
}
ca.telemetry.Logger.Debugf("Delete expired thread... pid=%d, tid=%d", elem.pid, elem.tid)
ca.DeleteTid(tidEventsMap, elem.tid)
ca.tidExpiredQueue.Pop()
} else {
break
}
}
}()
}
}
}
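
One design point worth noting in TidDelete: queueMutex is held for the entire sweep and ca.lock is acquired inside DeleteTid, so the lock order is always queueMutex then ca.lock; AddTidToDeleteCache takes only queueMutex, so no path acquires the two locks in the opposite order and the pair cannot deadlock. A sketch of a single sweep that makes the ordering explicit (sweepOnce is a hypothetical refactoring of the loop body above, not part of the PR):

// sweepOnce deletes every queued tid that exited more than expired ago.
func (ca *CpuAnalyzer) sweepOnce(now time.Time, expired time.Duration) {
	ca.tidExpiredQueue.queueMutex.Lock() // lock #1: guards the queue for the whole sweep
	defer ca.tidExpiredQueue.queueMutex.Unlock()
	for elem := ca.tidExpiredQueue.GetFront(); elem != nil; elem = ca.tidExpiredQueue.GetFront() {
		if !elem.exitTime.Add(expired).Before(now) {
			break // the front is the oldest entry; nothing further is expired
		}
		// cpuPidEvents is read without ca.lock here, mirroring TidDelete above.
		if tidEventsMap := ca.cpuPidEvents[elem.pid]; tidEventsMap != nil {
			ca.DeleteTid(tidEventsMap, elem.tid) // lock #2: taken and released inside
		}
		ca.tidExpiredQueue.Pop()
	}
}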
107 changes: 107 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go
@@ -0,0 +1,107 @@
package cpuanalyzer

import (
"strconv"
"testing"
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
)

var (
visit []deleteTid
ca *CpuAnalyzer
exitTid map[uint32]int
enterCnt int
exitCnt int
)

const timeDuration time.Duration = 100 * time.Millisecond

func TestDeleteQueue(t *testing.T) {

cpupidEvents := make(map[uint32]map[uint32]*TimeSegments, 100000)
testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools()
mycfg := &Config{SegmentSize: 40}
ca = &CpuAnalyzer{cpuPidEvents: cpupidEvents, telemetry: testTelemetry, cfg: mycfg}

ca.tidExpiredQueue = newTidDeleteQueue()

visit = make([]deleteTid, 0)
exitTid = make(map[uint32]int, 0)

go ca.TidDelete(5*timeDuration, 4*timeDuration)
go CheckQueueLoop(t)
for i := 0; i < 20; i++ {

ev := new(CpuEvent)
curTime := time.Now()
// Use absolute nanosecond timestamps; Nanosecond() returns only the
// sub-second component and could yield EndTime < StartTime across a second boundary.
ev.StartTime = uint64(curTime.UnixNano())
ev.EndTime = uint64(curTime.Add(timeDuration).UnixNano())

// Leave every fourth tid out of the map to exercise the case where a tid
// exists in the queue but not in the map.
if i%4 != 0 {
ca.PutEventToSegments(uint32(i), uint32(i)+5, "threadname"+strconv.Itoa(i+100), ev)
}

var queueLen int

func() {
ca.tidExpiredQueue.queueMutex.Lock()
defer ca.tidExpiredQueue.queueMutex.Unlock()
queueLen = len(ca.tidExpiredQueue.queue)

cacheElem := deleteTid{uint32(i), uint32(i) + 5, curTime.Add(timeDuration)}
ca.tidExpiredQueue.Push(cacheElem)
visit = append(visit, cacheElem)
if len(ca.tidExpiredQueue.queue) != queueLen+1 {
t.Errorf("the length of queue is not added, expection: %d but: %d\n", queueLen+1, len(ca.tidExpiredQueue.queue))
}
}()

t.Logf("pid=%d, tid=%d enter time=%s\n", uint32(i), uint32(i)+5, curTime.Format("2006-01-02 15:04:05.000"))
enterCnt++
time.Sleep(3 * timeDuration)
}
time.Sleep(10 * timeDuration)

if enterCnt != exitCnt {
t.Fatalf("The number of threads that entering and exiting the queue is not equal! enterCount=%d, exitCount=%d\n", enterCnt, exitCnt)
} else {
t.Logf("All threads have exited normally. enterCount=%d, exitCount=%d\n", enterCnt, exitCnt)
}

}

func CheckQueueLoop(t *testing.T) {
for {
select {
case <-time.After(timeDuration * 3):
func() {
ca.tidExpiredQueue.queueMutex.Lock()
defer ca.tidExpiredQueue.queueMutex.Unlock()
queueLen := len(ca.tidExpiredQueue.queue)
curTime := time.Now()
for i := 0; i < len(visit); i++ {
tmpv := visit[i]
var j int
for j = 0; j < queueLen; j++ {
tmpq := ca.tidExpiredQueue.queue[j]
if tmpv.tid == tmpq.tid {
if curTime.After(tmpq.exitTime.Add(12 * timeDuration)) {
t.Errorf("there is a expired threads that is not deleted. pid=%d, tid=%d, exitTime=%s\n", tmpv.pid, tmpv.tid, tmpv.exitTime.Format("2006-01-02 15:04:05.000"))
}
break
}
}

if _, exist := exitTid[tmpv.tid]; j >= queueLen && !exist {
exitTid[tmpv.tid] = 1
exitCnt++
t.Logf("pid=%d, tid=%d exit time=%s\n", tmpv.pid, tmpv.tid, curTime.Format("2006-01-02 15:04:05.000"))
}
}
}()
}
}
}
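
To run just this test locally, a standard invocation along these lines should work (the package path is taken from the diff header above):

go test -v -run TestDeleteQueue ./collector/pkg/component/analyzer/cpuanalyzer/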