From 7fee60e0f1173515143728045d912cc0be806727 Mon Sep 17 00:00:00 2001 From: yaofighting Date: Tue, 22 Nov 2022 16:41:20 +0800 Subject: [PATCH 1/6] Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time Signed-off-by: yaofighting --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bd3b7fab..a9de272b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ ### New features - Add a new feature: Trace Profiling. See more details about it on our [website](http://kindling.harmonycloud.cn). ([#335](https://github.com/CloudDectective-Harmonycloud/kindling/pull/335)) +### Bug fixes +- Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time + ### Enhancements - Add request and response payload of `Redis` protocol message to `Span` data. ([#325](https://github.com/CloudDectective-Harmonycloud/kindling/pull/325)) From 2fb0620d0b121fe71f033fa329873415510d1660 Mon Sep 17 00:00:00 2001 From: yaofighting Date: Wed, 23 Nov 2022 12:05:49 +0800 Subject: [PATCH 2/6] Implement the exited thread queue Signed-off-by: yaofighting --- CHANGELOG.md | 4 +- .../analyzer/cpuanalyzer/cpu_analyzer.go | 11 ++- .../analyzer/cpuanalyzer/delete_tid.go | 74 +++++++++++++++++++ 3 files changed, 82 insertions(+), 7 deletions(-) create mode 100644 collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a9de272b1..447c2fe2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,15 +8,13 @@ - Add a new tool: A debug tool for Trace Profiling is provided for developers to troubleshoot problems.([#363](https://github.com/CloudDectective-Harmonycloud/kindling/pull/363)) ### Bug fixes +- Implement the delay queue for exited thread, so as to avoid losing the data in the period before the thread exits. ([#365](https://github.com/CloudDectective-Harmonycloud/kindling/pull/365)) - Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time. ([#364](https://github.com/CloudDectective-Harmonycloud/kindling/pull/364)) ## v0.5.0 - 2022-11-02 ### New features - Add a new feature: Trace Profiling. See more details about it on our [website](http://kindling.harmonycloud.cn). ([#335](https://github.com/CloudDectective-Harmonycloud/kindling/pull/335)) -### Bug fixes -- Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time - ### Enhancements - Add request and response payload of `Redis` protocol message to `Span` data. ([#325](https://github.com/CloudDectective-Harmonycloud/kindling/pull/325)) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go index 4a780b436..2ecdb80a4 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go @@ -4,6 +4,7 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/Kindling-project/kindling/collector/pkg/component" "github.com/Kindling-project/kindling/collector/pkg/component/analyzer" @@ -24,8 +25,8 @@ type CpuAnalyzer struct { sendEventsRoutineMap sync.Map lock sync.Mutex telemetry *component.TelemetryTools - - nextConsumers []consumer.Consumer + tidExpiredQueue *tidDeleteQueue + nextConsumers []consumer.Consumer } func (ca *CpuAnalyzer) Type() analyzer.Type { @@ -44,6 +45,8 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum nextConsumers: consumers, } ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000) + ca.InitTidDeleteQueue() + go ca.TidDelete(20*time.Second, 10*time.Second) return ca } @@ -243,6 +246,6 @@ func (ca *CpuAnalyzer) trimExitedThread(pid uint32, tid uint32) { if tidEventsMap == nil { return } - ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map", pid, tid) - delete(tidEventsMap, tid) + ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map after 10 seconds. ", pid, tid) + ca.AddTidToDeleteCache(time.Now(), pid, tid) } diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go new file mode 100644 index 000000000..bf9508016 --- /dev/null +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go @@ -0,0 +1,74 @@ +package cpuanalyzer + +import ( + "sync" + "time" +) + +type tidDeleteQueue struct { + queueMutex sync.Mutex + queue []deleteTid +} + +type deleteTid struct { + pid uint32 + tid uint32 + exitTime time.Time +} + +func (dq *tidDeleteQueue) GetFront() *deleteTid { + if len(dq.queue) > 0 { + return &dq.queue[0] + } + return nil +} + +func (dq *tidDeleteQueue) Push(elem deleteTid) { + dq.queue = append(dq.queue, elem) +} + +func (dq *tidDeleteQueue) Pop() { + if len(dq.queue) > 0 { + dq.queue = dq.queue[1:len(dq.queue)] + } +} + +func (ca *CpuAnalyzer) InitTidDeleteQueue() { + ca.tidExpiredQueue = &tidDeleteQueue{queue: make([]deleteTid, 0)} +} + +//Add procexit tid +func (ca *CpuAnalyzer) AddTidToDeleteCache(curTime time.Time, pid uint32, tid uint32) { + defer ca.tidExpiredQueue.queueMutex.Unlock() + cacheElem := deleteTid{pid: pid, tid: tid, exitTime: curTime} + ca.tidExpiredQueue.queueMutex.Lock() + ca.tidExpiredQueue.Push(cacheElem) +} + +func (ca *CpuAnalyzer) TidDelete(interval time.Duration, expiredDuration time.Duration) { + for { + select { + case <-time.After(interval): + now := time.Now() + ca.tidExpiredQueue.queueMutex.Lock() + for { + elem := ca.tidExpiredQueue.GetFront() + if elem == nil { + break + } + if elem.exitTime.Add(expiredDuration).Before(now) { + tidEventsMap := ca.cpuPidEvents[elem.pid] + if tidEventsMap == nil { + continue + } + ca.telemetry.Logger.Debugf("Delete expired thread... pid=%d, tid=%d", elem.pid, elem.tid) + delete(tidEventsMap, elem.tid) + ca.tidExpiredQueue.Pop() + } else { + break + } + } + ca.tidExpiredQueue.queueMutex.Unlock() + } + } +} From e143df88b1b118f07b95027d4159394589b6ff24 Mon Sep 17 00:00:00 2001 From: yaofighting Date: Wed, 23 Nov 2022 14:32:54 +0800 Subject: [PATCH 3/6] adjust code style Signed-off-by: yaofighting --- .../pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go | 4 ++-- .../pkg/component/analyzer/cpuanalyzer/delete_tid.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go index 2ecdb80a4..f808240a8 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go @@ -45,8 +45,8 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum nextConsumers: consumers, } ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000) - ca.InitTidDeleteQueue() - go ca.TidDelete(20*time.Second, 10*time.Second) + ca.tidExpiredQueue = newTidDeleteQueue() + go ca.TidDelete(30*time.Second, 10*time.Second) return ca } diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go index bf9508016..13751a98c 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go @@ -16,6 +16,10 @@ type deleteTid struct { exitTime time.Time } +func newTidDeleteQueue() *tidDeleteQueue { + return &tidDeleteQueue{queue: make([]deleteTid, 0)} +} + func (dq *tidDeleteQueue) GetFront() *deleteTid { if len(dq.queue) > 0 { return &dq.queue[0] @@ -33,15 +37,11 @@ func (dq *tidDeleteQueue) Pop() { } } -func (ca *CpuAnalyzer) InitTidDeleteQueue() { - ca.tidExpiredQueue = &tidDeleteQueue{queue: make([]deleteTid, 0)} -} - //Add procexit tid func (ca *CpuAnalyzer) AddTidToDeleteCache(curTime time.Time, pid uint32, tid uint32) { - defer ca.tidExpiredQueue.queueMutex.Unlock() cacheElem := deleteTid{pid: pid, tid: tid, exitTime: curTime} ca.tidExpiredQueue.queueMutex.Lock() + defer ca.tidExpiredQueue.queueMutex.Unlock() ca.tidExpiredQueue.Push(cacheElem) } From 5a0fa8773faa970537ac84956531eb55269a2bce Mon Sep 17 00:00:00 2001 From: yaofighting Date: Wed, 23 Nov 2022 16:01:40 +0800 Subject: [PATCH 4/6] add unit test and fix the deadcycle Signed-off-by: yaofighting --- .../analyzer/cpuanalyzer/delete_tid.go | 44 ++++++++++++------- .../analyzer/cpuanalyzer/delete_tid_test.go | 22 ++++++++++ 2 files changed, 49 insertions(+), 17 deletions(-) create mode 100644 collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go index 13751a98c..297d1e2b2 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go @@ -45,30 +45,40 @@ func (ca *CpuAnalyzer) AddTidToDeleteCache(curTime time.Time, pid uint32, tid ui ca.tidExpiredQueue.Push(cacheElem) } +func (ca *CpuAnalyzer) DeleteTid(tidEventsMap map[uint32]*TimeSegments, tid uint32) { + ca.lock.Lock() + defer ca.lock.Unlock() + delete(tidEventsMap, tid) +} + func (ca *CpuAnalyzer) TidDelete(interval time.Duration, expiredDuration time.Duration) { for { select { case <-time.After(interval): now := time.Now() - ca.tidExpiredQueue.queueMutex.Lock() - for { - elem := ca.tidExpiredQueue.GetFront() - if elem == nil { - break - } - if elem.exitTime.Add(expiredDuration).Before(now) { - tidEventsMap := ca.cpuPidEvents[elem.pid] - if tidEventsMap == nil { - continue + func() { + ca.tidExpiredQueue.queueMutex.Lock() + defer ca.tidExpiredQueue.queueMutex.Unlock() + for { + elem := ca.tidExpiredQueue.GetFront() + if elem == nil { + break + } + if elem.exitTime.Add(expiredDuration).Before(now) { + tidEventsMap := ca.cpuPidEvents[elem.pid] + if tidEventsMap == nil { + ca.tidExpiredQueue.Pop() + continue + } + ca.telemetry.Logger.Debugf("Delete expired thread... pid=%d, tid=%d", elem.pid, elem.tid) + + ca.DeleteTid(tidEventsMap, elem.tid) + ca.tidExpiredQueue.Pop() + } else { + break } - ca.telemetry.Logger.Debugf("Delete expired thread... pid=%d, tid=%d", elem.pid, elem.tid) - delete(tidEventsMap, elem.tid) - ca.tidExpiredQueue.Pop() - } else { - break } - } - ca.tidExpiredQueue.queueMutex.Unlock() + }() } } } diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go new file mode 100644 index 000000000..77b4b0007 --- /dev/null +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go @@ -0,0 +1,22 @@ +package cpuanalyzer + +import ( + "testing" + "time" +) + +func TestDeleteQueue(t *testing.T) { + + cpupidEvents := make(map[uint32]map[uint32]*TimeSegments, 100000) + ca := &CpuAnalyzer{cpuPidEvents: cpupidEvents} + + ca.tidExpiredQueue = newTidDeleteQueue() + + go ca.TidDelete(3*time.Second, 4*time.Second) + for i := 0; i < 10; i++ { + ca.AddTidToDeleteCache(time.Now(), uint32(i), uint32(i)+5) + t.Logf("pid=%d, tid=%d enter\n", uint32(i), uint32(i)+5) + time.Sleep(1 * time.Second) + } + +} From 738b7b09f5abcda9dfa47b60805d96a7bf371dc1 Mon Sep 17 00:00:00 2001 From: yaofighting Date: Thu, 24 Nov 2022 13:28:04 +0800 Subject: [PATCH 5/6] Improved unit testing for exiting thread delay queues. Signed-off-by: yaofighting --- .../analyzer/cpuanalyzer/delete_tid.go | 2 +- .../analyzer/cpuanalyzer/delete_tid_test.go | 127 +++++++++++++++++- 2 files changed, 122 insertions(+), 7 deletions(-) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go index 297d1e2b2..25919772d 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go @@ -71,7 +71,7 @@ func (ca *CpuAnalyzer) TidDelete(interval time.Duration, expiredDuration time.Du continue } ca.telemetry.Logger.Debugf("Delete expired thread... pid=%d, tid=%d", elem.pid, elem.tid) - + //fmt.Printf("Go Test: Delete expired thread... pid=%d, tid=%d\n", elem.pid, elem.tid) ca.DeleteTid(tidEventsMap, elem.tid) ca.tidExpiredQueue.Pop() } else { diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go index 77b4b0007..5787b5967 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go @@ -1,22 +1,137 @@ package cpuanalyzer import ( + "strconv" "testing" "time" + + "github.com/Kindling-project/kindling/collector/pkg/component" +) + +var ( + visit []deleteTid + ca *CpuAnalyzer + exitTid map[uint32]int + enterCnt int + exitCnt int ) func TestDeleteQueue(t *testing.T) { cpupidEvents := make(map[uint32]map[uint32]*TimeSegments, 100000) - ca := &CpuAnalyzer{cpuPidEvents: cpupidEvents} + testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools() + ca = &CpuAnalyzer{cpuPidEvents: cpupidEvents, telemetry: testTelemetry} ca.tidExpiredQueue = newTidDeleteQueue() - go ca.TidDelete(3*time.Second, 4*time.Second) - for i := 0; i < 10; i++ { - ca.AddTidToDeleteCache(time.Now(), uint32(i), uint32(i)+5) - t.Logf("pid=%d, tid=%d enter\n", uint32(i), uint32(i)+5) - time.Sleep(1 * time.Second) + visit = make([]deleteTid, 0) + exitTid = make(map[uint32]int, 0) + + go ca.TidDelete(5*time.Second, 4*time.Second) + go CheckQueueLoop(t) + for i := 0; i < 20; i++ { + + ev := new(CpuEvent) + curTime := time.Now() + ev.EndTime = uint64(curTime.Add(time.Second).Nanosecond()) + ev.StartTime = uint64(curTime.Nanosecond()) + + //check tid which exist in queue but not in the map + if i%4 != 0 { + PutElemToMap(uint32(i), uint32(i)+5, "threadname"+strconv.Itoa(i+100), ev) + } + + var queueLen int + + func() { + ca.tidExpiredQueue.queueMutex.Lock() + defer ca.tidExpiredQueue.queueMutex.Unlock() + queueLen = len(ca.tidExpiredQueue.queue) + + cacheElem := deleteTid{uint32(i), uint32(i) + 5, curTime.Add(time.Second)} + ca.tidExpiredQueue.Push(cacheElem) + visit = append(visit, cacheElem) + if len(ca.tidExpiredQueue.queue) != queueLen+1 { + t.Errorf("the length of queue is not added, expection: %d but: %d\n", queueLen+1, len(ca.tidExpiredQueue.queue)) + } + }() + + t.Logf("pid=%d, tid=%d enter time=%s\n", uint32(i), uint32(i)+5, curTime.Format("2006-01-02 15:04:05")) + enterCnt++ + time.Sleep(3 * time.Second) + } + time.Sleep(10 * time.Second) + + if enterCnt != exitCnt { + t.Fatalf("The number of threads that entering and exiting the queue is not equal! enterCount=%d, exitCount=%d\n", enterCnt, exitCnt) + } else { + t.Logf("All threads have exited normally. enterCount=%d, exitCount=%d\n", enterCnt, exitCnt) + } + +} + +func CheckQueueLoop(t *testing.T) { + for { + select { + case <-time.After(time.Second * 3): + func() { + ca.tidExpiredQueue.queueMutex.Lock() + defer ca.tidExpiredQueue.queueMutex.Unlock() + queueLen := len(ca.tidExpiredQueue.queue) + curTime := time.Now() + for i := 0; i < len(visit); i++ { + tmpv := visit[i] + var j int + for j = 0; j < queueLen; j++ { + tmpq := ca.tidExpiredQueue.queue[j] + if tmpv.tid == tmpq.tid { + if curTime.After(tmpq.exitTime.Add(12 * time.Second)) { + t.Errorf("there is a expired threads that is not deleted. pid=%d, tid=%d, exitTime=%s\n", tmpv.pid, tmpv.tid, tmpv.exitTime.Format("2006-01-02 15:04:05")) + } + break + } + } + + if _, exist := exitTid[tmpv.tid]; j >= queueLen && !exist { + exitTid[tmpv.tid] = 1 + exitCnt++ + t.Logf("pid=%d, tid=%d exit time=%s\n", tmpv.pid, tmpv.tid, curTime.Format("2006-01-02 15:04:05")) + } + } + }() + } + } +} + +func PutElemToMap(pid uint32, tid uint32, threadName string, event TimedEvent) { + + tidCpuEvents, exist := ca.cpuPidEvents[pid] + if !exist { + tidCpuEvents = make(map[uint32]*TimeSegments) + ca.cpuPidEvents[pid] = tidCpuEvents + } + + newTimeSegments := &TimeSegments{ + Pid: pid, + Tid: tid, + BaseTime: event.StartTimestamp() / nanoToSeconds, + Segments: NewCircleQueue(40), + } + for i := 0; i < 40; i++ { + segment := newSegment(pid, tid, threadName, + (newTimeSegments.BaseTime+uint64(i))*nanoToSeconds, + (newTimeSegments.BaseTime+uint64(i+1))*nanoToSeconds) + newTimeSegments.Segments.UpdateByIndex(i, segment) + } + + endOffset := int(event.EndTimestamp()/nanoToSeconds - newTimeSegments.BaseTime) + + for i := 0; i <= endOffset && i < 40; i++ { + val := newTimeSegments.Segments.GetByIndex(i) + segment := val.(*Segment) + segment.putTimedEvent(event) + segment.IsSend = 0 } + tidCpuEvents[tid] = newTimeSegments } From 7f694e49390dd35d13c247ff2433a919dcdfeb0b Mon Sep 17 00:00:00 2001 From: yaofighting Date: Thu, 24 Nov 2022 17:36:13 +0800 Subject: [PATCH 6/6] update the delete_tid_test.go and decrease the test time. Signed-off-by: yaofighting --- .../analyzer/cpuanalyzer/delete_tid_test.go | 60 +++++-------------- 1 file changed, 15 insertions(+), 45 deletions(-) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go index 5787b5967..f7bfbd312 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_tid_test.go @@ -16,29 +16,32 @@ var ( exitCnt int ) +const timeDuration time.Duration = 100 * time.Millisecond + func TestDeleteQueue(t *testing.T) { cpupidEvents := make(map[uint32]map[uint32]*TimeSegments, 100000) testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools() - ca = &CpuAnalyzer{cpuPidEvents: cpupidEvents, telemetry: testTelemetry} + mycfg := &Config{SegmentSize: 40} + ca = &CpuAnalyzer{cpuPidEvents: cpupidEvents, telemetry: testTelemetry, cfg: mycfg} ca.tidExpiredQueue = newTidDeleteQueue() visit = make([]deleteTid, 0) exitTid = make(map[uint32]int, 0) - go ca.TidDelete(5*time.Second, 4*time.Second) + go ca.TidDelete(5*timeDuration, 4*timeDuration) go CheckQueueLoop(t) for i := 0; i < 20; i++ { ev := new(CpuEvent) curTime := time.Now() - ev.EndTime = uint64(curTime.Add(time.Second).Nanosecond()) + ev.EndTime = uint64(curTime.Add(timeDuration).Nanosecond()) ev.StartTime = uint64(curTime.Nanosecond()) //check tid which exist in queue but not in the map if i%4 != 0 { - PutElemToMap(uint32(i), uint32(i)+5, "threadname"+strconv.Itoa(i+100), ev) + ca.PutEventToSegments(uint32(i), uint32(i)+5, "threadname"+strconv.Itoa(i+100), ev) } var queueLen int @@ -48,7 +51,7 @@ func TestDeleteQueue(t *testing.T) { defer ca.tidExpiredQueue.queueMutex.Unlock() queueLen = len(ca.tidExpiredQueue.queue) - cacheElem := deleteTid{uint32(i), uint32(i) + 5, curTime.Add(time.Second)} + cacheElem := deleteTid{uint32(i), uint32(i) + 5, curTime.Add(timeDuration)} ca.tidExpiredQueue.Push(cacheElem) visit = append(visit, cacheElem) if len(ca.tidExpiredQueue.queue) != queueLen+1 { @@ -56,11 +59,11 @@ func TestDeleteQueue(t *testing.T) { } }() - t.Logf("pid=%d, tid=%d enter time=%s\n", uint32(i), uint32(i)+5, curTime.Format("2006-01-02 15:04:05")) + t.Logf("pid=%d, tid=%d enter time=%s\n", uint32(i), uint32(i)+5, curTime.Format("2006-01-02 15:04:05.000")) enterCnt++ - time.Sleep(3 * time.Second) + time.Sleep(3 * timeDuration) } - time.Sleep(10 * time.Second) + time.Sleep(10 * timeDuration) if enterCnt != exitCnt { t.Fatalf("The number of threads that entering and exiting the queue is not equal! enterCount=%d, exitCount=%d\n", enterCnt, exitCnt) @@ -73,7 +76,7 @@ func TestDeleteQueue(t *testing.T) { func CheckQueueLoop(t *testing.T) { for { select { - case <-time.After(time.Second * 3): + case <-time.After(timeDuration * 3): func() { ca.tidExpiredQueue.queueMutex.Lock() defer ca.tidExpiredQueue.queueMutex.Unlock() @@ -85,8 +88,8 @@ func CheckQueueLoop(t *testing.T) { for j = 0; j < queueLen; j++ { tmpq := ca.tidExpiredQueue.queue[j] if tmpv.tid == tmpq.tid { - if curTime.After(tmpq.exitTime.Add(12 * time.Second)) { - t.Errorf("there is a expired threads that is not deleted. pid=%d, tid=%d, exitTime=%s\n", tmpv.pid, tmpv.tid, tmpv.exitTime.Format("2006-01-02 15:04:05")) + if curTime.After(tmpq.exitTime.Add(12 * timeDuration)) { + t.Errorf("there is a expired threads that is not deleted. pid=%d, tid=%d, exitTime=%s\n", tmpv.pid, tmpv.tid, tmpv.exitTime.Format("2006-01-02 15:04:05.000")) } break } @@ -95,43 +98,10 @@ func CheckQueueLoop(t *testing.T) { if _, exist := exitTid[tmpv.tid]; j >= queueLen && !exist { exitTid[tmpv.tid] = 1 exitCnt++ - t.Logf("pid=%d, tid=%d exit time=%s\n", tmpv.pid, tmpv.tid, curTime.Format("2006-01-02 15:04:05")) + t.Logf("pid=%d, tid=%d exit time=%s\n", tmpv.pid, tmpv.tid, curTime.Format("2006-01-02 15:04:05.000")) } } }() } } } - -func PutElemToMap(pid uint32, tid uint32, threadName string, event TimedEvent) { - - tidCpuEvents, exist := ca.cpuPidEvents[pid] - if !exist { - tidCpuEvents = make(map[uint32]*TimeSegments) - ca.cpuPidEvents[pid] = tidCpuEvents - } - - newTimeSegments := &TimeSegments{ - Pid: pid, - Tid: tid, - BaseTime: event.StartTimestamp() / nanoToSeconds, - Segments: NewCircleQueue(40), - } - for i := 0; i < 40; i++ { - segment := newSegment(pid, tid, threadName, - (newTimeSegments.BaseTime+uint64(i))*nanoToSeconds, - (newTimeSegments.BaseTime+uint64(i+1))*nanoToSeconds) - newTimeSegments.Segments.UpdateByIndex(i, segment) - } - - endOffset := int(event.EndTimestamp()/nanoToSeconds - newTimeSegments.BaseTime) - - for i := 0; i <= endOffset && i < 40; i++ { - val := newTimeSegments.Segments.GetByIndex(i) - segment := val.(*Segment) - segment.putTimedEvent(event) - segment.IsSend = 0 - } - - tidCpuEvents[tid] = newTimeSegments -}