Skip to content

Commit

Permalink
Implement the delay queue for exited thread
Browse files Browse the repository at this point in the history
Signed-off-by: yaofighting <siyao@zju.edu.cn>
  • Loading branch information
yaofighting committed Nov 23, 2022
1 parent 9d1ad53 commit 999f552
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 5 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
### New features
- Add a new tool: A debug tool for Trace Profiling is provided for developers to troubleshoot problems.([#363](https://github.com/CloudDectective-Harmonycloud/kindling/pull/363))

### Bug fixes
- Implement the delay queue for exited thread, so as to avoid losing the data in the period before the thread exits. ([#365](https://github.com/CloudDectective-Harmonycloud/kindling/pull/365))
- Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time. ([#364](https://github.com/CloudDectective-Harmonycloud/kindling/pull/364))

## v0.5.0 - 2022-11-02
### New features
- Add a new feature: Trace Profiling. See more details about it on our [website](http://kindling.harmonycloud.cn). ([#335](https://github.com/CloudDectective-Harmonycloud/kindling/pull/335))

### Bug fixes
- Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time

### Enhancements
- Add request and response payload of `Redis` protocol message to `Span` data. ([#325](https://github.com/CloudDectective-Harmonycloud/kindling/pull/325))

Expand Down
7 changes: 5 additions & 2 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
Expand Down Expand Up @@ -44,6 +45,8 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
nextConsumers: consumers,
}
ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
InitTidDeleteQueue()
go ca.TidDelete(20*time.Second, 10*time.Second)
return ca
}

Expand Down Expand Up @@ -243,6 +246,6 @@ func (ca *CpuAnalyzer) trimExitedThread(pid uint32, tid uint32) {
if tidEventsMap == nil {
return
}
ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map", pid, tid)
delete(tidEventsMap, tid)
ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map after 10 seconds. ", pid, tid)
ca.AddTidToDeleteCache(time.Now(), pid, tid)
}
76 changes: 76 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package cpuanalyzer

import (
"sync"
"time"
)

type tidDeleteQueue struct {
queueMutex sync.Mutex
queue []deleteTid
}

var tidExpiredQueue *tidDeleteQueue

type deleteTid struct {
pid uint32
tid uint32
exitTime time.Time
}

func (dq *tidDeleteQueue) GetFront() *deleteTid {
if len(dq.queue) > 0 {
return &dq.queue[0]
}
return nil
}

func (dq *tidDeleteQueue) Push(elem deleteTid) {
dq.queue = append(dq.queue, elem)
}

func (dq *tidDeleteQueue) Pop() {
if len(dq.queue) > 0 {
dq.queue = dq.queue[1:len(dq.queue)]
}
}

func InitTidDeleteQueue() {
tidExpiredQueue = &tidDeleteQueue{queue: make([]deleteTid, 0)}
}

//Add procexit tid
func (ca *CpuAnalyzer) AddTidToDeleteCache(curTime time.Time, pid uint32, tid uint32) {
defer tidExpiredQueue.queueMutex.Unlock()
cacheElem := deleteTid{pid: pid, tid: tid, exitTime: curTime}
tidExpiredQueue.queueMutex.Lock()
tidExpiredQueue.Push(cacheElem)
}

func (ca *CpuAnalyzer) TidDelete(interval time.Duration, expiredDuration time.Duration) {
for {
select {
case <-time.After(interval):
now := time.Now()
tidExpiredQueue.queueMutex.Lock()
for {
elem := tidExpiredQueue.GetFront()
if elem == nil {
break
}
if elem.exitTime.Add(expiredDuration).Before(now) {
tidEventsMap := ca.cpuPidEvents[elem.pid]
if tidEventsMap == nil {
continue
}
ca.telemetry.Logger.Debugf("Delete expired thread... pid=%d, tid=%d", elem.pid, elem.tid)
delete(tidEventsMap, elem.tid)
tidExpiredQueue.Pop()
} else {
break
}
}
tidExpiredQueue.queueMutex.Unlock()
}
}
}

0 comments on commit 999f552

Please sign in to comment.