diff --git a/server/client.go b/server/client.go index 774468d8e95..3304d534894 100644 --- a/server/client.go +++ b/server/client.go @@ -3970,7 +3970,7 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) { reply = append(reply, '@') reply = append(reply, c.pa.deliver...) } - didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames) || didDeliver + didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames, false) || didDeliver } // Check to see if we did not deliver to anyone and the client has a reply subject set @@ -4017,7 +4017,7 @@ func (c *client) handleGWReplyMap(msg []byte) bool { reply = append(reply, '@') reply = append(reply, c.pa.deliver...) } - c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil) + c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil, false) } return true } @@ -4366,7 +4366,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt flags |= pmrCollectQueueNames var queues [][]byte didDeliver, queues = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags) - didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues) || didDeliver + didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues, false) || didDeliver } else { didDeliver, _ = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags) } diff --git a/server/consumer.go b/server/consumer.go index 9511c592bcc..dc5f7e58288 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -5544,6 +5544,7 @@ func (o *consumer) isMonitorRunning() bool { // If we detect that our ackfloor is higher than the stream's last sequence, return this error. var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence") +var errAckFloorInvalid = errors.New("consumer ack floor is invalid") // If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent. func (o *consumer) checkStateForInterestStream(ss *StreamState) error { @@ -5573,7 +5574,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { asflr := state.AckFloor.Stream // Protect ourselves against rolling backwards. if asflr&(1<<63) != 0 { - return nil + return errAckFloorInvalid } // Check if the underlying stream's last sequence is less than our floor. @@ -5592,6 +5593,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { fseq = chkfloor } + var retryAsflr uint64 for seq = fseq; asflr > 0 && seq <= asflr; seq++ { if filters != nil { _, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv) @@ -5604,15 +5606,24 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { } // Only ack though if no error and seq <= ack floor. if err == nil && seq <= asflr { - mset.ackMsg(o, seq) + didRemove := mset.ackMsg(o, seq) + // Removing the message could fail. For example if we're behind on stream applies. + // Overwrite retry floor (only the first time) to allow us to check next time if the removal was successful. + if didRemove && retryAsflr == 0 { + retryAsflr = seq + } } } + // If retry floor was not overwritten, set to ack floor+1, we don't need to account for any retries below it. + if retryAsflr == 0 { + retryAsflr = asflr + 1 + } o.mu.Lock() // Update our check floor. // Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work. 
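// Editorial aside (illustrative sketch, not part of the patch): how the new retry floor feeds the
// check floor update that follows. retryAsflr is whatever the loop above recorded (0 if nothing was
// recorded), and, as the comment above states, the check floor only moves forward and never past
// ack floor+1.
func nextCheckFloor(retryAsflr, asflr, chkflr uint64) uint64 {
	if retryAsflr == 0 {
		// Nothing was recorded below the ack floor, so there is nothing to retry.
		retryAsflr = asflr + 1
	}
	if retryAsflr > chkflr {
		return retryAsflr
	}
	return chkflr
}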
- if asflr+1 > o.chkflr { - o.chkflr = asflr + 1 + if retryAsflr > o.chkflr { + o.chkflr = retryAsflr } // See if we need to process this update if our parent stream is not a limits policy stream. state, _ = o.store.State() diff --git a/server/filestore.go b/server/filestore.go index 6550aa444ff..39526752349 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1432,15 +1432,14 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { if seq == 0 || seq&ebit != 0 || seq < fseq { seq = seq &^ ebit if seq >= fseq { - // Only add to dmap if past recorded first seq and non-zero. - if seq != 0 { - addToDmap(seq) - } atomic.StoreUint64(&mb.last.seq, seq) mb.last.ts = ts if mb.msgs == 0 { atomic.StoreUint64(&mb.first.seq, seq+1) mb.first.ts = 0 + } else if seq != 0 { + // Only add to dmap if past recorded first seq and non-zero. + addToDmap(seq) } } index += rl @@ -7017,11 +7016,7 @@ func (fs *fileStore) State() StreamState { } // Add in deleted. mb.dmap.Range(func(seq uint64) bool { - if seq < fseq { - mb.dmap.Delete(seq) - } else { - state.Deleted = append(state.Deleted, seq) - } + state.Deleted = append(state.Deleted, seq) return true }) mb.mu.Unlock() @@ -7573,7 +7568,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { if err == errDeletedMsg { // Update dmap. if !smb.dmap.IsEmpty() { - smb.dmap.Delete(seq) + smb.dmap.Delete(mseq) } } else if sm != nil { sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) diff --git a/server/filestore_test.go b/server/filestore_test.go index 18493b61017..2f80f671809 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -8273,3 +8273,58 @@ func changeDirectoryPermission(directory string, mode fs.FileMode) error { }) return err } + +func TestFileStoreLeftoverSkipMsgInDmap(t *testing.T) { + storeDir := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"test.*"}, Storage: FileStorage, MaxMsgsPer: 1}, + ) + require_NoError(t, err) + defer fs.Stop() + + getLmbState := func(fs *fileStore) (uint64, uint64, int) { + fs.mu.RLock() + lmb := fs.lmb + fs.mu.RUnlock() + lmb.mu.RLock() + fseq := atomic.LoadUint64(&lmb.first.seq) + lseq := atomic.LoadUint64(&lmb.last.seq) + dmaps := lmb.dmap.Size() + lmb.mu.RUnlock() + return fseq, lseq, dmaps + } + + // Only skip a message. + fs.SkipMsg() + + // Confirm state. + state := fs.State() + require_Equal(t, state.FirstSeq, 2) + require_Equal(t, state.LastSeq, 1) + require_Equal(t, state.NumDeleted, 0) + fseq, lseq, dmaps := getLmbState(fs) + require_Equal(t, fseq, 2) + require_Equal(t, lseq, 1) + require_Len(t, dmaps, 0) + + // Stop without writing index.db so we recover based on just the blk file. + require_NoError(t, fs.stop(false, false)) + + fs, err = newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"test.*"}, Storage: FileStorage, MaxMsgsPer: 1}, + ) + require_NoError(t, err) + defer fs.Stop() + + // Confirm the skipped message is not included in the deletes. 
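// Editorial aside (toy sketch with made-up types, not part of the patch): why a trailing skipped
// message must adjust first.seq during rebuild instead of landing in the delete map. A dmap entry
// at or beyond first.seq of an otherwise empty block would later be reported by State() as a
// phantom interior delete, which is exactly what the assertions that follow guard against.
type toyBlock struct {
	first, last uint64
	msgs        uint64
	dmap        map[uint64]struct{}
}

func (b *toyBlock) recordErased(seq uint64) {
	b.last = seq
	if b.msgs == 0 {
		// Block is empty: the erased sequence only moves the boundary, it is not a delete.
		b.first = seq + 1
	} else if seq != 0 {
		b.dmap[seq] = struct{}{}
	}
}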
+ state = fs.State() + require_Equal(t, state.FirstSeq, 2) + require_Equal(t, state.LastSeq, 1) + require_Equal(t, state.NumDeleted, 0) + fseq, lseq, dmaps = getLmbState(fs) + require_Equal(t, fseq, 2) + require_Equal(t, lseq, 1) + require_Len(t, dmaps, 0) +} diff --git a/server/gateway.go b/server/gateway.go index d73a3b2842b..22f0e417bdd 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2499,8 +2499,13 @@ var subPool = &sync.Pool{ // that the message is not sent to a given gateway if for instance // it is known that this gateway has no interest in the account or // subject, etc.. +// When invoked from a LEAF connection, `checkLeafQF` should be passed as `true` +// so that we skip any queue subscription interest that is not part of the +// `c.pa.queues` filter (similar to what we do in `processMsgResults`). However, +// when processing service imports, this boolean should be passed as `false`, +// regardless of whether it is a LEAF connection or not. // -func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) bool { +func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte, checkLeafQF bool) bool { // We had some times when we were sending across a GW with no subject, and the other side would break // due to parser error. These need to be fixed upstream but also double check here. if len(subject) == 0 { @@ -2577,6 +2582,21 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr qsubs := qr.qsubs[i] if len(qsubs) > 0 { queue := qsubs[0].queue + if checkLeafQF { + // Skip any queue that is not in the leaf's queue filter. + skip := true + for _, qn := range c.pa.queues { + if bytes.Equal(queue, qn) { + skip = false + break + } + } + if skip { + continue + } + // Now we still need to check that it was not delivered + // locally by checking the given `qgroups`. + } add := true for _, qn := range qgroups { if bytes.Equal(queue, qn) { @@ -2969,7 +2989,7 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) { // we now need to send the message with the real subject to // gateways in case they have interest on that reply subject. if !isServiceReply { - c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues) + c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues, false) } } else if c.kind == GATEWAY { // Only if we are a gateway connection should we try to route diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3632b3fe0ea..c1d7d255d92 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1587,10 +1587,11 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove } if osa := js.streamAssignment(sa.Client.serviceAccount(), sa.Config.Name); osa != nil { for _, ca := range osa.consumers { - if sa.consumers[ca.Name] == nil { + // Consumer was either removed, or recreated with a different raft group.
+ if nca := sa.consumers[ca.Name]; nca == nil { + caDel = append(caDel, ca) + } else if nca.Group != nil && ca.Group != nil && nca.Group.Name != ca.Group.Name { caDel = append(caDel, ca) - } else { - caAdd = append(caAdd, ca) } } } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 48e89087037..34caaa4c7ed 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -7555,6 +7555,116 @@ func TestJetStreamClusterAccountStatsForReplicatedStreams(t *testing.T) { require_True(t, accStats.Sent.Bytes >= accStats.Received.Bytes*4) } +func TestJetStreamClusterRecreateConsumerFromMetaSnapshot(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Initial setup. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + _, err = js.Publish("foo", nil) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + // Wait for all servers to be fully up-to-date. + checkFor(t, 2*time.Second, 500*time.Millisecond, func() error { + if err = checkState(t, c, globalAccountName, "TEST"); err != nil { + return err + } + for _, s := range c.servers { + if acc, err := s.lookupAccount(globalAccountName); err != nil { + return err + } else if mset, err := acc.lookupStream("TEST"); err != nil { + return err + } else if o := mset.lookupConsumer("CONSUMER"); o == nil { + return errors.New("consumer doesn't exist") + } + } + return nil + }) + + // Shutdown a random server. + rs := c.randomServer() + rs.Shutdown() + rs.WaitForShutdown() + + // Recreate connection, since we could have shutdown the server we were connected to. + nc.Close() + c.waitOnLeader() + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Recreate consumer. + require_NoError(t, js.DeleteConsumer("TEST", "CONSUMER")) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + // Wait for all servers (except for the one that's down) to have recreated the consumer. + var consumerRg string + checkFor(t, 2*time.Second, 500*time.Millisecond, func() error { + consumerRg = _EMPTY_ + for _, s := range c.servers { + if s == rs { + continue + } + if acc, err := s.lookupAccount(globalAccountName); err != nil { + return err + } else if mset, err := acc.lookupStream("TEST"); err != nil { + return err + } else if o := mset.lookupConsumer("CONSUMER"); o == nil { + return errors.New("consumer doesn't exist") + } else if ccrg := o.raftNode().Group(); consumerRg == _EMPTY_ { + consumerRg = ccrg + } else if consumerRg != ccrg { + return errors.New("consumer raft groups don't match") + } + } + return nil + }) + + // Install snapshots on all remaining servers to "hide" the intermediate consumer recreate requests. + for _, s := range c.servers { + if s != rs { + sjs := s.getJetStream() + require_NotNil(t, sjs) + snap, err := sjs.metaSnapshot() + require_NoError(t, err) + sjs.mu.RLock() + meta := sjs.cluster.meta + sjs.mu.RUnlock() + require_NoError(t, meta.InstallSnapshot(snap)) + } + } + + // Restart the server, it should receive a meta snapshot and recognize the consumer recreation. 
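// Editorial aside (toy sketch with hypothetical types, not part of the patch): the decision
// applyMetaSnapshot now makes for each existing consumer assignment when a meta snapshot arrives,
// which is what the restart below is meant to exercise. A consumer that is missing from the
// snapshot, or that reappears under a different raft group (the delete + recreate case), must have
// its old assignment torn down.
type caRef struct{ name, raftGroup string }

func consumerNeedsTeardown(snapshot map[string]caRef, existing caRef) bool {
	nca, ok := snapshot[existing.name]
	return !ok || nca.raftGroup != existing.raftGroup
}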
+ rs = c.restartServer(rs) + checkFor(t, 2*time.Second, 500*time.Millisecond, func() error { + consumerRg = _EMPTY_ + for _, s := range c.servers { + if acc, err := s.lookupAccount(globalAccountName); err != nil { + return err + } else if mset, err := acc.lookupStream("TEST"); err != nil { + return err + } else if o := mset.lookupConsumer("CONSUMER"); o == nil { + return errors.New("consumer doesn't exist") + } else if ccrg := o.raftNode().Group(); consumerRg == _EMPTY_ { + consumerRg = ccrg + } else if consumerRg != ccrg { + return errors.New("consumer raft groups don't match") + } + } + return nil + }) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 4ee7dd5cb2e..bfab9f19415 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -744,7 +744,7 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) { var expectedErr error var msgId string var smv StoreMsg - for i, mset := range msets { + for _, mset := range msets { mset.mu.RLock() sm, err := mset.store.LoadMsg(seq, &smv) mset.mu.RUnlock() @@ -754,17 +754,14 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) { // by all msets for that seq to prove consistency across replicas. // If one of the msets either returns no error or doesn't return // the same error, then that replica has drifted. - if msgId != _EMPTY_ { - t.Fatalf("Expected MsgId %q for seq %d, but got error: %v", msgId, seq, err) - } else if expectedErr == nil { + if expectedErr == nil { expectedErr = err } else { require_Error(t, err, expectedErr) } continue } - // Only set expected msg ID if it's for the very first time. - if msgId == _EMPTY_ && i == 0 { + if msgId == _EMPTY_ { msgId = string(sm.hdr) } else if msgId != string(sm.hdr) { t.Fatalf("MsgIds do not match for seq %d: %q vs %q", seq, msgId, sm.hdr) @@ -4673,3 +4670,116 @@ func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) { ljs.mu.Lock() t.Logf("Took %s to clear %d items", time.Since(start), count) } + +func TestJetStreamClusterStreamAckMsgR1SignalsRemovedMsg(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: nats.WorkQueuePolicy, + Replicas: 1, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + Replicas: 1, + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + s := c.streamLeader(globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + // Too high sequence, should register pre-ack and return true allowing for retries. + require_True(t, mset.ackMsg(o, 100)) + + var smv StoreMsg + sm, err := mset.store.LoadMsg(1, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, "foo") + + // Now do a proper ack, should immediately remove the message since it's R1. 
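// Editorial aside (toy sketch, not part of the patch): the pre-ack bookkeeping behind the
// mset.ackMsg(o, 100) calls in these tests. Acking a sequence the stream has not applied yet cannot
// remove anything, so the stream records a "pre-ack" for that consumer and sequence and performs
// the removal once the message arrives; per the stream.go change later in this diff, ackMsg still
// signals the caller so the sequence can be retried.
type preAcks map[uint64]map[string]struct{} // seq -> consumer names

func registerPreAck(p preAcks, consumer string, seq uint64) {
	if p[seq] == nil {
		p[seq] = make(map[string]struct{})
	}
	p[seq][consumer] = struct{}{}
}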
+ require_True(t, mset.ackMsg(o, 1)) + _, err = mset.store.LoadMsg(1, &smv) + require_Error(t, err, ErrStoreMsgNotFound) +} + +func TestJetStreamClusterStreamAckMsgR3SignalsRemovedMsg(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + Replicas: 3, + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + getStreamAndConsumer := func(s *Server) (*stream, *consumer, error) { + t.Helper() + acc, err := s.lookupAccount(globalAccountName) + if err != nil { + return nil, nil, err + } + mset, err := acc.lookupStream("TEST") + if err != nil { + return nil, nil, err + } + o := mset.lookupConsumer("CONSUMER") + if err != nil { + return nil, nil, err + } + return mset, o, nil + } + + sl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER") + sf := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER") + + msetL, ol, err := getStreamAndConsumer(sl) + require_NoError(t, err) + msetF, of, err := getStreamAndConsumer(sf) + require_NoError(t, err) + + // Too high sequence, should register pre-ack and return true allowing for retries. + require_True(t, msetL.ackMsg(ol, 100)) + require_True(t, msetF.ackMsg(of, 100)) + + // Let all servers ack the message. + var smv StoreMsg + for _, s := range c.servers { + mset, _, err := getStreamAndConsumer(s) + require_NoError(t, err) + require_True(t, mset.ackMsg(of, 1)) + + _, err = mset.store.LoadMsg(1, &smv) + require_Error(t, err, ErrStoreMsgNotFound) + } +} diff --git a/server/leafnode.go b/server/leafnode.go index c95d39340fd..68db0c8c557 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2781,7 +2781,7 @@ func (c *client) processInboundLeafMsg(msg []byte) { // Now deal with gateways if c.srv.gateway.enabled { - c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames) + c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true) } } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 7c2b228608c..74854e1fdef 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -4554,6 +4554,289 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) { } } +func TestLeafNodeAndGatewaysSingleMsgPerQueueGroup(t *testing.T) { + SetGatewaysSolicitDelay(0) + defer ResetGatewaysSolicitDelay() + + accs := ` + accounts { + SYS: {users: [{user:sys, password: pwd}]} + USER: {users: [{user:user, password: pwd}]} + } + system_account: SYS + ` + gwUSConfTmpl := ` + %s + server_name: GW_US + listen: "127.0.0.1:-1" + gateway { + name: US + listen: "127.0.0.1:-1" + } + leafnodes { + listen: "127.0.0.1:-1" + } + ` + gwUSConf := createConfFile(t, []byte(fmt.Sprintf(gwUSConfTmpl, accs))) + gwUS, gwUSOpts := RunServerWithConfig(gwUSConf) + defer gwUS.Shutdown() + + gwEUConfTmpl := ` + %s + server_name: GW_EU + listen: "127.0.0.1:-1" + gateway { + name: EU + listen: "127.0.0.1:-1" + gateways [ + { + name: US + url: "nats://127.0.0.1:%d" + } + ] + } + leafnodes { + listen: "127.0.0.1:-1" + } + ` + gwEUConf := createConfFile(t, []byte(fmt.Sprintf(gwEUConfTmpl, accs, gwUSOpts.Gateway.Port))) + gwEU, gwEUOpts := RunServerWithConfig(gwEUConf) + defer gwEU.Shutdown() + + 
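// Editorial aside (simplified sketch, not part of the patch): the filtering that the new
// checkLeafQF flag enables inside sendMsgToGateways when a message arrives over a leafnode, and
// that the gateway/leaf topology being set up here is meant to verify. Only queue groups the leaf
// listed in c.pa.queues are considered for gateway delivery, mirroring what processMsgResults
// already does. bytes.Equal is used just as in the real code earlier in this diff.
func leafWantsQueue(paQueues [][]byte, queue []byte) bool {
	for _, qn := range paQueues {
		if bytes.Equal(queue, qn) {
			return true
		}
	}
	return false
}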
waitForOutboundGateways(t, gwUS, 1, time.Second) + waitForOutboundGateways(t, gwEU, 1, time.Second) + waitForInboundGateways(t, gwUS, 1, time.Second) + waitForInboundGateways(t, gwEU, 1, time.Second) + + leafConfTmpl := ` + %s + server_name: %s + listen: "127.0.0.1:-1" + leafnodes { + remotes [ + { + url: "nats://user:pwd@127.0.0.1:%d" + account: USER + } + ] + } + ` + leafUSConf := createConfFile(t, []byte(fmt.Sprintf(leafConfTmpl, accs, "LEAF_US", gwUSOpts.LeafNode.Port))) + leafUS, _ := RunServerWithConfig(leafUSConf) + defer leafUS.Shutdown() + checkLeafNodeConnected(t, leafUS) + + leafEUConf := createConfFile(t, []byte(fmt.Sprintf(leafConfTmpl, accs, "LEAF_EU", gwEUOpts.LeafNode.Port))) + leafEU, _ := RunServerWithConfig(leafEUConf) + defer leafEU.Shutdown() + checkLeafNodeConnected(t, leafEU) + + // Order is important! (see rest of test to understand why) + var usLeafQ, usLeafPS, usGWQ, usGWPS, euGWQ, euGWPS, euLeafQ, euLeafPS, euLeafQBaz atomic.Int32 + counters := []*atomic.Int32{&usLeafQ, &usLeafPS, &usGWQ, &usGWPS, &euGWQ, &euGWPS, &euLeafQ, &euLeafPS, &euLeafQBaz} + counterNames := []string{"usLeafQ", "usLeafPS", "usGWQ", "usGWPS", "euGWQ", "euGWPS", "euLeafQ", "euLeafPS", "euLeafQBaz"} + if len(counters) != len(counterNames) { + panic("Fix test!") + } + resetCounters := func() { + for _, a := range counters { + a.Store(0) + } + } + + // This test will always produce from the US leaf. + ncProd := natsConnect(t, leafUS.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncProd.Close() + + total := int32(1) + check := func(t *testing.T, expected []int32) { + time.Sleep(50 * time.Millisecond) + resetCounters() + for i := 0; i < int(total); i++ { + natsPub(t, ncProd, "foo.1", []byte("hello")) + } + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + for i := 0; i < len(expected); i++ { + if n := counters[i].Load(); n != expected[i] { + return fmt.Errorf("Expected counter %q to be %v, got %v", counterNames[i], expected[i], n) + } + } + return nil + }) + } + + // "usLeafQ", "usLeafPS", "usGWQ", "usGWPS", "euGWQ", "euGWPS", "euLeafQ", "euLeafPS", "euLeafQBaz" + for _, test := range []struct { + subs []int + expected []int32 + }{ + // We will always have the qsub on leaf EU, and have some permutations + // of queue and plain subs on other server(s) and check we get the + // expected distribution. + + // Simple test firs, qsubs on leaf US and leaf EU, all messages stay in leaf US. + { + []int{1, 0, 0, 0, 0, 0, 1, 0, 0}, + []int32{total, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + // Move the queue sub from leaf US to GW US. + { + []int{0, 0, 1, 0, 0, 0, 1, 0, 0}, + []int32{0, 0, total, 0, 0, 0, 0, 0, 0}, + }, + // Now move it to GW EU. + { + []int{0, 0, 0, 0, 1, 0, 1, 0, 0}, + []int32{0, 0, 0, 0, total, 0, 0, 0, 0}, + }, + + // More combinations... + { + []int{1, 1, 0, 0, 0, 0, 1, 0, 0}, + []int32{total, total, 0, 0, 0, 0, 0, 0, 0}, + }, + { + []int{0, 1, 1, 0, 0, 0, 1, 0, 0}, + []int32{0, total, total, 0, 0, 0, 0, 0, 0}, + }, + { + []int{0, 1, 1, 1, 0, 0, 1, 0, 0}, + []int32{0, total, total, total, 0, 0, 0, 0, 0}, + }, + { + []int{0, 1, 0, 1, 1, 0, 1, 0, 0}, + []int32{0, total, 0, total, total, 0, 0, 0, 0}, + }, + { + []int{0, 1, 0, 1, 1, 1, 1, 0, 0}, + []int32{0, total, 0, total, total, total, 0, 0, 0}, + }, + // If we have the qsub in leaf US, does not matter if we have + // qsubs in GW US and EU, only leaf US should receive the messages, + // but plain sub in GW servers should get them too. 
+ { + []int{1, 1, 1, 0, 0, 0, 1, 0, 0}, + []int32{total, total, 0, 0, 0, 0, 0, 0, 0}, + }, + { + []int{1, 1, 1, 1, 0, 0, 1, 0, 0}, + []int32{total, total, 0, total, 0, 0, 0, 0, 0}, + }, + { + []int{1, 1, 1, 1, 1, 0, 1, 0, 0}, + []int32{total, total, 0, total, 0, 0, 0, 0, 0}, + }, + { + []int{1, 1, 1, 1, 1, 1, 1, 0, 0}, + []int32{total, total, 0, total, 0, total, 0, 0, 0}, + }, + // Now back to a qsub on leaf US and leaf EU, but introduce plain sub + // interest in leaf EU + { + []int{1, 0, 0, 0, 0, 0, 1, 1, 0}, + []int32{total, 0, 0, 0, 0, 0, 0, total, 0}, + }, + // And add a different queue group in leaf EU and it should get the messages too. + { + []int{1, 0, 0, 0, 0, 0, 1, 1, 1}, + []int32{total, 0, 0, 0, 0, 0, 0, total, total}, + }, + // Keep plain and baz queue sub interests in leaf EU and add more combinations. + { + []int{1, 1, 0, 0, 0, 0, 1, 1, 1}, + []int32{total, total, 0, 0, 0, 0, 0, total, total}, + }, + { + []int{1, 1, 1, 0, 0, 0, 1, 1, 1}, + []int32{total, total, 0, 0, 0, 0, 0, total, total}, + }, + { + []int{1, 1, 1, 1, 0, 0, 1, 1, 1}, + []int32{total, total, 0, total, 0, 0, 0, total, total}, + }, + { + []int{1, 1, 1, 1, 1, 0, 1, 1, 1}, + []int32{total, total, 0, total, 0, 0, 0, total, total}, + }, + { + []int{1, 1, 1, 1, 1, 1, 1, 1, 1}, + []int32{total, total, 0, total, 0, total, 0, total, total}, + }, + } { + t.Run(_EMPTY_, func(t *testing.T) { + if len(test.subs) != len(counters) || len(test.expected) != len(counters) { + panic("Fix test") + } + + ncUS := natsConnect(t, leafUS.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncUS.Close() + + ncGWUS := natsConnect(t, gwUS.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncGWUS.Close() + + ncGWEU := natsConnect(t, gwEU.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncGWEU.Close() + + ncEU := natsConnect(t, leafEU.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncEU.Close() + + if test.subs[0] > 0 { + natsQueueSub(t, ncUS, "foo.*", "bar", func(_ *nats.Msg) { + usLeafQ.Add(1) + }) + } + if test.subs[1] > 0 { + natsSub(t, ncUS, "foo.>", func(_ *nats.Msg) { + usLeafPS.Add(1) + }) + } + natsFlush(t, ncUS) + if test.subs[2] > 0 { + natsQueueSub(t, ncGWUS, "foo.*", "bar", func(_ *nats.Msg) { + usGWQ.Add(1) + }) + } + if test.subs[3] > 0 { + natsSub(t, ncGWUS, "foo.>", func(_ *nats.Msg) { + usGWPS.Add(1) + }) + } + natsFlush(t, ncGWUS) + if test.subs[4] > 0 { + natsQueueSub(t, ncGWEU, "foo.*", "bar", func(_ *nats.Msg) { + euGWQ.Add(1) + }) + } + if test.subs[5] > 0 { + natsSub(t, ncGWEU, "foo.>", func(_ *nats.Msg) { + euGWPS.Add(1) + }) + } + natsFlush(t, ncGWEU) + if test.subs[6] > 0 { + natsQueueSub(t, ncEU, "foo.*", "bar", func(_ *nats.Msg) { + euLeafQ.Add(1) + }) + } + if test.subs[7] > 0 { + natsSub(t, ncEU, "foo.>", func(_ *nats.Msg) { + euLeafPS.Add(1) + }) + } + if test.subs[8] > 0 { + // Create on different group, called "baz" + natsQueueSub(t, ncEU, "foo.*", "baz", func(_ *nats.Msg) { + euLeafQBaz.Add(1) + }) + } + natsFlush(t, ncEU) + + // Check that we have what we expect. 
+ check(t, test.expected) + }) + } +} + func TestLeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster(t *testing.T) { SetGatewaysSolicitDelay(0) defer ResetGatewaysSolicitDelay() @@ -8044,6 +8327,8 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t subs2 = append(subs2, sub) nc.Flush() } + // Let them propagate + time.Sleep(100 * time.Millisecond) defer closeSubs(subs1) defer closeSubs(subs2) @@ -8085,6 +8370,8 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t nc.Flush() rsubs = append(rsubs, sub) } + // Let them propagate + time.Sleep(100 * time.Millisecond) nc, _ = jsClientConnect(t, ln.randomServer()) defer nc.Close() diff --git a/server/memstore.go b/server/memstore.go index 42939a76621..979bb55da27 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1012,6 +1012,8 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { ms.removeSeqPerSubject(sm.subj, seq) // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, seq) + } else if !ms.dmap.IsEmpty() { + ms.dmap.Delete(seq) } } if purged > ms.state.Msgs { @@ -1032,9 +1034,10 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { ms.state.FirstSeq = seq ms.state.FirstTime = time.Time{} ms.state.LastSeq = seq - 1 - // Reset msgs and fss. + // Reset msgs, fss and dmap. ms.msgs = make(map[uint64]*StoreMsg) ms.fss = stree.NewSubjectTree[SimpleState]() + ms.dmap.Empty() } ms.mu.Unlock() @@ -1066,9 +1069,10 @@ func (ms *memStore) reset() error { // Update msgs and bytes. ms.state.Msgs = 0 ms.state.Bytes = 0 - // Reset msgs and fss. + // Reset msgs, fss and dmap. ms.msgs = make(map[uint64]*StoreMsg) ms.fss = stree.NewSubjectTree[SimpleState]() + ms.dmap.Empty() ms.mu.Unlock() @@ -1102,6 +1106,8 @@ func (ms *memStore) Truncate(seq uint64) error { ms.removeSeqPerSubject(sm.subj, i) // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, i) + } else if !ms.dmap.IsEmpty() { + ms.dmap.Delete(i) } } // Reset last. diff --git a/server/monitor.go b/server/monitor.go index f4afbcc2c87..1ce349bac95 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -831,6 +831,7 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { OutBytes: r.outBytes, NumSubs: uint32(len(r.subs)), Import: r.opts.Import, + Pending: int(r.out.pb), Export: r.opts.Export, RTT: r.getRTT().String(), Start: r.start, diff --git a/server/norace_test.go b/server/norace_test.go index 6825ac452d9..8c9785f505c 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -11225,7 +11225,7 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceInterest(t *testing. } require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1) - require_Equal(t, checkFloor(mset.lookupConsumer("B")), 100_001) + require_Equal(t, checkFloor(mset.lookupConsumer("B")), 90_001) require_Equal(t, checkFloor(mset.lookupConsumer("C")), 100_001) // This checks the chkflr state.
For this test this should be much faster, diff --git a/server/opts.go b/server/opts.go index 6c3f4320260..3cbe5599a7f 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1573,6 +1573,8 @@ func (o *Options) processConfigFileLine(k string, v any, errors *[]error, warnin } case "no_fast_producer_stall": o.NoFastProducerStall = v.(bool) + case "max_closed_clients": + o.MaxClosedClients = int(v.(int64)) default: if au := atomic.LoadInt32(&allowUnknownTopLevelField); au == 0 && !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/opts_test.go b/server/opts_test.go index 78cf6a6174c..eba58706dcc 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -1354,6 +1354,13 @@ func TestPanic(t *testing.T) { } } +func TestMaxClosedClients(t *testing.T) { + conf := createConfFile(t, []byte(`max_closed_clients: 5`)) + opts, err := ProcessConfigFile(conf) + require_NoError(t, err) + require_Equal(t, opts.MaxClosedClients, 5) +} + func TestPingIntervalOld(t *testing.T) { conf := createConfFile(t, []byte(`ping_interval: 5`)) opts := &Options{} diff --git a/server/raft.go b/server/raft.go index 8723a4161e4..245e419492b 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3443,8 +3443,13 @@ CONTINUE: if l > paeWarnThreshold && l%paeWarnModulo == 0 { n.warn("%d append entries pending", len(n.pae)) } - } else if l%paeWarnModulo == 0 { - n.debug("Not saving to append entries pending") + } else { + // Invalidate cache entry at this index, we might have + // stored it previously with a different value. + delete(n.pae, n.pindex) + if l%paeWarnModulo == 0 { + n.debug("Not saving to append entries pending") + } } } else { // This is a replay on startup so just take the appendEntry version. @@ -4019,11 +4024,10 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { n.vote = vr.candidate n.writeTermVote() n.resetElectionTimeout() - } else { - if vr.term >= n.term && n.vote == noVote { - n.term = vr.term - n.resetElect(randCampaignTimeout()) - } + } else if n.vote == noVote && n.State() != Candidate { + // We have a more up-to-date log, and haven't voted yet. + // Start campaigning earlier, but only if not candidate already, as that would short-circuit us. + n.resetElect(randCampaignTimeout()) } // Term might have changed, make sure response has the most current diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index 4807efee283..2a1c541227b 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -320,7 +320,7 @@ func (a *stateAdder) snapshot(t *testing.T) { // Helper to wait for a certain state. func (rg smGroup) waitOnTotal(t *testing.T, expected int64) { t.Helper() - checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { for _, sm := range rg { asm := sm.(*stateAdder) if total := asm.total(); total != expected { diff --git a/server/raft_test.go b/server/raft_test.go index c32cf6f1244..468c1806731 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -526,6 +526,60 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) { require_True(t, followerEqual) } +func TestNRGUnsuccessfulVoteRequestCampaignEarly(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + nats0 := "S1Nunr6R" // "nats-0" + n.etlr = time.Time{} + + // Simple case: we are follower and vote for a candidate. 
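// Editorial aside (simplified sketch, not part of the patch): the condition from the raft.go
// change above that the deny-vote cases later in this test exercise. When we refuse a vote because
// our log is more up to date, we only restart our own election timer (and so campaign early) if we
// have not voted this term and are not already a candidate; a candidate resetting its own timer
// here would keep short-circuiting itself. noVote, RaftState and Candidate are the existing
// identifiers from raft.go.
func campaignEarlyOnDeniedVote(vote string, state RaftState) bool {
	return vote == noVote && state != Candidate
}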
+ require_NoError(t, n.processVoteRequest(&voteRequest{term: 1, lastTerm: 0, lastIndex: 0, candidate: nats0})) + require_Equal(t, n.term, 1) + require_Equal(t, n.vote, nats0) + require_NotEqual(t, n.etlr, time.Time{}) // Resets election timer as it voted. + n.etlr = time.Time{} + + // We are follower and deny vote for outdated candidate. + n.pterm, n.pindex = 1, 100 + require_NoError(t, n.processVoteRequest(&voteRequest{term: 2, lastTerm: 1, lastIndex: 2, candidate: nats0})) + require_Equal(t, n.term, 2) + require_Equal(t, n.vote, noVote) + require_NotEqual(t, n.etlr, time.Time{}) // Resets election timer as it starts campaigning. + n.etlr = time.Time{} + + // Switch to candidate. + n.pterm, n.pindex = 2, 200 + n.switchToCandidate() + require_Equal(t, n.term, 3) + require_Equal(t, n.State(), Candidate) + require_NotEqual(t, n.etlr, time.Time{}) // Resets election timer as part of switching state. + n.etlr = time.Time{} + + // We are candidate and deny vote for outdated candidate. But they were on a more recent term, restart campaign. + require_NoError(t, n.processVoteRequest(&voteRequest{term: 4, lastTerm: 1, lastIndex: 2, candidate: nats0})) + require_Equal(t, n.term, 4) + require_Equal(t, n.vote, noVote) + require_NotEqual(t, n.etlr, time.Time{}) // Resets election timer as it restarts campaigning. + n.etlr = time.Time{} + + // Switch to candidate. + n.pterm, n.pindex = 4, 400 + n.switchToCandidate() + require_Equal(t, n.term, 5) + require_Equal(t, n.State(), Candidate) + require_NotEqual(t, n.etlr, time.Time{}) // Resets election timer as part of switching state. + n.etlr = time.Time{} + + // We are candidate and deny vote for outdated candidate. Don't start campaigning early. + require_NoError(t, n.processVoteRequest(&voteRequest{term: 5, lastTerm: 1, lastIndex: 2, candidate: nats0})) + require_Equal(t, n.term, 5) + require_Equal(t, n.vote, noVote) + // Election timer must NOT be updated as that would mean another candidate that we don't vote + // for can short-circuit us by making us restart elections, denying us the ability to become leader. + require_Equal(t, n.etlr, time.Time{}) +} + func TestNRGInvalidTAVDoesntPanic(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -1097,6 +1151,43 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { } } +func TestNRGPendingAppendEntryCacheInvalidation(t *testing.T) { + for _, test := range []struct { + title string + entries int + }{ + {title: "empty", entries: 1}, + {title: "at limit", entries: paeDropThreshold}, + {title: "full", entries: paeDropThreshold + 1}, + } { + t.Run(test.title, func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createMemRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + l := rg.leader() + + l.(*stateAdder).proposeDelta(1) + rg.waitOnTotal(t, 1) + + // Fill up the cache with N entries. + // The contents don't matter as they should never be applied. 
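// Editorial aside (toy sketch with made-up types, not part of the patch): the invalidation
// behavior from the raft.go change that this test fills the cache to trigger. When the
// pending-append-entries cache is over its threshold we no longer store the new entry, but we must
// also drop any stale entry already cached at that index, since it may hold a different value from
// an earlier attempt.
type toyPAECache map[uint64][]byte

func storeOrInvalidate(cache toyPAECache, index uint64, entry []byte, overThreshold bool) {
	if !overThreshold {
		cache[index] = entry
		return
	}
	// Do not cache, and make sure nothing stale remains at this index.
	delete(cache, index)
}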
+ rg.lockAll() + for _, s := range rg { + n := s.node().(*raft) + for i := 0; i < test.entries; i++ { + n.pae[n.pindex+uint64(1+i)] = newAppendEntry("", 0, 0, 0, 0, nil) + } + } + rg.unlockAll() + + l.(*stateAdder).proposeDelta(1) + rg.waitOnTotal(t, 2) + }) + } +} + func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() diff --git a/server/store_test.go b/server/store_test.go index 142cf461cbe..fa5be7befed 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -302,3 +302,105 @@ func TestStoreMaxMsgsPerUpdateBug(t *testing.T) { }, ) } + +func TestStoreCompactCleansUpDmap(t *testing.T) { + config := func() StreamConfig { + return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0} + } + for cseq := uint64(2); cseq <= 4; cseq++ { + t.Run(fmt.Sprintf("Compact(%d)", cseq), func(t *testing.T) { + testAllStoreAllPermutations( + t, false, config(), + func(t *testing.T, fs StreamStore) { + dmapEntries := func() int { + if fss, ok := fs.(*fileStore); ok { + return fss.dmapEntries() + } else if mss, ok := fs.(*memStore); ok { + mss.mu.RLock() + defer mss.mu.RUnlock() + return mss.dmap.Size() + } else { + return 0 + } + } + + // Publish messages, should have no interior deletes. + for i := 0; i < 3; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + require_Len(t, dmapEntries(), 0) + + // Removing one message in the middle should be an interior delete. + _, err := fs.RemoveMsg(2) + require_NoError(t, err) + require_Len(t, dmapEntries(), 1) + + // Compacting must always clean up the interior delete. + _, err = fs.Compact(cseq) + require_NoError(t, err) + require_Len(t, dmapEntries(), 0) + + // Validate first/last sequence. + state := fs.State() + fseq := uint64(3) + if fseq < cseq { + fseq = cseq + } + require_Equal(t, state.FirstSeq, fseq) + require_Equal(t, state.LastSeq, 3) + }) + }) + } +} + +func TestStoreTruncateCleansUpDmap(t *testing.T) { + config := func() StreamConfig { + return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0} + } + for tseq := uint64(0); tseq <= 1; tseq++ { + t.Run(fmt.Sprintf("Truncate(%d)", tseq), func(t *testing.T) { + testAllStoreAllPermutations( + t, false, config(), + func(t *testing.T, fs StreamStore) { + dmapEntries := func() int { + if fss, ok := fs.(*fileStore); ok { + return fss.dmapEntries() + } else if mss, ok := fs.(*memStore); ok { + mss.mu.RLock() + defer mss.mu.RUnlock() + return mss.dmap.Size() + } else { + return 0 + } + } + + // Publish messages, should have no interior deletes. + for i := 0; i < 3; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + require_Len(t, dmapEntries(), 0) + + // Removing one message in the middle should be an interior delete. + _, err := fs.RemoveMsg(2) + require_NoError(t, err) + require_Len(t, dmapEntries(), 1) + + // Truncating must always clean up the interior delete. + err = fs.Truncate(tseq) + require_NoError(t, err) + require_Len(t, dmapEntries(), 0) + + // Validate first/last sequence. 
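// Editorial aside (illustrative helper, not part of the patch): the invariant both store tests
// assert after Compact and Truncate. No interior-delete entry may survive outside the new
// [FirstSeq, LastSeq] range, otherwise State() would keep reporting deletes for sequences that no
// longer exist in the store.
func phantomDeletes(deleted []uint64, firstSeq, lastSeq uint64) (out []uint64) {
	for _, seq := range deleted {
		if seq < firstSeq || seq > lastSeq {
			out = append(out, seq)
		}
	}
	return out
}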
+ state := fs.State() + fseq := uint64(1) + if fseq > tseq { + fseq = tseq + } + require_Equal(t, state.FirstSeq, fseq) + require_Equal(t, state.LastSeq, tseq) + }) + }) + } +} diff --git a/server/stream.go b/server/stream.go index a27ed9d495c..e7d7512e42d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5683,16 +5683,17 @@ func (mset *stream) clearPreAck(o *consumer, seq uint64) { } // ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy. -func (mset *stream) ackMsg(o *consumer, seq uint64) { +// Returns whether the message at seq was removed as a result of the ACK. +func (mset *stream) ackMsg(o *consumer, seq uint64) bool { if seq == 0 { - return + return false } // Don't make this RLock(). We need to have only 1 running at a time to gauge interest across all consumers. mset.mu.Lock() if mset.closed.Load() || mset.cfg.Retention == LimitsPolicy { mset.mu.Unlock() - return + return false } store := mset.store @@ -5703,7 +5704,9 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { if seq > state.LastSeq { mset.registerPreAck(o, seq) mset.mu.Unlock() - return + // We have not removed the message, but should still signal so we could retry later + // since we potentially need to remove it then. + return true } // Always clear pre-ack if here. @@ -5712,7 +5715,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { // Make sure this sequence is not below our first sequence. if seq < state.FirstSeq { mset.mu.Unlock() - return + return false } var shouldRemove bool @@ -5728,7 +5731,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { // If nothing else to do. if !shouldRemove { - return + return false } // If we are here we should attempt to remove. @@ -5736,6 +5739,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { // This should not happen, but being pedantic. mset.registerPreAckLock(o, seq) } + return true } // Snapshot creates a snapshot for the stream and possibly consumers.
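// Editorial aside (caller-side sketch, not part of the patch): a condensed restatement of the
// R1/R3 tests earlier in this diff, showing how the new ackMsg return value is meant to be used.
// mset and o are assumed to be a WorkQueue/Interest stream and one of its consumers; require_True,
// require_Error, StoreMsg and ErrStoreMsgNotFound are the existing test helpers and types.
func ackAndVerifyRemoved(t *testing.T, mset *stream, o *consumer, seq uint64) {
	// A successful ack on a WorkQueue stream should report the removal...
	require_True(t, mset.ackMsg(o, seq))
	// ...and the message should no longer be loadable from the store.
	var smv StoreMsg
	_, err := mset.store.LoadMsg(seq, &smv)
	require_Error(t, err, ErrStoreMsgNotFound)
}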