From d9aa41d9f08c626944ae57f1fd429c1cd506c61e Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 13 Feb 2025 11:04:14 +0100 Subject: [PATCH 1/9] [FIXED] Recreate consumer on meta snapshot Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 7 +- server/jetstream_cluster_1_test.go | 110 +++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 3 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3632b3fe0ea..c1d7d255d92 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1587,10 +1587,11 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove } if osa := js.streamAssignment(sa.Client.serviceAccount(), sa.Config.Name); osa != nil { for _, ca := range osa.consumers { - if sa.consumers[ca.Name] == nil { + // Consumer was either removed, or recreated with a different raft group. + if nca := sa.consumers[ca.Name]; nca == nil { + caDel = append(caDel, ca) + } else if nca.Group != nil && ca.Group != nil && nca.Group.Name != ca.Group.Name { caDel = append(caDel, ca) - } else { - caAdd = append(caAdd, ca) } } } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 48e89087037..34caaa4c7ed 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -7555,6 +7555,116 @@ func TestJetStreamClusterAccountStatsForReplicatedStreams(t *testing.T) { require_True(t, accStats.Sent.Bytes >= accStats.Received.Bytes*4) } +func TestJetStreamClusterRecreateConsumerFromMetaSnapshot(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Initial setup. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + _, err = js.Publish("foo", nil) + require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + // Wait for all servers to be fully up-to-date. + checkFor(t, 2*time.Second, 500*time.Millisecond, func() error { + if err = checkState(t, c, globalAccountName, "TEST"); err != nil { + return err + } + for _, s := range c.servers { + if acc, err := s.lookupAccount(globalAccountName); err != nil { + return err + } else if mset, err := acc.lookupStream("TEST"); err != nil { + return err + } else if o := mset.lookupConsumer("CONSUMER"); o == nil { + return errors.New("consumer doesn't exist") + } + } + return nil + }) + + // Shutdown a random server. + rs := c.randomServer() + rs.Shutdown() + rs.WaitForShutdown() + + // Recreate connection, since we could have shutdown the server we were connected to. + nc.Close() + c.waitOnLeader() + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Recreate consumer. + require_NoError(t, js.DeleteConsumer("TEST", "CONSUMER")) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + // Wait for all servers (except for the one that's down) to have recreated the consumer. + var consumerRg string + checkFor(t, 2*time.Second, 500*time.Millisecond, func() error { + consumerRg = _EMPTY_ + for _, s := range c.servers { + if s == rs { + continue + } + if acc, err := s.lookupAccount(globalAccountName); err != nil { + return err + } else if mset, err := acc.lookupStream("TEST"); err != nil { + return err + } else if o := mset.lookupConsumer("CONSUMER"); o == nil { + return errors.New("consumer doesn't exist") + } else if ccrg := o.raftNode().Group(); consumerRg == _EMPTY_ { + consumerRg = ccrg + } else if consumerRg != ccrg { + return errors.New("consumer raft groups don't match") + } + } + return nil + }) + + // Install snapshots on all remaining servers to "hide" the intermediate consumer recreate requests. + for _, s := range c.servers { + if s != rs { + sjs := s.getJetStream() + require_NotNil(t, sjs) + snap, err := sjs.metaSnapshot() + require_NoError(t, err) + sjs.mu.RLock() + meta := sjs.cluster.meta + sjs.mu.RUnlock() + require_NoError(t, meta.InstallSnapshot(snap)) + } + } + + // Restart the server, it should receive a meta snapshot and recognize the consumer recreation. + rs = c.restartServer(rs) + checkFor(t, 2*time.Second, 500*time.Millisecond, func() error { + consumerRg = _EMPTY_ + for _, s := range c.servers { + if acc, err := s.lookupAccount(globalAccountName); err != nil { + return err + } else if mset, err := acc.lookupStream("TEST"); err != nil { + return err + } else if o := mset.lookupConsumer("CONSUMER"); o == nil { + return errors.New("consumer doesn't exist") + } else if ccrg := o.raftNode().Group(); consumerRg == _EMPTY_ { + consumerRg = ccrg + } else if consumerRg != ccrg { + return errors.New("consumer raft groups don't match") + } + } + return nil + }) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. From abd29b37ab307d3663c2ec20d6cf2e31b46d71ac Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 17 Feb 2025 16:19:01 +0000 Subject: [PATCH 2/9] Handle max_closed_clients property (#6497) Fixes: https://github.com/nats-io/nats-server/issues/6496 Signed-off-by: Yevhen Surovskyi --- server/opts.go | 2 ++ server/opts_test.go | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/server/opts.go b/server/opts.go index 6c3f4320260..3cbe5599a7f 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1573,6 +1573,8 @@ func (o *Options) processConfigFileLine(k string, v any, errors *[]error, warnin } case "no_fast_producer_stall": o.NoFastProducerStall = v.(bool) + case "max_closed_clients": + o.MaxClosedClients = int(v.(int64)) default: if au := atomic.LoadInt32(&allowUnknownTopLevelField); au == 0 && !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/opts_test.go b/server/opts_test.go index 78cf6a6174c..eba58706dcc 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -1354,6 +1354,13 @@ func TestPanic(t *testing.T) { } } +func TestMaxClosedClients(t *testing.T) { + conf := createConfFile(t, []byte(`max_closed_clients: 5`)) + opts, err := ProcessConfigFile(conf) + require_NoError(t, err) + require_Equal(t, opts.MaxClosedClients, 5) +} + func TestPingIntervalOld(t *testing.T) { conf := createConfFile(t, []byte(`ping_interval: 5`)) opts := &Options{} From c0e41a9682e62bde797fd259b83ea12f2f323c08 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 13 Feb 2025 13:44:08 -0500 Subject: [PATCH 3/9] Populate pending_bytes in routez (#6476) --- server/monitor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/monitor.go b/server/monitor.go index f4afbcc2c87..1ce349bac95 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -831,6 +831,7 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { OutBytes: r.outBytes, NumSubs: uint32(len(r.subs)), Import: r.opts.Import, + Pending: int(r.out.pb), Export: r.opts.Export, RTT: r.getRTT().String(), Start: r.start, From 8c6ba9527fafc5ffb2b02d555046489c9ed61b55 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 13 Feb 2025 15:36:36 +0100 Subject: [PATCH 4/9] NRG: Campaign early unless already candidate Signed-off-by: Maurice van Veen --- server/raft.go | 9 ++++---- server/raft_test.go | 54 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/server/raft.go b/server/raft.go index 8723a4161e4..7fc86b92550 100644 --- a/server/raft.go +++ b/server/raft.go @@ -4019,11 +4019,10 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { n.vote = vr.candidate n.writeTermVote() n.resetElectionTimeout() - } else { - if vr.term >= n.term && n.vote == noVote { - n.term = vr.term - n.resetElect(randCampaignTimeout()) - } + } else if n.vote == noVote && n.State() != Candidate { + // We have a more up-to-date log, and haven't voted yet. + // Start campaigning earlier, but only if not candidate already, as that would short-circuit us. + n.resetElect(randCampaignTimeout()) } // Term might have changed, make sure response has the most current diff --git a/server/raft_test.go b/server/raft_test.go index c32cf6f1244..152574cb122 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -526,6 +526,60 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) { require_True(t, followerEqual) } +func TestNRGUnsuccessfulVoteRequestCampaignEarly(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + nats0 := "S1Nunr6R" // "nats-0" + n.etlr = time.Time{} + + // Simple case: we are follower and vote for a candidate. + require_NoError(t, n.processVoteRequest(&voteRequest{term: 1, lastTerm: 0, lastIndex: 0, candidate: nats0})) + require_Equal(t, n.term, 1) + require_Equal(t, n.vote, nats0) + require_NotEqual(t, n.etlr, time.Time{}) // Resets election timer as it voted. + n.etlr = time.Time{} + + // We are follower and deny vote for outdated candidate. + n.pterm, n.pindex = 1, 100 + require_NoError(t, n.processVoteRequest(&voteRequest{term: 2, lastTerm: 1, lastIndex: 2, candidate: nats0})) + require_Equal(t, n.term, 2) + require_Equal(t, n.vote, noVote) + require_NotEqual(t, n.etlr, time.Time{}) // Resets election timer as it starts campaigning. + n.etlr = time.Time{} + + // Switch to candidate. + n.pterm, n.pindex = 2, 200 + n.switchToCandidate() + require_Equal(t, n.term, 3) + require_Equal(t, n.State(), Candidate) + require_NotEqual(t, n.etlr, time.Time{}) // Resets election timer as part of switching state. + n.etlr = time.Time{} + + // We are candidate and deny vote for outdated candidate. But they were on a more recent term, restart campaign. + require_NoError(t, n.processVoteRequest(&voteRequest{term: 4, lastTerm: 1, lastIndex: 2, candidate: nats0})) + require_Equal(t, n.term, 4) + require_Equal(t, n.vote, noVote) + require_NotEqual(t, n.etlr, time.Time{}) // Resets election timer as it restarts campaigning. + n.etlr = time.Time{} + + // Switch to candidate. + n.pterm, n.pindex = 4, 400 + n.switchToCandidate() + require_Equal(t, n.term, 5) + require_Equal(t, n.State(), Candidate) + require_NotEqual(t, n.etlr, time.Time{}) // Resets election timer as part of switching state. + n.etlr = time.Time{} + + // We are candidate and deny vote for outdated candidate. Don't start campaigning early. + require_NoError(t, n.processVoteRequest(&voteRequest{term: 5, lastTerm: 1, lastIndex: 2, candidate: nats0})) + require_Equal(t, n.term, 5) + require_Equal(t, n.vote, noVote) + // Election timer must NOT be updated as that would mean another candidate that we don't vote + // for can short-circuit us by making us restart elections, denying us the ability to become leader. + require_Equal(t, n.etlr, time.Time{}) +} + func TestNRGInvalidTAVDoesntPanic(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() From fe48728e9b0ef91c6e897b800edcea4b8c50c348 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 14 Feb 2025 17:35:41 +0100 Subject: [PATCH 5/9] NRG: Invalidate pending append entries cache Signed-off-by: Maurice van Veen --- server/raft.go | 9 +++++++-- server/raft_helpers_test.go | 2 +- server/raft_test.go | 37 +++++++++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/server/raft.go b/server/raft.go index 7fc86b92550..245e419492b 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3443,8 +3443,13 @@ CONTINUE: if l > paeWarnThreshold && l%paeWarnModulo == 0 { n.warn("%d append entries pending", len(n.pae)) } - } else if l%paeWarnModulo == 0 { - n.debug("Not saving to append entries pending") + } else { + // Invalidate cache entry at this index, we might have + // stored it previously with a different value. + delete(n.pae, n.pindex) + if l%paeWarnModulo == 0 { + n.debug("Not saving to append entries pending") + } } } else { // This is a replay on startup so just take the appendEntry version. diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index 4807efee283..2a1c541227b 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -320,7 +320,7 @@ func (a *stateAdder) snapshot(t *testing.T) { // Helper to wait for a certain state. func (rg smGroup) waitOnTotal(t *testing.T, expected int64) { t.Helper() - checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { for _, sm := range rg { asm := sm.(*stateAdder) if total := asm.total(); total != expected { diff --git a/server/raft_test.go b/server/raft_test.go index 152574cb122..468c1806731 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1151,6 +1151,43 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { } } +func TestNRGPendingAppendEntryCacheInvalidation(t *testing.T) { + for _, test := range []struct { + title string + entries int + }{ + {title: "empty", entries: 1}, + {title: "at limit", entries: paeDropThreshold}, + {title: "full", entries: paeDropThreshold + 1}, + } { + t.Run(test.title, func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createMemRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + l := rg.leader() + + l.(*stateAdder).proposeDelta(1) + rg.waitOnTotal(t, 1) + + // Fill up the cache with N entries. + // The contents don't matter as they should never be applied. + rg.lockAll() + for _, s := range rg { + n := s.node().(*raft) + for i := 0; i < test.entries; i++ { + n.pae[n.pindex+uint64(1+i)] = newAppendEntry("", 0, 0, 0, 0, nil) + } + } + rg.unlockAll() + + l.(*stateAdder).proposeDelta(1) + rg.waitOnTotal(t, 2) + }) + } +} + func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() From 19dc65d0718ce8dd82a050bfa84704fb7e1cc3af Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 17 Feb 2025 11:14:06 -0700 Subject: [PATCH 6/9] [FIXED] LeafNode: possible delivery to several DQ members across Gateway If the same queue group has members running on different leafnodes connected through a gateway, it was possible for a message to be delivered to several members running on different leaf nodes if there was an interest (either plain subscription or for other queue groups) that made the produce message travel through the gateway. Signed-off-by: Ivan Kozlovic --- server/client.go | 6 +- server/gateway.go | 24 +++- server/leafnode.go | 2 +- server/leafnode_test.go | 287 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 313 insertions(+), 6 deletions(-) diff --git a/server/client.go b/server/client.go index 774468d8e95..3304d534894 100644 --- a/server/client.go +++ b/server/client.go @@ -3970,7 +3970,7 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) { reply = append(reply, '@') reply = append(reply, c.pa.deliver...) } - didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames) || didDeliver + didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames, false) || didDeliver } // Check to see if we did not deliver to anyone and the client has a reply subject set @@ -4017,7 +4017,7 @@ func (c *client) handleGWReplyMap(msg []byte) bool { reply = append(reply, '@') reply = append(reply, c.pa.deliver...) } - c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil) + c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil, false) } return true } @@ -4366,7 +4366,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt flags |= pmrCollectQueueNames var queues [][]byte didDeliver, queues = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags) - didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues) || didDeliver + didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues, false) || didDeliver } else { didDeliver, _ = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags) } diff --git a/server/gateway.go b/server/gateway.go index d73a3b2842b..22f0e417bdd 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2499,8 +2499,13 @@ var subPool = &sync.Pool{ // that the message is not sent to a given gateway if for instance // it is known that this gateway has no interest in the account or // subject, etc.. +// When invoked from a LEAF connection, `checkLeafQF` should be passed as `true` +// so that we skip any queue subscription interest that is not part of the +// `c.pa.queues` filter (similar to what we do in `processMsgResults`). However, +// when processing service imports, then this boolean should be passes as `false`, +// regardless if it is a LEAF connection or not. // -func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) bool { +func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte, checkLeafQF bool) bool { // We had some times when we were sending across a GW with no subject, and the other side would break // due to parser error. These need to be fixed upstream but also double check here. if len(subject) == 0 { @@ -2577,6 +2582,21 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr qsubs := qr.qsubs[i] if len(qsubs) > 0 { queue := qsubs[0].queue + if checkLeafQF { + // Skip any queue that is not in the leaf's queue filter. + skip := true + for _, qn := range c.pa.queues { + if bytes.Equal(queue, qn) { + skip = false + break + } + } + if skip { + continue + } + // Now we still need to check that it was not delivered + // locally by checking the given `qgroups`. + } add := true for _, qn := range qgroups { if bytes.Equal(queue, qn) { @@ -2969,7 +2989,7 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) { // we now need to send the message with the real subject to // gateways in case they have interest on that reply subject. if !isServiceReply { - c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues) + c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues, false) } } else if c.kind == GATEWAY { // Only if we are a gateway connection should we try to route diff --git a/server/leafnode.go b/server/leafnode.go index c95d39340fd..68db0c8c557 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2781,7 +2781,7 @@ func (c *client) processInboundLeafMsg(msg []byte) { // Now deal with gateways if c.srv.gateway.enabled { - c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames) + c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true) } } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 7c2b228608c..74854e1fdef 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -4554,6 +4554,289 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) { } } +func TestLeafNodeAndGatewaysSingleMsgPerQueueGroup(t *testing.T) { + SetGatewaysSolicitDelay(0) + defer ResetGatewaysSolicitDelay() + + accs := ` + accounts { + SYS: {users: [{user:sys, password: pwd}]} + USER: {users: [{user:user, password: pwd}]} + } + system_account: SYS + ` + gwUSConfTmpl := ` + %s + server_name: GW_US + listen: "127.0.0.1:-1" + gateway { + name: US + listen: "127.0.0.1:-1" + } + leafnodes { + listen: "127.0.0.1:-1" + } + ` + gwUSConf := createConfFile(t, []byte(fmt.Sprintf(gwUSConfTmpl, accs))) + gwUS, gwUSOpts := RunServerWithConfig(gwUSConf) + defer gwUS.Shutdown() + + gwEUConfTmpl := ` + %s + server_name: GW_EU + listen: "127.0.0.1:-1" + gateway { + name: EU + listen: "127.0.0.1:-1" + gateways [ + { + name: US + url: "nats://127.0.0.1:%d" + } + ] + } + leafnodes { + listen: "127.0.0.1:-1" + } + ` + gwEUConf := createConfFile(t, []byte(fmt.Sprintf(gwEUConfTmpl, accs, gwUSOpts.Gateway.Port))) + gwEU, gwEUOpts := RunServerWithConfig(gwEUConf) + defer gwEU.Shutdown() + + waitForOutboundGateways(t, gwUS, 1, time.Second) + waitForOutboundGateways(t, gwEU, 1, time.Second) + waitForInboundGateways(t, gwUS, 1, time.Second) + waitForInboundGateways(t, gwEU, 1, time.Second) + + leafConfTmpl := ` + %s + server_name: %s + listen: "127.0.0.1:-1" + leafnodes { + remotes [ + { + url: "nats://user:pwd@127.0.0.1:%d" + account: USER + } + ] + } + ` + leafUSConf := createConfFile(t, []byte(fmt.Sprintf(leafConfTmpl, accs, "LEAF_US", gwUSOpts.LeafNode.Port))) + leafUS, _ := RunServerWithConfig(leafUSConf) + defer leafUS.Shutdown() + checkLeafNodeConnected(t, leafUS) + + leafEUConf := createConfFile(t, []byte(fmt.Sprintf(leafConfTmpl, accs, "LEAF_EU", gwEUOpts.LeafNode.Port))) + leafEU, _ := RunServerWithConfig(leafEUConf) + defer leafEU.Shutdown() + checkLeafNodeConnected(t, leafEU) + + // Order is important! (see rest of test to understand why) + var usLeafQ, usLeafPS, usGWQ, usGWPS, euGWQ, euGWPS, euLeafQ, euLeafPS, euLeafQBaz atomic.Int32 + counters := []*atomic.Int32{&usLeafQ, &usLeafPS, &usGWQ, &usGWPS, &euGWQ, &euGWPS, &euLeafQ, &euLeafPS, &euLeafQBaz} + counterNames := []string{"usLeafQ", "usLeafPS", "usGWQ", "usGWPS", "euGWQ", "euGWPS", "euLeafQ", "euLeafPS", "euLeafQBaz"} + if len(counters) != len(counterNames) { + panic("Fix test!") + } + resetCounters := func() { + for _, a := range counters { + a.Store(0) + } + } + + // This test will always produce from the US leaf. + ncProd := natsConnect(t, leafUS.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncProd.Close() + + total := int32(1) + check := func(t *testing.T, expected []int32) { + time.Sleep(50 * time.Millisecond) + resetCounters() + for i := 0; i < int(total); i++ { + natsPub(t, ncProd, "foo.1", []byte("hello")) + } + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + for i := 0; i < len(expected); i++ { + if n := counters[i].Load(); n != expected[i] { + return fmt.Errorf("Expected counter %q to be %v, got %v", counterNames[i], expected[i], n) + } + } + return nil + }) + } + + // "usLeafQ", "usLeafPS", "usGWQ", "usGWPS", "euGWQ", "euGWPS", "euLeafQ", "euLeafPS", "euLeafQBaz" + for _, test := range []struct { + subs []int + expected []int32 + }{ + // We will always have the qsub on leaf EU, and have some permutations + // of queue and plain subs on other server(s) and check we get the + // expected distribution. + + // Simple test firs, qsubs on leaf US and leaf EU, all messages stay in leaf US. + { + []int{1, 0, 0, 0, 0, 0, 1, 0, 0}, + []int32{total, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + // Move the queue sub from leaf US to GW US. + { + []int{0, 0, 1, 0, 0, 0, 1, 0, 0}, + []int32{0, 0, total, 0, 0, 0, 0, 0, 0}, + }, + // Now move it to GW EU. + { + []int{0, 0, 0, 0, 1, 0, 1, 0, 0}, + []int32{0, 0, 0, 0, total, 0, 0, 0, 0}, + }, + + // More combinations... + { + []int{1, 1, 0, 0, 0, 0, 1, 0, 0}, + []int32{total, total, 0, 0, 0, 0, 0, 0, 0}, + }, + { + []int{0, 1, 1, 0, 0, 0, 1, 0, 0}, + []int32{0, total, total, 0, 0, 0, 0, 0, 0}, + }, + { + []int{0, 1, 1, 1, 0, 0, 1, 0, 0}, + []int32{0, total, total, total, 0, 0, 0, 0, 0}, + }, + { + []int{0, 1, 0, 1, 1, 0, 1, 0, 0}, + []int32{0, total, 0, total, total, 0, 0, 0, 0}, + }, + { + []int{0, 1, 0, 1, 1, 1, 1, 0, 0}, + []int32{0, total, 0, total, total, total, 0, 0, 0}, + }, + // If we have the qsub in leaf US, does not matter if we have + // qsubs in GW US and EU, only leaf US should receive the messages, + // but plain sub in GW servers should get them too. + { + []int{1, 1, 1, 0, 0, 0, 1, 0, 0}, + []int32{total, total, 0, 0, 0, 0, 0, 0, 0}, + }, + { + []int{1, 1, 1, 1, 0, 0, 1, 0, 0}, + []int32{total, total, 0, total, 0, 0, 0, 0, 0}, + }, + { + []int{1, 1, 1, 1, 1, 0, 1, 0, 0}, + []int32{total, total, 0, total, 0, 0, 0, 0, 0}, + }, + { + []int{1, 1, 1, 1, 1, 1, 1, 0, 0}, + []int32{total, total, 0, total, 0, total, 0, 0, 0}, + }, + // Now back to a qsub on leaf US and leaf EU, but introduce plain sub + // interest in leaf EU + { + []int{1, 0, 0, 0, 0, 0, 1, 1, 0}, + []int32{total, 0, 0, 0, 0, 0, 0, total, 0}, + }, + // And add a different queue group in leaf EU and it should get the messages too. + { + []int{1, 0, 0, 0, 0, 0, 1, 1, 1}, + []int32{total, 0, 0, 0, 0, 0, 0, total, total}, + }, + // Keep plain and baz queue sub interests in leaf EU and add more combinations. + { + []int{1, 1, 0, 0, 0, 0, 1, 1, 1}, + []int32{total, total, 0, 0, 0, 0, 0, total, total}, + }, + { + []int{1, 1, 1, 0, 0, 0, 1, 1, 1}, + []int32{total, total, 0, 0, 0, 0, 0, total, total}, + }, + { + []int{1, 1, 1, 1, 0, 0, 1, 1, 1}, + []int32{total, total, 0, total, 0, 0, 0, total, total}, + }, + { + []int{1, 1, 1, 1, 1, 0, 1, 1, 1}, + []int32{total, total, 0, total, 0, 0, 0, total, total}, + }, + { + []int{1, 1, 1, 1, 1, 1, 1, 1, 1}, + []int32{total, total, 0, total, 0, total, 0, total, total}, + }, + } { + t.Run(_EMPTY_, func(t *testing.T) { + if len(test.subs) != len(counters) || len(test.expected) != len(counters) { + panic("Fix test") + } + + ncUS := natsConnect(t, leafUS.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncUS.Close() + + ncGWUS := natsConnect(t, gwUS.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncGWUS.Close() + + ncGWEU := natsConnect(t, gwEU.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncGWEU.Close() + + ncEU := natsConnect(t, leafEU.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncEU.Close() + + if test.subs[0] > 0 { + natsQueueSub(t, ncUS, "foo.*", "bar", func(_ *nats.Msg) { + usLeafQ.Add(1) + }) + } + if test.subs[1] > 0 { + natsSub(t, ncUS, "foo.>", func(_ *nats.Msg) { + usLeafPS.Add(1) + }) + } + natsFlush(t, ncUS) + if test.subs[2] > 0 { + natsQueueSub(t, ncGWUS, "foo.*", "bar", func(_ *nats.Msg) { + usGWQ.Add(1) + }) + } + if test.subs[3] > 0 { + natsSub(t, ncGWUS, "foo.>", func(_ *nats.Msg) { + usGWPS.Add(1) + }) + } + natsFlush(t, ncGWUS) + if test.subs[4] > 0 { + natsQueueSub(t, ncGWEU, "foo.*", "bar", func(_ *nats.Msg) { + euGWQ.Add(1) + }) + } + if test.subs[5] > 0 { + natsSub(t, ncGWEU, "foo.>", func(_ *nats.Msg) { + euGWPS.Add(1) + }) + } + natsFlush(t, ncGWEU) + if test.subs[6] > 0 { + natsQueueSub(t, ncEU, "foo.*", "bar", func(_ *nats.Msg) { + euLeafQ.Add(1) + }) + } + if test.subs[7] > 0 { + natsSub(t, ncEU, "foo.>", func(_ *nats.Msg) { + euLeafPS.Add(1) + }) + } + if test.subs[8] > 0 { + // Create on different group, called "baz" + natsQueueSub(t, ncEU, "foo.*", "baz", func(_ *nats.Msg) { + euLeafQBaz.Add(1) + }) + } + natsFlush(t, ncEU) + + // Check that we have what we expect. + check(t, test.expected) + }) + } +} + func TestLeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster(t *testing.T) { SetGatewaysSolicitDelay(0) defer ResetGatewaysSolicitDelay() @@ -8044,6 +8327,8 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t subs2 = append(subs2, sub) nc.Flush() } + // Let's them propagate + time.Sleep(100 * time.Millisecond) defer closeSubs(subs1) defer closeSubs(subs2) @@ -8085,6 +8370,8 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t nc.Flush() rsubs = append(rsubs, sub) } + // Let's them propagate + time.Sleep(100 * time.Millisecond) nc, _ = jsClientConnect(t, ln.randomServer()) defer nc.Close() From 99ceb451094448d6841293880d461167eeb42b13 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 17 Feb 2025 14:31:38 +0100 Subject: [PATCH 7/9] [FIXED] Cleanup dmap on Compact/Truncate Signed-off-by: Maurice van Veen --- server/filestore.go | 2 +- server/memstore.go | 10 ++++- server/store_test.go | 102 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 3 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 6550aa444ff..79cac1b2e33 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -7573,7 +7573,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { if err == errDeletedMsg { // Update dmap. if !smb.dmap.IsEmpty() { - smb.dmap.Delete(seq) + smb.dmap.Delete(mseq) } } else if sm != nil { sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg) diff --git a/server/memstore.go b/server/memstore.go index 42939a76621..979bb55da27 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1012,6 +1012,8 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { ms.removeSeqPerSubject(sm.subj, seq) // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, seq) + } else if !ms.dmap.IsEmpty() { + ms.dmap.Delete(seq) } } if purged > ms.state.Msgs { @@ -1032,9 +1034,10 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { ms.state.FirstSeq = seq ms.state.FirstTime = time.Time{} ms.state.LastSeq = seq - 1 - // Reset msgs and fss. + // Reset msgs, fss and dmap. ms.msgs = make(map[uint64]*StoreMsg) ms.fss = stree.NewSubjectTree[SimpleState]() + ms.dmap.Empty() } ms.mu.Unlock() @@ -1066,9 +1069,10 @@ func (ms *memStore) reset() error { // Update msgs and bytes. ms.state.Msgs = 0 ms.state.Bytes = 0 - // Reset msgs and fss. + // Reset msgs, fss and dmap. ms.msgs = make(map[uint64]*StoreMsg) ms.fss = stree.NewSubjectTree[SimpleState]() + ms.dmap.Empty() ms.mu.Unlock() @@ -1102,6 +1106,8 @@ func (ms *memStore) Truncate(seq uint64) error { ms.removeSeqPerSubject(sm.subj, i) // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, i) + } else if !ms.dmap.IsEmpty() { + ms.dmap.Delete(i) } } // Reset last. diff --git a/server/store_test.go b/server/store_test.go index 142cf461cbe..fa5be7befed 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -302,3 +302,105 @@ func TestStoreMaxMsgsPerUpdateBug(t *testing.T) { }, ) } + +func TestStoreCompactCleansUpDmap(t *testing.T) { + config := func() StreamConfig { + return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0} + } + for cseq := uint64(2); cseq <= 4; cseq++ { + t.Run(fmt.Sprintf("Compact(%d)", cseq), func(t *testing.T) { + testAllStoreAllPermutations( + t, false, config(), + func(t *testing.T, fs StreamStore) { + dmapEntries := func() int { + if fss, ok := fs.(*fileStore); ok { + return fss.dmapEntries() + } else if mss, ok := fs.(*memStore); ok { + mss.mu.RLock() + defer mss.mu.RUnlock() + return mss.dmap.Size() + } else { + return 0 + } + } + + // Publish messages, should have no interior deletes. + for i := 0; i < 3; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + require_Len(t, dmapEntries(), 0) + + // Removing one message in the middle should be an interior delete. + _, err := fs.RemoveMsg(2) + require_NoError(t, err) + require_Len(t, dmapEntries(), 1) + + // Compacting must always clean up the interior delete. + _, err = fs.Compact(cseq) + require_NoError(t, err) + require_Len(t, dmapEntries(), 0) + + // Validate first/last sequence. + state := fs.State() + fseq := uint64(3) + if fseq < cseq { + fseq = cseq + } + require_Equal(t, state.FirstSeq, fseq) + require_Equal(t, state.LastSeq, 3) + }) + }) + } +} + +func TestStoreTruncateCleansUpDmap(t *testing.T) { + config := func() StreamConfig { + return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0} + } + for tseq := uint64(0); tseq <= 1; tseq++ { + t.Run(fmt.Sprintf("Truncate(%d)", tseq), func(t *testing.T) { + testAllStoreAllPermutations( + t, false, config(), + func(t *testing.T, fs StreamStore) { + dmapEntries := func() int { + if fss, ok := fs.(*fileStore); ok { + return fss.dmapEntries() + } else if mss, ok := fs.(*memStore); ok { + mss.mu.RLock() + defer mss.mu.RUnlock() + return mss.dmap.Size() + } else { + return 0 + } + } + + // Publish messages, should have no interior deletes. + for i := 0; i < 3; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + require_Len(t, dmapEntries(), 0) + + // Removing one message in the middle should be an interior delete. + _, err := fs.RemoveMsg(2) + require_NoError(t, err) + require_Len(t, dmapEntries(), 1) + + // Truncating must always clean up the interior delete. + err = fs.Truncate(tseq) + require_NoError(t, err) + require_Len(t, dmapEntries(), 0) + + // Validate first/last sequence. + state := fs.State() + fseq := uint64(1) + if fseq > tseq { + fseq = tseq + } + require_Equal(t, state.FirstSeq, fseq) + require_Equal(t, state.LastSeq, tseq) + }) + }) + } +} From 5fbdfe57166044ea49222a9ba45a8a592248bf2c Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 18 Feb 2025 14:15:09 +0100 Subject: [PATCH 8/9] [FIXED] Backport retry mset.ackMsg if removal fails (#6519) Partial backport of #6140 for v2.10.26+ `mset.ackMsg` could fail if the clustered stream is behind on applies on this server, but the consumer's ack floor is ahead. In this case `checkStateForInterestStream` would skip its check floor ahead, never retrying to ack/remove this message again. Which would leave messages around, not being removed even though they could be. This PR is a partial backport, still doing `mset.ackMsg` for each individual server instead of via message delete proposals for clustered streams, but allowing to retry if a removal should be done. Signed-off-by: Maurice van Veen Signed-off-by: Maurice van Veen Signed-off-by: Neil Twigg Co-authored-by: Neil Twigg --- server/consumer.go | 19 ++++- server/jetstream_cluster_4_test.go | 122 +++++++++++++++++++++++++++-- server/norace_test.go | 2 +- server/stream.go | 16 ++-- 4 files changed, 142 insertions(+), 17 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 9511c592bcc..dc5f7e58288 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -5544,6 +5544,7 @@ func (o *consumer) isMonitorRunning() bool { // If we detect that our ackfloor is higher than the stream's last sequence, return this error. var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence") +var errAckFloorInvalid = errors.New("consumer ack floor is invalid") // If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent. func (o *consumer) checkStateForInterestStream(ss *StreamState) error { @@ -5573,7 +5574,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { asflr := state.AckFloor.Stream // Protect ourselves against rolling backwards. if asflr&(1<<63) != 0 { - return nil + return errAckFloorInvalid } // Check if the underlying stream's last sequence is less than our floor. @@ -5592,6 +5593,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { fseq = chkfloor } + var retryAsflr uint64 for seq = fseq; asflr > 0 && seq <= asflr; seq++ { if filters != nil { _, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv) @@ -5604,15 +5606,24 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { } // Only ack though if no error and seq <= ack floor. if err == nil && seq <= asflr { - mset.ackMsg(o, seq) + didRemove := mset.ackMsg(o, seq) + // Removing the message could fail. For example if we're behind on stream applies. + // Overwrite retry floor (only the first time) to allow us to check next time if the removal was successful. + if didRemove && retryAsflr == 0 { + retryAsflr = seq + } } } + // If retry floor was not overwritten, set to ack floor+1, we don't need to account for any retries below it. + if retryAsflr == 0 { + retryAsflr = asflr + 1 + } o.mu.Lock() // Update our check floor. // Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work. - if asflr+1 > o.chkflr { - o.chkflr = asflr + 1 + if retryAsflr > o.chkflr { + o.chkflr = retryAsflr } // See if we need to process this update if our parent stream is not a limits policy stream. state, _ = o.store.State() diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 4ee7dd5cb2e..bfab9f19415 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -744,7 +744,7 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) { var expectedErr error var msgId string var smv StoreMsg - for i, mset := range msets { + for _, mset := range msets { mset.mu.RLock() sm, err := mset.store.LoadMsg(seq, &smv) mset.mu.RUnlock() @@ -754,17 +754,14 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) { // by all msets for that seq to prove consistency across replicas. // If one of the msets either returns no error or doesn't return // the same error, then that replica has drifted. - if msgId != _EMPTY_ { - t.Fatalf("Expected MsgId %q for seq %d, but got error: %v", msgId, seq, err) - } else if expectedErr == nil { + if expectedErr == nil { expectedErr = err } else { require_Error(t, err, expectedErr) } continue } - // Only set expected msg ID if it's for the very first time. - if msgId == _EMPTY_ && i == 0 { + if msgId == _EMPTY_ { msgId = string(sm.hdr) } else if msgId != string(sm.hdr) { t.Fatalf("MsgIds do not match for seq %d: %q vs %q", seq, msgId, sm.hdr) @@ -4673,3 +4670,116 @@ func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) { ljs.mu.Lock() t.Logf("Took %s to clear %d items", time.Since(start), count) } + +func TestJetStreamClusterStreamAckMsgR1SignalsRemovedMsg(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: nats.WorkQueuePolicy, + Replicas: 1, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + Replicas: 1, + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + s := c.streamLeader(globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + // Too high sequence, should register pre-ack and return true allowing for retries. + require_True(t, mset.ackMsg(o, 100)) + + var smv StoreMsg + sm, err := mset.store.LoadMsg(1, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, "foo") + + // Now do a proper ack, should immediately remove the message since it's R1. + require_True(t, mset.ackMsg(o, 1)) + _, err = mset.store.LoadMsg(1, &smv) + require_Error(t, err, ErrStoreMsgNotFound) +} + +func TestJetStreamClusterStreamAckMsgR3SignalsRemovedMsg(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + Replicas: 3, + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + getStreamAndConsumer := func(s *Server) (*stream, *consumer, error) { + t.Helper() + acc, err := s.lookupAccount(globalAccountName) + if err != nil { + return nil, nil, err + } + mset, err := acc.lookupStream("TEST") + if err != nil { + return nil, nil, err + } + o := mset.lookupConsumer("CONSUMER") + if err != nil { + return nil, nil, err + } + return mset, o, nil + } + + sl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER") + sf := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER") + + msetL, ol, err := getStreamAndConsumer(sl) + require_NoError(t, err) + msetF, of, err := getStreamAndConsumer(sf) + require_NoError(t, err) + + // Too high sequence, should register pre-ack and return true allowing for retries. + require_True(t, msetL.ackMsg(ol, 100)) + require_True(t, msetF.ackMsg(of, 100)) + + // Let all servers ack the message. + var smv StoreMsg + for _, s := range c.servers { + mset, _, err := getStreamAndConsumer(s) + require_NoError(t, err) + require_True(t, mset.ackMsg(of, 1)) + + _, err = mset.store.LoadMsg(1, &smv) + require_Error(t, err, ErrStoreMsgNotFound) + } +} diff --git a/server/norace_test.go b/server/norace_test.go index 6825ac452d9..8c9785f505c 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -11225,7 +11225,7 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceInterest(t *testing. } require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1) - require_Equal(t, checkFloor(mset.lookupConsumer("B")), 100_001) + require_Equal(t, checkFloor(mset.lookupConsumer("B")), 90_001) require_Equal(t, checkFloor(mset.lookupConsumer("C")), 100_001) // This checks the chkflr state. For this test this should be much faster, diff --git a/server/stream.go b/server/stream.go index a27ed9d495c..e7d7512e42d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5683,16 +5683,17 @@ func (mset *stream) clearPreAck(o *consumer, seq uint64) { } // ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy. -func (mset *stream) ackMsg(o *consumer, seq uint64) { +// Returns whether the message at seq was removed as a result of the ACK. +func (mset *stream) ackMsg(o *consumer, seq uint64) bool { if seq == 0 { - return + return false } // Don't make this RLock(). We need to have only 1 running at a time to gauge interest across all consumers. mset.mu.Lock() if mset.closed.Load() || mset.cfg.Retention == LimitsPolicy { mset.mu.Unlock() - return + return false } store := mset.store @@ -5703,7 +5704,9 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { if seq > state.LastSeq { mset.registerPreAck(o, seq) mset.mu.Unlock() - return + // We have not removed the message, but should still signal so we could retry later + // since we potentially need to remove it then. + return true } // Always clear pre-ack if here. @@ -5712,7 +5715,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { // Make sure this sequence is not below our first sequence. if seq < state.FirstSeq { mset.mu.Unlock() - return + return false } var shouldRemove bool @@ -5728,7 +5731,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { // If nothing else to do. if !shouldRemove { - return + return false } // If we are here we should attempt to remove. @@ -5736,6 +5739,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { // This should not happen, but being pedantic. mset.registerPreAckLock(o, seq) } + return true } // Snapshot creates a snapshot for the stream and possibly consumers. From e97c6143a4b92f6cb95a1e8197478acce20cf817 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 18 Feb 2025 16:08:38 +0100 Subject: [PATCH 9/9] [FIXED] SkipMsg left in mb.dmap after recovery Signed-off-by: Maurice van Veen --- server/filestore.go | 13 +++------- server/filestore_test.go | 55 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 79cac1b2e33..39526752349 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1432,15 +1432,14 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { if seq == 0 || seq&ebit != 0 || seq < fseq { seq = seq &^ ebit if seq >= fseq { - // Only add to dmap if past recorded first seq and non-zero. - if seq != 0 { - addToDmap(seq) - } atomic.StoreUint64(&mb.last.seq, seq) mb.last.ts = ts if mb.msgs == 0 { atomic.StoreUint64(&mb.first.seq, seq+1) mb.first.ts = 0 + } else if seq != 0 { + // Only add to dmap if past recorded first seq and non-zero. + addToDmap(seq) } } index += rl @@ -7017,11 +7016,7 @@ func (fs *fileStore) State() StreamState { } // Add in deleted. mb.dmap.Range(func(seq uint64) bool { - if seq < fseq { - mb.dmap.Delete(seq) - } else { - state.Deleted = append(state.Deleted, seq) - } + state.Deleted = append(state.Deleted, seq) return true }) mb.mu.Unlock() diff --git a/server/filestore_test.go b/server/filestore_test.go index 18493b61017..2f80f671809 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -8273,3 +8273,58 @@ func changeDirectoryPermission(directory string, mode fs.FileMode) error { }) return err } + +func TestFileStoreLeftoverSkipMsgInDmap(t *testing.T) { + storeDir := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"test.*"}, Storage: FileStorage, MaxMsgsPer: 1}, + ) + require_NoError(t, err) + defer fs.Stop() + + getLmbState := func(fs *fileStore) (uint64, uint64, int) { + fs.mu.RLock() + lmb := fs.lmb + fs.mu.RUnlock() + lmb.mu.RLock() + fseq := atomic.LoadUint64(&lmb.first.seq) + lseq := atomic.LoadUint64(&lmb.last.seq) + dmaps := lmb.dmap.Size() + lmb.mu.RUnlock() + return fseq, lseq, dmaps + } + + // Only skip a message. + fs.SkipMsg() + + // Confirm state. + state := fs.State() + require_Equal(t, state.FirstSeq, 2) + require_Equal(t, state.LastSeq, 1) + require_Equal(t, state.NumDeleted, 0) + fseq, lseq, dmaps := getLmbState(fs) + require_Equal(t, fseq, 2) + require_Equal(t, lseq, 1) + require_Len(t, dmaps, 0) + + // Stop without writing index.db so we recover based on just the blk file. + require_NoError(t, fs.stop(false, false)) + + fs, err = newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"test.*"}, Storage: FileStorage, MaxMsgsPer: 1}, + ) + require_NoError(t, err) + defer fs.Stop() + + // Confirm the skipped message is not included in the deletes. + state = fs.State() + require_Equal(t, state.FirstSeq, 2) + require_Equal(t, state.LastSeq, 1) + require_Equal(t, state.NumDeleted, 0) + fseq, lseq, dmaps = getLmbState(fs) + require_Equal(t, fseq, 2) + require_Equal(t, lseq, 1) + require_Len(t, dmaps, 0) +}