Skip to content

Commit

Permalink
Implement the exited thread queue
Browse files Browse the repository at this point in the history
Signed-off-by: yaofighting <siyao@zju.edu.cn>
  • Loading branch information
yaofighting committed Nov 23, 2022
1 parent 7fee60e commit 2fb0620
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 7 deletions.
4 changes: 1 addition & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@
- Add a new tool: A debug tool for Trace Profiling is provided for developers to troubleshoot problems.([#363](https://github.com/CloudDectective-Harmonycloud/kindling/pull/363))

### Bug fixes
- Implement the delay queue for exited thread, so as to avoid losing the data in the period before the thread exits. ([#365](https://github.com/CloudDectective-Harmonycloud/kindling/pull/365))
- Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time. ([#364](https://github.com/CloudDectective-Harmonycloud/kindling/pull/364))

## v0.5.0 - 2022-11-02
### New features
- Add a new feature: Trace Profiling. See more details about it on our [website](http://kindling.harmonycloud.cn). ([#335](https://github.com/CloudDectective-Harmonycloud/kindling/pull/335))

### Bug fixes
- Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time

### Enhancements
- Add request and response payload of `Redis` protocol message to `Span` data. ([#325](https://github.com/CloudDectective-Harmonycloud/kindling/pull/325))

Expand Down
11 changes: 7 additions & 4 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
Expand All @@ -24,8 +25,8 @@ type CpuAnalyzer struct {
sendEventsRoutineMap sync.Map
lock sync.Mutex
telemetry *component.TelemetryTools

nextConsumers []consumer.Consumer
tidExpiredQueue *tidDeleteQueue
nextConsumers []consumer.Consumer
}

func (ca *CpuAnalyzer) Type() analyzer.Type {
Expand All @@ -44,6 +45,8 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
nextConsumers: consumers,
}
ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
ca.InitTidDeleteQueue()
go ca.TidDelete(20*time.Second, 10*time.Second)
return ca
}

Expand Down Expand Up @@ -243,6 +246,6 @@ func (ca *CpuAnalyzer) trimExitedThread(pid uint32, tid uint32) {
if tidEventsMap == nil {
return
}
ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map", pid, tid)
delete(tidEventsMap, tid)
ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map after 10 seconds. ", pid, tid)
ca.AddTidToDeleteCache(time.Now(), pid, tid)
}
74 changes: 74 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package cpuanalyzer

import (
"sync"
"time"
)

type tidDeleteQueue struct {
queueMutex sync.Mutex
queue []deleteTid
}

type deleteTid struct {
pid uint32
tid uint32
exitTime time.Time
}

func (dq *tidDeleteQueue) GetFront() *deleteTid {
if len(dq.queue) > 0 {
return &dq.queue[0]
}
return nil
}

func (dq *tidDeleteQueue) Push(elem deleteTid) {
dq.queue = append(dq.queue, elem)
}

func (dq *tidDeleteQueue) Pop() {
if len(dq.queue) > 0 {
dq.queue = dq.queue[1:len(dq.queue)]
}
}

func (ca *CpuAnalyzer) InitTidDeleteQueue() {
ca.tidExpiredQueue = &tidDeleteQueue{queue: make([]deleteTid, 0)}
}

//Add procexit tid
func (ca *CpuAnalyzer) AddTidToDeleteCache(curTime time.Time, pid uint32, tid uint32) {
defer ca.tidExpiredQueue.queueMutex.Unlock()
cacheElem := deleteTid{pid: pid, tid: tid, exitTime: curTime}
ca.tidExpiredQueue.queueMutex.Lock()
ca.tidExpiredQueue.Push(cacheElem)
}

func (ca *CpuAnalyzer) TidDelete(interval time.Duration, expiredDuration time.Duration) {
for {
select {
case <-time.After(interval):
now := time.Now()
ca.tidExpiredQueue.queueMutex.Lock()
for {
elem := ca.tidExpiredQueue.GetFront()
if elem == nil {
break
}
if elem.exitTime.Add(expiredDuration).Before(now) {
tidEventsMap := ca.cpuPidEvents[elem.pid]
if tidEventsMap == nil {
continue
}
ca.telemetry.Logger.Debugf("Delete expired thread... pid=%d, tid=%d", elem.pid, elem.tid)
delete(tidEventsMap, elem.tid)
ca.tidExpiredQueue.Pop()
} else {
break
}
}
ca.tidExpiredQueue.queueMutex.Unlock()
}
}
}

0 comments on commit 2fb0620

Please sign in to comment.