Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce ack trackers to prevent batch message loss #1277

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ type partitionConsumer struct {
chunkedMsgCtxMap *chunkedMsgCtxMap
unAckChunksTracker *unAckChunksTracker
ackGroupingTracker ackGroupingTracker
ackTrackers *ackTrackers

lastMessageInBroker *trackingMessageID

Expand Down Expand Up @@ -375,6 +376,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
pc.decryptor = decryptor

pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)
pc.ackTrackers = newAckTrackers()

err := pc.grabConn("")
if err != nil {
Expand Down Expand Up @@ -443,6 +445,9 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
}

trackingID := toTrackingMessageID(msgID)
if trackingID != nil && trackingID.tracker == nil {
trackingID.tracker = pc.ackTrackers.tracker(trackingID)
}

if trackingID != nil && trackingID.ack() {
// All messages in the same batch have been acknowledged, we only need to acknowledge the
Expand All @@ -453,6 +458,7 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
entryID: trackingID.entryID,
},
}
pc.ackTrackers.remove(trackingID)
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
} else if !pc.options.enableBatchIndexAck {
Expand Down Expand Up @@ -712,6 +718,9 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon
if trackingID == nil {
return errors.New("failed to convert trackingMessageID")
}
if trackingID.tracker == nil {
trackingID.tracker = pc.ackTrackers.tracker(trackingID)
}

var msgIDToAck *trackingMessageID
if trackingID.ackCumulative() || pc.options.enableBatchIndexAck {
Expand All @@ -725,6 +734,7 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon
return nil
}

pc.ackTrackers.remove(msgIDToAck)
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)

Expand Down Expand Up @@ -1162,6 +1172,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
ackTracker)
// set the consumer so we know how to ack the message id
trackingMsgID.consumer = pc
pc.ackTrackers.add(trackingMsgID, ackTracker)

if pc.messageShouldBeDiscarded(trackingMsgID) {
pc.AckID(trackingMsgID)
Expand Down Expand Up @@ -2366,3 +2377,32 @@ func (u *unAckChunksTracker) nack(cmid *chunkMessageID) {
}
u.remove(cmid)
}

// ackTrackers indexes batch ack trackers by the (ledger ID, entry ID) pair of
// the entry they belong to. It lets a message ID that carries no tracker of
// its own be matched with the tracker that was registered for the same entry.
//
// The zero value is ready to use, and every method takes the internal lock,
// so a single instance may be shared between goroutines.
type ackTrackers struct {
	mu       sync.RWMutex
	trackers map[[2]int64]*ackTracker
}

// newAckTrackers returns an empty registry with its map already allocated.
func newAckTrackers() *ackTrackers {
	return &ackTrackers{
		trackers: make(map[[2]int64]*ackTracker),
	}
}

// ackTrackersKey builds the map key for id from its ledger ID and entry ID.
func ackTrackersKey(id MessageID) [2]int64 {
	return [2]int64{id.LedgerID(), id.EntryID()}
}

// tracker returns the tracker registered for the entry that id points to, or
// nil when no tracker is registered for it.
func (a *ackTrackers) tracker(id MessageID) *ackTracker {
	key := ackTrackersKey(id)

	a.mu.RLock()
	defer a.mu.RUnlock()
	return a.trackers[key]
}

// add registers tracker for the entry that id points to, replacing any
// previous registration for the same entry.
//
// A nil tracker is never stored: a missing key already makes tracker() return
// nil, so keeping a nil entry would only grow the map until remove() is
// called. Passing nil therefore just drops any existing registration, which
// leaves lookups with the same result as storing nil would.
func (a *ackTrackers) add(id MessageID, tracker *ackTracker) {
	key := ackTrackersKey(id)

	a.mu.Lock()
	defer a.mu.Unlock()

	if tracker == nil {
		delete(a.trackers, key)
		return
	}
	// Allocate lazily so that a zero-value ackTrackers does not panic on write.
	if a.trackers == nil {
		a.trackers = make(map[[2]int64]*ackTracker)
	}
	a.trackers[key] = tracker
}

// remove drops the registration for the entry that id points to. It is a
// no-op when nothing is registered for that entry.
func (a *ackTrackers) remove(id MessageID) {
	key := ackTrackersKey(id)

	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.trackers, key)
}
64 changes: 64 additions & 0 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
options: &partitionConsumerOpts{},
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
ackTrackers: newAckTrackers(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
Expand Down Expand Up @@ -75,6 +76,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
options: &partitionConsumerOpts{},
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
ackTrackers: newAckTrackers(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
Expand Down Expand Up @@ -111,6 +113,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
options: &partitionConsumerOpts{},
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
ackTrackers: newAckTrackers(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
Expand Down Expand Up @@ -150,6 +153,67 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}
}

// TestBatchMessageIDWithAckTrackers feeds a ten-message batch to a partition
// consumer, rebuilds every message ID through a Serialize /
// DeserializeMessageID round trip (the rebuilt IDs are asserted not to be
// *trackingMessageID values), and then acks the rebuilt IDs one by one. No
// event may reach eventsCh while only the first nine are acked; acking the
// tenth must produce one.
func TestBatchMessageIDWithAckTrackers(t *testing.T) {
	const batchSize = 10

	events := make(chan interface{}, 1)
	pc := partitionConsumer{
		queueCh:              make(chan []*message, 1),
		eventsCh:             events,
		compressionProviders: sync.Map{},
		options:              &partitionConsumerOpts{},
		metrics:              newTestMetrics(),
		decryptor:            crypto.NewNoopDecryptor(),
		ackTrackers:          newAckTrackers(),
	}
	pc.availablePermits = &availablePermits{pc: &pc}
	pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
		func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)

	// eventQueued reports, without blocking, whether an event was waiting on
	// the events channel; a waiting event is consumed by the check.
	eventQueued := func() bool {
		select {
		case <-events:
			return true
		default:
			return false
		}
	}

	err := pc.MessageReceived(nil, internal.NewBufferWrapper(rawBatchMessage10))
	if err != nil {
		t.Fatal(err)
	}

	// every received message id must carry a tracker
	received := <-pc.queueCh
	for _, msg := range received {
		assert.NotNil(t, msg.ID().(*trackingMessageID).tracker)
	}

	// rebuild each id from its serialized bytes
	detachedIDs := make([]MessageID, batchSize)
	for idx, msg := range received {
		restored, err := DeserializeMessageID(msg.ID().Serialize())
		if err != nil {
			t.Fatal(err)
		}
		detachedIDs[idx] = restored
	}

	// ack everything but the final id of the batch
	for _, id := range detachedIDs[:batchSize-1] {
		_, isTracking := id.(*trackingMessageID)
		assert.False(t, isTracking)
		assert.Nil(t, pc.AckID(id))
	}

	if eventQueued() {
		t.Error("The message id should not be acked!")
	}

	// acking the final id completes the batch
	assert.Nil(t, pc.AckID(detachedIDs[batchSize-1]))

	if !eventQueued() {
		t.Error("Expected an ack request to be triggered!")
	}
}

// Raw single message in old format
// metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
// payload = "hello"
Expand Down