Skip to content

Commit

Permalink
简化实现
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Jan 19, 2025
1 parent 3d01cf2 commit e5de8c3
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 115 deletions.
89 changes: 0 additions & 89 deletions pullMessage.go

This file was deleted.

16 changes: 8 additions & 8 deletions queueManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ func newQueueManager(queueName string) *queueManager {
}

// 定时检查一下队列的消费长度
func (queueList *queueManager) stat() {
func (receiver *queueManager) stat() {
for {
time.Sleep(MoveQueueInterval)
queueList.lock.Lock()
receiver.lock.Lock()

// 得到当前所有订阅者的最后消费的位置的最小值
queueList.statLastIndex()
receiver.statLastIndex()

// 所有订阅者没有在执行的时候,做一次队列合并
if queueList.minOffset > -1 {
preLength := queueList.queue.Count()
queueList.moveQueue()
flog.ComponentInfof("queue", "Migrating Data,QueueName:%s,queueLength:%d -> %d", queueList.name, preLength, queueList.queue.Count())
if receiver.minOffset > -1 {
preLength := receiver.queue.Count()
receiver.moveQueue()
flog.ComponentInfof("queue", "Migrating Data,QueueName:%s,queueLength:%d -> %d", receiver.name, preLength, receiver.queue.Count())
}
queueList.lock.Unlock()
receiver.lock.Unlock()
}
}

Expand Down
96 changes: 78 additions & 18 deletions subscribe.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package queue

import (
"time"

"github.com/farseer-go/collections"
"github.com/farseer-go/fs/asyncLocal"
"github.com/farseer-go/fs/container"
"github.com/farseer-go/fs/exception"
"github.com/farseer-go/fs/flog"
"github.com/farseer-go/fs/trace"
"time"
)

// Consumer 消费
type queueSubscribeFunc func(subscribeName string, lstMessage collections.ListAny, remainingCount int)

// 订阅者的队列
type subscriber struct {
// 订阅者名称
Expand All @@ -27,22 +34,6 @@ type subscriber struct {
sleepTime time.Duration
}

func newSubscriber(subscribeName string, fn queueSubscribeFunc, pullCount int, sleepTime time.Duration, queue *queueManager) *subscriber {
return &subscriber{
subscribeName: subscribeName,
offset: -1,
subscribeFunc: fn,
pullCount: pullCount,
sleepTime: sleepTime,
queueManager: queue,
notify: make(chan bool, 100000),
traceManager: container.Resolve[trace.IManager](),
}
}

// Consumer 消费
type queueSubscribeFunc func(subscribeName string, lstMessage collections.ListAny, remainingCount int)

// Subscribe 订阅消息
// queueName = 队列名称
// subscribeName = 订阅者名称
Expand All @@ -60,8 +51,77 @@ func Subscribe(queueName string, subscribeName string, pullCount int, sleepTime
queue := dicQueue.GetValue(queueName)

// 添加订阅者
subscriber := newSubscriber(subscribeName, fn, pullCount, sleepTime, queue)
subscriber := &subscriber{
subscribeName: subscribeName,
offset: -1,
subscribeFunc: fn,
pullCount: pullCount,
sleepTime: sleepTime,
queueManager: queue,
notify: make(chan bool, 100000),
traceManager: container.Resolve[trace.IManager](),
}
queue.subscribers.Add(subscriber)

go subscriber.pullMessage()
}

// 计算本次可以消费的数量
func (receiver *subscriber) getPullCount() int {
receiver.queueManager.work()
defer receiver.queueManager.unWork()

pullCount := receiver.queueManager.queue.Count() - receiver.offset - 1
// 如果超出每次拉取的数量,则以拉取设置为准
if pullCount > receiver.pullCount {
pullCount = receiver.pullCount
}
return pullCount
}

// 每个订阅者独立消费
func (receiver *subscriber) pullMessage() {
for {
// 得出未消费的长度
pullCount := receiver.getPullCount()
// 如果未消费的长度小于1,则说明没有新的数据
if pullCount < 1 {
<-receiver.notify
continue
}

receiver.notify = make(chan bool, 100000)
// 设置为消费中
receiver.queueManager.work()

// 计算当前订阅者应消费队列的起始位置
startIndex := receiver.offset + 1
endIndex := startIndex + pullCount

// 得到本次消费的队列切片
curQueue := receiver.queueManager.queue.Range(startIndex, pullCount).ToListAny()
remainingCount := receiver.queueManager.queue.Count() - endIndex

traceContext := receiver.traceManager.EntryQueueConsumer(receiver.queueManager.name, receiver.subscribeName)
var err error
// 执行客户端的消费
exception.Try(func() {
receiver.subscribeFunc(receiver.subscribeName, curQueue, remainingCount)
// 保存本次消费的位置
receiver.offset = endIndex - 1
}).CatchException(func(exp any) {
err = flog.Error(exp)
<-time.After(time.Second)
})
curQueue.Clear()
container.Resolve[trace.IManager]().Push(traceContext, err)

Check failure on line 117 in subscribe.go

View workflow job for this annotation

GitHub Actions / build

container.Resolve[trace.IManager]().Push undefined (type trace.IManager has no field or method Push)
asyncLocal.Release()

receiver.queueManager.unWork()

// 休眠指定时间
if receiver.sleepTime > 0 {
time.Sleep(receiver.sleepTime)
}
}
}

0 comments on commit e5de8c3

Please sign in to comment.