diff --git a/pullMessage.go b/pullMessage.go index e28afd7..6de9c60 100644 --- a/pullMessage.go +++ b/pullMessage.go @@ -39,6 +39,7 @@ func (receiver *subscriber) pullMessage() { remainingCount := receiver.queueManager.queue.Count() - endIndex traceContext := receiver.traceManager.EntryQueueConsumer(receiver.queueManager.name, receiver.subscribeName) + var err error // 执行客户端的消费 exception.Try(func() { sw := stopwatch.StartNew() @@ -47,10 +48,10 @@ func (receiver *subscriber) pullMessage() { receiver.offset = endIndex - 1 flog.ComponentInfof("queue", "Subscribe:%s,PullCount:%d,ElapsedTime:%s", receiver.subscribeName, pullCount, sw.GetMillisecondsText()) }).CatchException(func(exp any) { - traceContext.Error(flog.Error(exp)) + err = flog.Error(exp) <-time.After(time.Second) }) - traceContext.End() + traceContext.End(err) asyncLocal.Release() receiver.queueManager.unWork() diff --git a/test/push_test.go b/test/push_test.go index 7c1e08f..449d6a7 100644 --- a/test/push_test.go +++ b/test/push_test.go @@ -19,7 +19,7 @@ func TestPush(t *testing.T) { }) var aSum int var lockA sync.Mutex - queue.Subscribe("test", "A", 2, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) { + queue.Subscribe("test", "A", 2, 0, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) { assert.Equal(t, "A", subscribeName) lst := collections.NewList[int]() lstMessage.Foreach(func(item *any) { @@ -33,7 +33,7 @@ func TestPush(t *testing.T) { var bSum int var lockB sync.Mutex - queue.Subscribe("test", "B", 4, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) { + queue.Subscribe("test", "B", 4, 0, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) { assert.Equal(t, "B", subscribeName) lst := collections.NewList[int]() lstMessage.Foreach(func(item *any) { @@ -45,7 +45,7 @@ func TestPush(t *testing.T) { bSum += lst.SumItem() }) - queue.Subscribe("test", "C", 100, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) { + queue.Subscribe("test", "C", 100, 0, func(subscribeName string, lstMessage collections.ListAny, remainingCount int) { panic("测试panic") })