From cd43b70daef8c13991feed53853dd69761400c98 Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Wed, 13 Nov 2024 14:31:42 -0700
Subject: [PATCH] [FIXED] LeafNode: queue distribution with daisy-chain and gateway

In a complex setup, a message produced from a cluster that had queue
interest from leafnodes (either hub or spoke) would sometimes not be
delivered if the interest came from a leafnode that had registered it
on behalf of a gateway.

In the setup described in the issue this PR fixes, "Cluster B" may have
picked "Cluster C", but that cluster has no local queue interest, only
the leafnode interest from "Cluster B". It would therefore pick a LEAF
connection back to that cluster, but then suppress the message since it
came from "B" and "C" cannot send it back there. Yet picking a queue sub
for "B" while in "C" would prevent the message from being delivered to
the gateway "D".

Resolves #6125

Signed-off-by: Ivan Kozlovic
---
 server/client.go        |  47 ++++--
 server/leafnode_test.go | 320 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 354 insertions(+), 13 deletions(-)

diff --git a/server/client.go b/server/client.go
index b308389f7b6..94fec71db7b 100644
--- a/server/client.go
+++ b/server/client.go
@@ -4752,6 +4752,21 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
 	// Declared here because of goto.
 	var queues [][]byte
 
+	var leafOrigin string
+	switch c.kind {
+	case ROUTER:
+		if len(c.pa.origin) > 0 {
+			// Picture a message sent from a leafnode to a server that then routes
+			// this message: ClusterA -leaf-> HUB1 -route-> HUB2
+			// Here we are in HUB2, so c.kind is a ROUTER, but the message will
+			// contain a c.pa.origin set to "ClusterA" to indicate that this message
+			// originated from that leafnode cluster.
+			leafOrigin = bytesToString(c.pa.origin)
+		}
+	case LEAF:
+		leafOrigin = c.remoteCluster()
+	}
+
 	// For all routes/leaf/gateway connections, we may still want to send messages to
 	// leaf nodes or routes even if there are no queue filters since we collect
 	// them above and do not process inline like normal clients.
@@ -4791,7 +4806,13 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
 			for i := 0; i < len(qsubs); i++ {
 				sub = qsubs[i]
 				if dst := sub.client.kind; dst == LEAF || dst == ROUTER {
-					// If we have assigned an ROUTER rsub already, replace if
+					// If the destination is a LEAF, we first need to make sure
+					// that we would not pick one that was the origin of this
+					// message.
+					if dst == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() {
+						continue
+					}
+					// If we have assigned a ROUTER rsub already, replace if
 					// the destination is a LEAF since we want to favor that.
 					if rsub == nil || (rsub.client.kind == ROUTER && dst == LEAF) {
 						rsub = sub
@@ -4817,6 +4838,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
 		}
 
 		// Find a subscription that is able to deliver this message starting at a random index.
+		// Note that if the message came from a ROUTER, we will only have CLIENT or LEAF
+		// queue subs here, otherwise we can have all types.
 		for i := 0; i < lqs; i++ {
 			if sindex+i < lqs {
 				sub = qsubs[sindex+i]
@@ -4837,6 +4860,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
 			// Here we just care about a client or leaf and skipping a leaf and preferring locals.
 			if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
 				if (src == LEAF || src == CLIENT) && dst == LEAF {
+					// If we come from a LEAF and are about to pick a LEAF connection,
+					// make sure this is not the same leaf cluster.
+					if src == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() {
+						continue
+					}
 					// Remember that leaf in case we don't find any other candidate.
 					if rsub == nil {
 						rsub = sub
@@ -4980,18 +5008,11 @@ sendToRoutesOrLeafs:
 		// If so make sure we do not send it back to the same cluster for a different
 		// leafnode. Cluster wide no echo.
 		if dc.kind == LEAF {
-			// Check two scenarios. One is inbound from a route (c.pa.origin)
-			if c.kind == ROUTER && len(c.pa.origin) > 0 {
-				if bytesToString(c.pa.origin) == dc.remoteCluster() {
-					continue
-				}
-			}
-			// The other is leaf to leaf.
-			if c.kind == LEAF {
-				src, dest := c.remoteCluster(), dc.remoteCluster()
-				if src != _EMPTY_ && src == dest {
-					continue
-				}
+			// Check two scenarios. One is inbound from a route (c.pa.origin),
+			// and the other is leaf to leaf. In both cases, leafOrigin is the one
+			// to use for the comparison.
+			if leafOrigin != _EMPTY_ && leafOrigin == dc.remoteCluster() {
+				continue
 			}
 
 			// We need to check if this is a request that has a stamped client information header.
diff --git a/server/leafnode_test.go b/server/leafnode_test.go
index 0a068ccf665..c5a131d277d 100644
--- a/server/leafnode_test.go
+++ b/server/leafnode_test.go
@@ -37,6 +37,7 @@ import (
 
 	jwt "github.com/nats-io/jwt/v2"
 	"github.com/nats-io/nats.go"
 
+	"github.com/nats-io/nats-server/v2/internal/fastrand"
 	"github.com/nats-io/nats-server/v2/internal/testhelper"
 )
@@ -4233,6 +4234,325 @@ func TestLeafNodeQueueGroupDistributionVariant(t *testing.T) {
 	sendAndCheck(2)
 }
 
+func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) {
+	SetGatewaysSolicitDelay(0)
+	defer ResetGatewaysSolicitDelay()
+
+	// We create a sort of a ladder of servers with connections that look like this:
+	//
+	//  D1 <--- route ---> D2
+	//  |                  |
+	//  GW                 GW
+	//  |                  |
+	//  C1 <--- route ---> C2
+	//  |                  |
+	// Leaf              Leaf
+	//  |                  |
+	//  B1 <--- route ---> B2
+	//  |                  |
+	// Leaf              Leaf
+	//  |                  |
+	//  A1 <--- route ---> A2
+	//
+	// We will then place queue subscriptions (different sub-tests) on A1, A2,
+	// B1, B2, D1 and D2.
+
+	accs := `
+	accounts {
+		SYS: {users: [{user:sys, password: pwd}]}
+		USER: {users: [{user:user, password: pwd}]}
+	}
+	system_account: SYS
+	`
+	dConf := `
+	%s
+	server_name: %s
+	port: -1
+	cluster {
+		name: "D"
+		port: -1
+		%s
+	}
+	gateway {
+		name: "D"
+		port: -1
+	}
+	`
+	d1Conf := createConfFile(t, []byte(fmt.Sprintf(dConf, accs, "GW1", _EMPTY_)))
+	d1, d1Opts := RunServerWithConfig(d1Conf)
+	defer d1.Shutdown()
+
+	d2Conf := createConfFile(t, []byte(fmt.Sprintf(dConf, accs, "GW2",
+		fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", d1Opts.Cluster.Port))))
+	d2, d2Opts := RunServerWithConfig(d2Conf)
+	defer d2.Shutdown()
+
+	checkClusterFormed(t, d1, d2)
+
+	leafCConf := `
+	%s
+	server_name: %s
+	port: -1
+	cluster {
+		name: C
+		port: -1
+		%s
+	}
+	leafnodes {
+		port: -1
+	}
+	gateway {
+		name: C
+		port: -1
+		gateways [
+			{
+				name: D
+				url: "nats://127.0.0.1:%d"
+			}
+		]
+	}
+	`
+	c1Conf := createConfFile(t, []byte(fmt.Sprintf(leafCConf, accs, "C1", _EMPTY_, d1Opts.Gateway.Port)))
+	c1, c1Opts := RunServerWithConfig(c1Conf)
+	defer c1.Shutdown()
+
+	waitForOutboundGateways(t, c1, 1, time.Second)
+	waitForInboundGateways(t, d1, 1, time.Second)
+
+	c2Conf := createConfFile(t, []byte(fmt.Sprintf(leafCConf, accs, "C2",
+		fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", c1Opts.Cluster.Port), d2Opts.Gateway.Port)))
+	c2, c2Opts := RunServerWithConfig(c2Conf)
+	defer c2.Shutdown()
+
+	waitForOutboundGateways(t, c2, 1, time.Second)
+	waitForInboundGateways(t, d2, 1, time.Second)
+
+	checkClusterFormed(t, c1, c2)
+
+	leafABConf := `
+	%s
+	server_name: %s
+	port: -1
+	cluster {
+		name: %s
+		port: -1
+		%s
+	}
+	leafnodes {
+		port: -1
+		remotes [
+			{
+				url: "nats://user:pwd@127.0.0.1:%d"
+				account: USER
+			}
+		]
+	}
+	`
+	b1Conf := createConfFile(t, []byte(fmt.Sprintf(leafABConf, accs, "B1", "B", _EMPTY_, c1Opts.LeafNode.Port)))
+	b1, b1Opts := RunServerWithConfig(b1Conf)
+	defer b1.Shutdown()
+
+	checkLeafNodeConnected(t, b1)
+	checkLeafNodeConnected(t, c1)
+
+	b2Conf := createConfFile(t, []byte(fmt.Sprintf(leafABConf, accs, "B2", "B",
+		fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", b1Opts.Cluster.Port), c2Opts.LeafNode.Port)))
+	b2, b2Opts := RunServerWithConfig(b2Conf)
+	defer b2.Shutdown()
+
+	checkLeafNodeConnected(t, b2)
+	checkLeafNodeConnected(t, c2)
+	checkClusterFormed(t, b1, b2)
+
+	a1Conf := createConfFile(t, []byte(fmt.Sprintf(leafABConf, accs, "A1", "A", _EMPTY_, b1Opts.LeafNode.Port)))
+	a1, a1Opts := RunServerWithConfig(a1Conf)
+	defer a1.Shutdown()
+
+	checkLeafNodeConnectedCount(t, b1, 2)
+	checkLeafNodeConnected(t, a1)
+
+	a2Conf := createConfFile(t, []byte(fmt.Sprintf(leafABConf, accs, "A2", "A",
+		fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", a1Opts.Cluster.Port), b2Opts.LeafNode.Port)))
+	a2, _ := RunServerWithConfig(a2Conf)
+	defer a2.Shutdown()
+
+	checkLeafNodeConnectedCount(t, b2, 2)
+	checkLeafNodeConnected(t, a2)
+	checkClusterFormed(t, a1, a2)
+
+	// Create our client connections to all servers where we may need to have
+	// queue subscriptions.
+	ncD1 := natsConnect(t, d1.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncD1.Close()
+	ncD2 := natsConnect(t, d2.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncD2.Close()
+	ncB1 := natsConnect(t, b1.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncB1.Close()
+	ncB2 := natsConnect(t, b2.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncB2.Close()
+	ncA1 := natsConnect(t, a1.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncA1.Close()
+	ncA2 := natsConnect(t, a2.ClientURL(), nats.UserInfo("user", "pwd"))
+	defer ncA2.Close()
+
+	// Helper to check that the interest is propagated to all servers.
+	checkInterest := func(t *testing.T, subj string) {
+		t.Helper()
+		for _, s := range []*Server{a1, a2, b1, b2, c1, c2, d1, d2} {
+			acc, err := s.LookupAccount("USER")
+			require_NoError(t, err)
+			checkFor(t, time.Second, 10*time.Millisecond, func() error {
+				if acc.Interest(subj) != 0 {
+					return nil
+				}
+				return fmt.Errorf("Still no interest on %q in server %q", subj, s)
+			})
+		}
+	}
+
+	// Helper to send messages on the given subject. We always send from
+	// cluster B in this test, but we pick randomly between B1 and B2.
+	total := 1000
+	send := func(t *testing.T, subj string) {
+		for i := 0; i < total; i++ {
+			var nc *nats.Conn
+			if fastrand.Uint32n(2) == 0 {
+				nc = ncB1
+			} else {
+				nc = ncB2
+			}
+			natsPub(t, nc, subj, []byte("hello"))
+		}
+	}
+
+	const queue = "queue"
+
+	for i, test := range []struct {
+		name string
+		a1   bool
+		a2   bool
+		b1   bool
+		b2   bool
+		d1   bool
+		d2   bool
+	}{
+		// Cases with QSubs in A, B and D
+		{"A1 __ B1 __ D1 __", true, false, true, false, true, false},
+		{"A1 __ B1 __ __ D2", true, false, true, false, false, true},
+		{"A1 __ B1 __ D1 D2", true, false, true, false, true, true},
+
+		{"A1 __ __ B2 D1 __", true, false, false, true, true, false},
+		{"A1 __ __ B2 __ D2", true, false, false, true, false, true},
+		{"A1 __ __ B2 D1 D2", true, false, false, true, true, true},
+
+		{"A1 __ B1 B2 D1 __", true, false, true, true, true, false},
+		{"A1 __ B1 B2 __ D2", true, false, true, true, false, true},
+		{"A1 __ B1 B2 D1 D2", true, false, true, true, true, true},
+
+		{"__ A2 B1 __ D1 __", false, true, true, false, true, false},
+		{"__ A2 B1 __ __ D2", false, true, true, false, false, true},
+		{"__ A2 B1 __ D1 D2", false, true, true, false, true, true},
+
+		{"__ A2 __ B2 D1 __", false, true, false, true, true, false},
+		{"__ A2 __ B2 __ D2", false, true, false, true, false, true},
+		{"__ A2 __ B2 D1 D2", false, true, false, true, true, true},
+
+		{"__ A2 B1 B2 D1 __", false, true, true, true, true, false},
+		{"__ A2 B1 B2 __ D2", false, true, true, true, false, true},
+		{"__ A2 B1 B2 D1 D2", false, true, true, true, true, true},
+
+		{"A1 A2 B1 __ D1 __", true, true, true, false, true, false},
+		{"A1 A2 B1 __ __ D2", true, true, true, false, false, true},
+		{"A1 A2 B1 __ D1 D2", true, true, true, false, true, true},
+
+		{"A1 A2 __ B2 D1 __", true, true, false, true, true, false},
+		{"A1 A2 __ B2 __ D2", true, true, false, true, false, true},
+		{"A1 A2 __ B2 D1 D2", true, true, false, true, true, true},
+
+		{"A1 A2 B1 B2 D1 __", true, true, true, true, true, false},
+		{"A1 A2 B1 B2 __ D2", true, true, true, true, false, true},
+		{"A1 A2 B1 B2 D1 D2", true, true, true, true, true, true},
+
+		// Now without any QSub in B cluster (so just A and D)
+		{"A1 __ __ __ D1 __", true, false, false, false, true, false},
+		{"A1 __ __ __ __ D2", true, false, false, false, false, true},
+		{"A1 __ __ __ D1 D2", true, false, false, false, true, true},
+
+		{"__ A2 __ __ D1 __", false, true, false, false, true, false},
__", false, true, false, false, true, false}, + {"__ A2 __ __ __ D2", false, true, false, false, false, true}, + {"__ A2 __ __ D1 D2", false, true, false, false, true, true}, + + {"A1 A2 __ __ D1 __", true, true, false, false, true, false}, + {"A1 A2 __ __ __ D2", true, true, false, false, false, true}, + {"A1 A2 __ __ D1 D2", true, true, false, false, true, true}, + } { + t.Run(test.name, func(t *testing.T) { + subj := fmt.Sprintf("foo.%d", i+1) + var aCount, bCount, dCount atomic.Int32 + if test.a1 { + qsA1 := natsQueueSub(t, ncA1, subj, queue, func(_ *nats.Msg) { + aCount.Add(1) + }) + defer qsA1.Unsubscribe() + } + if test.a2 { + qsA2 := natsQueueSub(t, ncA2, subj, queue, func(_ *nats.Msg) { + aCount.Add(1) + }) + defer qsA2.Unsubscribe() + } + if test.b1 { + qsB1 := natsQueueSub(t, ncB1, subj, queue, func(_ *nats.Msg) { + bCount.Add(1) + }) + defer qsB1.Unsubscribe() + } + if test.b2 { + qsB2 := natsQueueSub(t, ncB2, subj, queue, func(_ *nats.Msg) { + bCount.Add(1) + }) + defer qsB2.Unsubscribe() + } + if test.d1 { + qsD1 := natsQueueSub(t, ncD1, subj, queue, func(_ *nats.Msg) { + dCount.Add(1) + }) + defer qsD1.Unsubscribe() + } + if test.d2 { + qsD2 := natsQueueSub(t, ncD2, subj, queue, func(_ *nats.Msg) { + dCount.Add(1) + }) + defer qsD2.Unsubscribe() + } + checkInterest(t, subj) + + // Now send messages + send(t, subj) + + // Check that appropriate queue subs receive all messages. + checkFor(t, 2*time.Second, 10*time.Millisecond, func() error { + // When there is (are) qsub(s) on b, then only A and B should + // get the messages. Otherwise, it should be between A and D + n := aCount.Load() + if test.b1 || test.b2 { + n += bCount.Load() + } else { + n += dCount.Load() + } + if n == int32(total) { + return nil + } + return fmt.Errorf("Got only %v/%v messages (a=%v b=%v d=%v)", n, total, aCount.Load(), bCount.Load(), dCount.Load()) + }) + // For this specific case, make sure that D did not receive any. + if test.b1 || test.b2 { + require_LessThan(t, dCount.Load(), 1) + } + }) + } +} + func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) { /*