Skip to content

Commit 612b7a9

Browse files
Cherry-picks for 2.10.26-RC.4 (#6520)
Includes the following: - #6507 - #6497 - #6476 - #6511 - #6513 - #6517 - #6515 - #6519 - #6521 Signed-off-by: Neil Twigg <neil@nats.io>
2 parents 45ee8c4 + e97c614 commit 612b7a9

20 files changed

+852
-46
lines changed

server/client.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -3970,7 +3970,7 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
39703970
reply = append(reply, '@')
39713971
reply = append(reply, c.pa.deliver...)
39723972
}
3973-
didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames) || didDeliver
3973+
didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames, false) || didDeliver
39743974
}
39753975

39763976
// Check to see if we did not deliver to anyone and the client has a reply subject set
@@ -4017,7 +4017,7 @@ func (c *client) handleGWReplyMap(msg []byte) bool {
40174017
reply = append(reply, '@')
40184018
reply = append(reply, c.pa.deliver...)
40194019
}
4020-
c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil)
4020+
c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil, false)
40214021
}
40224022
return true
40234023
}
@@ -4366,7 +4366,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
43664366
flags |= pmrCollectQueueNames
43674367
var queues [][]byte
43684368
didDeliver, queues = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags)
4369-
didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues) || didDeliver
4369+
didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues, false) || didDeliver
43704370
} else {
43714371
didDeliver, _ = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags)
43724372
}

server/consumer.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -5544,6 +5544,7 @@ func (o *consumer) isMonitorRunning() bool {
55445544

55455545
// If we detect that our ackfloor is higher than the stream's last sequence, return this error.
55465546
var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")
5547+
var errAckFloorInvalid = errors.New("consumer ack floor is invalid")
55475548

55485549
// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
55495550
func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
@@ -5573,7 +5574,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
55735574
asflr := state.AckFloor.Stream
55745575
// Protect ourselves against rolling backwards.
55755576
if asflr&(1<<63) != 0 {
5576-
return nil
5577+
return errAckFloorInvalid
55775578
}
55785579

55795580
// Check if the underlying stream's last sequence is less than our floor.
@@ -5592,6 +5593,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
55925593
fseq = chkfloor
55935594
}
55945595

5596+
var retryAsflr uint64
55955597
for seq = fseq; asflr > 0 && seq <= asflr; seq++ {
55965598
if filters != nil {
55975599
_, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv)
@@ -5604,15 +5606,24 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
56045606
}
56055607
// Only ack though if no error and seq <= ack floor.
56065608
if err == nil && seq <= asflr {
5607-
mset.ackMsg(o, seq)
5609+
didRemove := mset.ackMsg(o, seq)
5610+
// Removing the message could fail. For example if we're behind on stream applies.
5611+
// Overwrite retry floor (only the first time) to allow us to check next time if the removal was successful.
5612+
if didRemove && retryAsflr == 0 {
5613+
retryAsflr = seq
5614+
}
56085615
}
56095616
}
5617+
// If retry floor was not overwritten, set to ack floor+1, we don't need to account for any retries below it.
5618+
if retryAsflr == 0 {
5619+
retryAsflr = asflr + 1
5620+
}
56105621

56115622
o.mu.Lock()
56125623
// Update our check floor.
56135624
// Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work.
5614-
if asflr+1 > o.chkflr {
5615-
o.chkflr = asflr + 1
5625+
if retryAsflr > o.chkflr {
5626+
o.chkflr = retryAsflr
56165627
}
56175628
// See if we need to process this update if our parent stream is not a limits policy stream.
56185629
state, _ = o.store.State()

server/filestore.go

+5-10
Original file line numberDiff line numberDiff line change
@@ -1432,15 +1432,14 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
14321432
if seq == 0 || seq&ebit != 0 || seq < fseq {
14331433
seq = seq &^ ebit
14341434
if seq >= fseq {
1435-
// Only add to dmap if past recorded first seq and non-zero.
1436-
if seq != 0 {
1437-
addToDmap(seq)
1438-
}
14391435
atomic.StoreUint64(&mb.last.seq, seq)
14401436
mb.last.ts = ts
14411437
if mb.msgs == 0 {
14421438
atomic.StoreUint64(&mb.first.seq, seq+1)
14431439
mb.first.ts = 0
1440+
} else if seq != 0 {
1441+
// Only add to dmap if past recorded first seq and non-zero.
1442+
addToDmap(seq)
14441443
}
14451444
}
14461445
index += rl
@@ -7017,11 +7016,7 @@ func (fs *fileStore) State() StreamState {
70177016
}
70187017
// Add in deleted.
70197018
mb.dmap.Range(func(seq uint64) bool {
7020-
if seq < fseq {
7021-
mb.dmap.Delete(seq)
7022-
} else {
7023-
state.Deleted = append(state.Deleted, seq)
7024-
}
7019+
state.Deleted = append(state.Deleted, seq)
70257020
return true
70267021
})
70277022
mb.mu.Unlock()
@@ -7573,7 +7568,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
75737568
if err == errDeletedMsg {
75747569
// Update dmap.
75757570
if !smb.dmap.IsEmpty() {
7576-
smb.dmap.Delete(seq)
7571+
smb.dmap.Delete(mseq)
75777572
}
75787573
} else if sm != nil {
75797574
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)

server/filestore_test.go

+55
Original file line numberDiff line numberDiff line change
@@ -8273,3 +8273,58 @@ func changeDirectoryPermission(directory string, mode fs.FileMode) error {
82738273
})
82748274
return err
82758275
}
8276+
8277+
func TestFileStoreLeftoverSkipMsgInDmap(t *testing.T) {
8278+
storeDir := t.TempDir()
8279+
fs, err := newFileStore(
8280+
FileStoreConfig{StoreDir: storeDir},
8281+
StreamConfig{Name: "zzz", Subjects: []string{"test.*"}, Storage: FileStorage, MaxMsgsPer: 1},
8282+
)
8283+
require_NoError(t, err)
8284+
defer fs.Stop()
8285+
8286+
getLmbState := func(fs *fileStore) (uint64, uint64, int) {
8287+
fs.mu.RLock()
8288+
lmb := fs.lmb
8289+
fs.mu.RUnlock()
8290+
lmb.mu.RLock()
8291+
fseq := atomic.LoadUint64(&lmb.first.seq)
8292+
lseq := atomic.LoadUint64(&lmb.last.seq)
8293+
dmaps := lmb.dmap.Size()
8294+
lmb.mu.RUnlock()
8295+
return fseq, lseq, dmaps
8296+
}
8297+
8298+
// Only skip a message.
8299+
fs.SkipMsg()
8300+
8301+
// Confirm state.
8302+
state := fs.State()
8303+
require_Equal(t, state.FirstSeq, 2)
8304+
require_Equal(t, state.LastSeq, 1)
8305+
require_Equal(t, state.NumDeleted, 0)
8306+
fseq, lseq, dmaps := getLmbState(fs)
8307+
require_Equal(t, fseq, 2)
8308+
require_Equal(t, lseq, 1)
8309+
require_Len(t, dmaps, 0)
8310+
8311+
// Stop without writing index.db so we recover based on just the blk file.
8312+
require_NoError(t, fs.stop(false, false))
8313+
8314+
fs, err = newFileStore(
8315+
FileStoreConfig{StoreDir: storeDir},
8316+
StreamConfig{Name: "zzz", Subjects: []string{"test.*"}, Storage: FileStorage, MaxMsgsPer: 1},
8317+
)
8318+
require_NoError(t, err)
8319+
defer fs.Stop()
8320+
8321+
// Confirm the skipped message is not included in the deletes.
8322+
state = fs.State()
8323+
require_Equal(t, state.FirstSeq, 2)
8324+
require_Equal(t, state.LastSeq, 1)
8325+
require_Equal(t, state.NumDeleted, 0)
8326+
fseq, lseq, dmaps = getLmbState(fs)
8327+
require_Equal(t, fseq, 2)
8328+
require_Equal(t, lseq, 1)
8329+
require_Len(t, dmaps, 0)
8330+
}

server/gateway.go

+22-2
Original file line numberDiff line numberDiff line change
@@ -2499,8 +2499,13 @@ var subPool = &sync.Pool{
24992499
// that the message is not sent to a given gateway if for instance
25002500
// it is known that this gateway has no interest in the account or
25012501
// subject, etc..
2502+
// When invoked from a LEAF connection, `checkLeafQF` should be passed as `true`
2503+
// so that we skip any queue subscription interest that is not part of the
2504+
// `c.pa.queues` filter (similar to what we do in `processMsgResults`). However,
2505+
// when processing service imports, this boolean should be passed as `false`,
2506+
// regardless of whether it is a LEAF connection or not.
25022507
// <Invoked from any client connection's readLoop>
2503-
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) bool {
2508+
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte, checkLeafQF bool) bool {
25042509
// We had some times when we were sending across a GW with no subject, and the other side would break
25052510
// due to parser error. These need to be fixed upstream but also double check here.
25062511
if len(subject) == 0 {
@@ -2577,6 +2582,21 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
25772582
qsubs := qr.qsubs[i]
25782583
if len(qsubs) > 0 {
25792584
queue := qsubs[0].queue
2585+
if checkLeafQF {
2586+
// Skip any queue that is not in the leaf's queue filter.
2587+
skip := true
2588+
for _, qn := range c.pa.queues {
2589+
if bytes.Equal(queue, qn) {
2590+
skip = false
2591+
break
2592+
}
2593+
}
2594+
if skip {
2595+
continue
2596+
}
2597+
// Now we still need to check that it was not delivered
2598+
// locally by checking the given `qgroups`.
2599+
}
25802600
add := true
25812601
for _, qn := range qgroups {
25822602
if bytes.Equal(queue, qn) {
@@ -2969,7 +2989,7 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) {
29692989
// we now need to send the message with the real subject to
29702990
// gateways in case they have interest on that reply subject.
29712991
if !isServiceReply {
2972-
c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues)
2992+
c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues, false)
29732993
}
29742994
} else if c.kind == GATEWAY {
29752995
// Only if we are a gateway connection should we try to route

server/jetstream_cluster.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -1587,10 +1587,11 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
15871587
}
15881588
if osa := js.streamAssignment(sa.Client.serviceAccount(), sa.Config.Name); osa != nil {
15891589
for _, ca := range osa.consumers {
1590-
if sa.consumers[ca.Name] == nil {
1590+
// Consumer was either removed, or recreated with a different raft group.
1591+
if nca := sa.consumers[ca.Name]; nca == nil {
1592+
caDel = append(caDel, ca)
1593+
} else if nca.Group != nil && ca.Group != nil && nca.Group.Name != ca.Group.Name {
15911594
caDel = append(caDel, ca)
1592-
} else {
1593-
caAdd = append(caAdd, ca)
15941595
}
15951596
}
15961597
}

server/jetstream_cluster_1_test.go

+110
Original file line numberDiff line numberDiff line change
@@ -7555,6 +7555,116 @@ func TestJetStreamClusterAccountStatsForReplicatedStreams(t *testing.T) {
75557555
require_True(t, accStats.Sent.Bytes >= accStats.Received.Bytes*4)
75567556
}
75577557

7558+
func TestJetStreamClusterRecreateConsumerFromMetaSnapshot(t *testing.T) {
7559+
c := createJetStreamClusterExplicit(t, "R3S", 3)
7560+
defer c.shutdown()
7561+
7562+
nc, js := jsClientConnect(t, c.randomServer())
7563+
defer nc.Close()
7564+
7565+
// Initial setup.
7566+
_, err := js.AddStream(&nats.StreamConfig{
7567+
Name: "TEST",
7568+
Subjects: []string{"foo"},
7569+
Replicas: 3,
7570+
})
7571+
require_NoError(t, err)
7572+
_, err = js.Publish("foo", nil)
7573+
require_NoError(t, err)
7574+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
7575+
require_NoError(t, err)
7576+
7577+
// Wait for all servers to be fully up-to-date.
7578+
checkFor(t, 2*time.Second, 500*time.Millisecond, func() error {
7579+
if err = checkState(t, c, globalAccountName, "TEST"); err != nil {
7580+
return err
7581+
}
7582+
for _, s := range c.servers {
7583+
if acc, err := s.lookupAccount(globalAccountName); err != nil {
7584+
return err
7585+
} else if mset, err := acc.lookupStream("TEST"); err != nil {
7586+
return err
7587+
} else if o := mset.lookupConsumer("CONSUMER"); o == nil {
7588+
return errors.New("consumer doesn't exist")
7589+
}
7590+
}
7591+
return nil
7592+
})
7593+
7594+
// Shutdown a random server.
7595+
rs := c.randomServer()
7596+
rs.Shutdown()
7597+
rs.WaitForShutdown()
7598+
7599+
// Recreate connection, since we could have shut down the server we were connected to.
7600+
nc.Close()
7601+
c.waitOnLeader()
7602+
nc, js = jsClientConnect(t, c.randomServer())
7603+
defer nc.Close()
7604+
7605+
// Recreate consumer.
7606+
require_NoError(t, js.DeleteConsumer("TEST", "CONSUMER"))
7607+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
7608+
require_NoError(t, err)
7609+
7610+
// Wait for all servers (except for the one that's down) to have recreated the consumer.
7611+
var consumerRg string
7612+
checkFor(t, 2*time.Second, 500*time.Millisecond, func() error {
7613+
consumerRg = _EMPTY_
7614+
for _, s := range c.servers {
7615+
if s == rs {
7616+
continue
7617+
}
7618+
if acc, err := s.lookupAccount(globalAccountName); err != nil {
7619+
return err
7620+
} else if mset, err := acc.lookupStream("TEST"); err != nil {
7621+
return err
7622+
} else if o := mset.lookupConsumer("CONSUMER"); o == nil {
7623+
return errors.New("consumer doesn't exist")
7624+
} else if ccrg := o.raftNode().Group(); consumerRg == _EMPTY_ {
7625+
consumerRg = ccrg
7626+
} else if consumerRg != ccrg {
7627+
return errors.New("consumer raft groups don't match")
7628+
}
7629+
}
7630+
return nil
7631+
})
7632+
7633+
// Install snapshots on all remaining servers to "hide" the intermediate consumer recreate requests.
7634+
for _, s := range c.servers {
7635+
if s != rs {
7636+
sjs := s.getJetStream()
7637+
require_NotNil(t, sjs)
7638+
snap, err := sjs.metaSnapshot()
7639+
require_NoError(t, err)
7640+
sjs.mu.RLock()
7641+
meta := sjs.cluster.meta
7642+
sjs.mu.RUnlock()
7643+
require_NoError(t, meta.InstallSnapshot(snap))
7644+
}
7645+
}
7646+
7647+
// Restart the server, it should receive a meta snapshot and recognize the consumer recreation.
7648+
rs = c.restartServer(rs)
7649+
checkFor(t, 2*time.Second, 500*time.Millisecond, func() error {
7650+
consumerRg = _EMPTY_
7651+
for _, s := range c.servers {
7652+
if acc, err := s.lookupAccount(globalAccountName); err != nil {
7653+
return err
7654+
} else if mset, err := acc.lookupStream("TEST"); err != nil {
7655+
return err
7656+
} else if o := mset.lookupConsumer("CONSUMER"); o == nil {
7657+
return errors.New("consumer doesn't exist")
7658+
} else if ccrg := o.raftNode().Group(); consumerRg == _EMPTY_ {
7659+
consumerRg = ccrg
7660+
} else if consumerRg != ccrg {
7661+
return errors.New("consumer raft groups don't match")
7662+
}
7663+
}
7664+
return nil
7665+
})
7666+
}
7667+
75587668
//
75597669
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
75607670
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.

0 commit comments

Comments
 (0)