Skip to content

Commit

Permalink
调整:链路追踪End接受error参数,
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Sep 16, 2024
1 parent c7c4fae commit 3525680
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
5 changes: 3 additions & 2 deletions pullMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (receiver *subscriber) pullMessage() {
remainingCount := receiver.queueManager.queue.Count() - endIndex

traceContext := receiver.traceManager.EntryQueueConsumer(receiver.queueManager.name, receiver.subscribeName)
var err error
// 执行客户端的消费
exception.Try(func() {
sw := stopwatch.StartNew()
Expand All @@ -47,10 +48,10 @@ func (receiver *subscriber) pullMessage() {
receiver.offset = endIndex - 1
flog.ComponentInfof("queue", "Subscribe:%s,PullCount:%d,ElapsedTime:%s", receiver.subscribeName, pullCount, sw.GetMillisecondsText())
}).CatchException(func(exp any) {
traceContext.Error(flog.Error(exp))
err = flog.Error(exp)
<-time.After(time.Second)
})
traceContext.End()
traceContext.End(err)
asyncLocal.Release()

receiver.queueManager.unWork()
Expand Down
6 changes: 3 additions & 3 deletions test/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestPush(t *testing.T) {
})
var aSum int
var lockA sync.Mutex
queue.Subscribe("test", "A", 2, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) {
queue.Subscribe("test", "A", 2, 0, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) {
assert.Equal(t, "A", subscribeName)
lst := collections.NewList[int]()
lstMessage.Foreach(func(item *any) {
Expand All @@ -33,7 +33,7 @@ func TestPush(t *testing.T) {

var bSum int
var lockB sync.Mutex
queue.Subscribe("test", "B", 4, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) {
queue.Subscribe("test", "B", 4, 0, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) {
assert.Equal(t, "B", subscribeName)
lst := collections.NewList[int]()
lstMessage.Foreach(func(item *any) {
Expand All @@ -45,7 +45,7 @@ func TestPush(t *testing.T) {
bSum += lst.SumItem()
})

queue.Subscribe("test", "C", 100, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) {
queue.Subscribe("test", "C", 100, 0, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) {
panic("测试panic")
})

Expand Down

0 comments on commit 3525680

Please sign in to comment.