diff --git a/server/client.go b/server/client.go index b308389f7b..94fec71db7 100644 --- a/server/client.go +++ b/server/client.go @@ -4752,6 +4752,21 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // Declared here because of goto. var queues [][]byte + var leafOrigin string + switch c.kind { + case ROUTER: + if len(c.pa.origin) > 0 { + // Picture a message sent from a leafnode to a server that then routes + // this message: CluserA -leaf-> HUB1 -route-> HUB2 + // Here we are in HUB2, so c.kind is a ROUTER, but the message will + // contain a c.pa.origin set to "ClusterA" to indicate that this message + // originated from that leafnode cluster. + leafOrigin = bytesToString(c.pa.origin) + } + case LEAF: + leafOrigin = c.remoteCluster() + } + // For all routes/leaf/gateway connections, we may still want to send messages to // leaf nodes or routes even if there are no queue filters since we collect // them above and do not process inline like normal clients. @@ -4791,7 +4806,13 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, for i := 0; i < len(qsubs); i++ { sub = qsubs[i] if dst := sub.client.kind; dst == LEAF || dst == ROUTER { - // If we have assigned an ROUTER rsub already, replace if + // If the destination is a LEAF, we first need to make sure + // that we would not pick one that was the origin of this + // message. + if dst == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() { + continue + } + // If we have assigned a ROUTER rsub already, replace if // the destination is a LEAF since we want to favor that. if rsub == nil || (rsub.client.kind == ROUTER && dst == LEAF) { rsub = sub @@ -4817,6 +4838,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, } // Find a subscription that is able to deliver this message starting at a random index. + // Note that if the message came from a ROUTER, we will only have CLIENT or LEAF + // queue subs here, otherwise we can have all types. for i := 0; i < lqs; i++ { if sindex+i < lqs { sub = qsubs[sindex+i] @@ -4837,6 +4860,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // Here we just care about a client or leaf and skipping a leaf and preferring locals. if dst := sub.client.kind; dst == ROUTER || dst == LEAF { if (src == LEAF || src == CLIENT) && dst == LEAF { + // If we come from a LEAF and are about to pick a LEAF connection, + // make sure this is not the same leaf cluster. + if src == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() { + continue + } // Remember that leaf in case we don't find any other candidate. if rsub == nil { rsub = sub @@ -4980,18 +5008,11 @@ sendToRoutesOrLeafs: // If so make sure we do not send it back to the same cluster for a different // leafnode. Cluster wide no echo. if dc.kind == LEAF { - // Check two scenarios. One is inbound from a route (c.pa.origin) - if c.kind == ROUTER && len(c.pa.origin) > 0 { - if bytesToString(c.pa.origin) == dc.remoteCluster() { - continue - } - } - // The other is leaf to leaf. - if c.kind == LEAF { - src, dest := c.remoteCluster(), dc.remoteCluster() - if src != _EMPTY_ && src == dest { - continue - } + // Check two scenarios. One is inbound from a route (c.pa.origin), + // and the other is leaf to leaf. In both case, leafOrigin is the one + // to use for the comparison. + if leafOrigin != _EMPTY_ && leafOrigin == dc.remoteCluster() { + continue } // We need to check if this is a request that has a stamped client information header. diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 0a068ccf66..c5a131d277 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -37,6 +37,7 @@ import ( jwt "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" + "github.com/nats-io/nats-server/v2/internal/fastrand" "github.com/nats-io/nats-server/v2/internal/testhelper" ) @@ -4233,6 +4234,325 @@ func TestLeafNodeQueueGroupDistributionVariant(t *testing.T) { sendAndCheck(2) } +func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) { + SetGatewaysSolicitDelay(0) + defer ResetGatewaysSolicitDelay() + + // We create a sort of a ladder of servers with connections that look like this: + // + // D1 <--- route ---> D2 + // | | + // GW GW + // | | + // C1 <--- route ---> C2 + // | | + // Leaf Leaf + // | | + // B1 <--- route ---> B2 + // | | + // Leaf Leaf + // | | + // A1 <--- route ---> A2 + // + // We will then place queue subscriptions (different sub-tests) on A1, A2 + // B1, B2, D1 and D2. + + accs := ` + accounts { + SYS: {users: [{user:sys, password: pwd}]} + USER: {users: [{user:user, password: pwd}]} + } + system_account: SYS + ` + dConf := ` + %s + server_name: %s + port: -1 + cluster { + name: "D" + port: -1 + %s + } + gateway { + name: "D" + port: -1 + } + ` + d1Conf := createConfFile(t, []byte(fmt.Sprintf(dConf, accs, "GW1", _EMPTY_))) + d1, d1Opts := RunServerWithConfig(d1Conf) + defer d1.Shutdown() + + d2Conf := createConfFile(t, []byte(fmt.Sprintf(dConf, accs, "GW2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", d1Opts.Cluster.Port)))) + d2, d2Opts := RunServerWithConfig(d2Conf) + defer d2.Shutdown() + + checkClusterFormed(t, d1, d2) + + leafCConf := ` + %s + server_name: %s + port: -1 + cluster { + name: C + port: -1 + %s + } + leafnodes { + port: -1 + } + gateway { + name: C + port: -1 + gateways [ + { + name: D + url: "nats://127.0.0.1:%d" + } + ] + } + ` + c1Conf := createConfFile(t, []byte(fmt.Sprintf(leafCConf, accs, "C1", _EMPTY_, d1Opts.Gateway.Port))) + c1, c1Opts := RunServerWithConfig(c1Conf) + defer c1.Shutdown() + + waitForOutboundGateways(t, c1, 1, time.Second) + waitForInboundGateways(t, d1, 1, time.Second) + + c2Conf := createConfFile(t, []byte(fmt.Sprintf(leafCConf, accs, "C2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", c1Opts.Cluster.Port), d2Opts.Gateway.Port))) + c2, c2Opts := RunServerWithConfig(c2Conf) + defer c2.Shutdown() + + waitForOutboundGateways(t, c2, 1, time.Second) + waitForInboundGateways(t, d2, 1, time.Second) + + checkClusterFormed(t, c1, c2) + + leafABConf := ` + %s + server_name: %s + port: -1 + cluster { + name: %s + port: -1 + %s + } + leafnodes { + port: -1 + remotes [ + { + url: "nats://user:pwd@127.0.0.1:%d" + account: USER + } + ] + } + ` + b1Conf := createConfFile(t, []byte(fmt.Sprintf(leafABConf, accs, "B1", "B", _EMPTY_, c1Opts.LeafNode.Port))) + b1, b1Opts := RunServerWithConfig(b1Conf) + defer b1.Shutdown() + + checkLeafNodeConnected(t, b1) + checkLeafNodeConnected(t, c1) + + b2Conf := createConfFile(t, []byte(fmt.Sprintf(leafABConf, accs, "B2", "B", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", b1Opts.Cluster.Port), c2Opts.LeafNode.Port))) + b2, b2Opts := RunServerWithConfig(b2Conf) + defer b2.Shutdown() + + checkLeafNodeConnected(t, b2) + checkLeafNodeConnected(t, c2) + checkClusterFormed(t, b1, b2) + + a1Conf := createConfFile(t, []byte(fmt.Sprintf(leafABConf, accs, "A1", "A", _EMPTY_, b1Opts.LeafNode.Port))) + a1, a1Opts := RunServerWithConfig(a1Conf) + defer a1.Shutdown() + + checkLeafNodeConnectedCount(t, b1, 2) + checkLeafNodeConnected(t, a1) + + a2Conf := createConfFile(t, []byte(fmt.Sprintf(leafABConf, accs, "A2", "A", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", a1Opts.Cluster.Port), b2Opts.LeafNode.Port))) + a2, _ := RunServerWithConfig(a2Conf) + defer a2.Shutdown() + + checkLeafNodeConnectedCount(t, b2, 2) + checkLeafNodeConnected(t, a2) + checkClusterFormed(t, a1, a2) + + // Create out client connections to all servers where we may need to have + // queue subscriptions. + ncD1 := natsConnect(t, d1.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncD1.Close() + ncD2 := natsConnect(t, d2.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncD2.Close() + ncB1 := natsConnect(t, b1.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncB1.Close() + ncB2 := natsConnect(t, b2.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncB2.Close() + ncA1 := natsConnect(t, a1.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncA1.Close() + ncA2 := natsConnect(t, a2.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncA2.Close() + + // Helper to check that the interest is propagated to all servers + checkInterest := func(t *testing.T, subj string) { + t.Helper() + for _, s := range []*Server{a1, a2, b1, b2, c1, c2, d1, d2} { + acc, err := s.LookupAccount("USER") + require_NoError(t, err) + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if acc.Interest(subj) != 0 { + return nil + } + return fmt.Errorf("Still no interest on %q in server %q", subj, s) + }) + } + } + + // Helper to send messages on given subject. We are always sending + // from cluster B in this test, but we pick randomly between B1 and B2. + total := 1000 + send := func(t *testing.T, subj string) { + for i := 0; i < total; i++ { + var nc *nats.Conn + if fastrand.Uint32n(2) == 0 { + nc = ncB1 + } else { + nc = ncB2 + } + natsPub(t, nc, subj, []byte("hello")) + } + } + + const queue = "queue" + + for i, test := range []struct { + name string + a1 bool + a2 bool + b1 bool + b2 bool + d1 bool + d2 bool + }{ + // Cases with QSubs in A, B and D + {"A1 __ B1 __ D1 __", true, false, true, false, true, false}, + {"A1 __ B1 __ __ D2", true, false, true, false, false, true}, + {"A1 __ B1 __ D1 D2", true, false, true, false, true, true}, + + {"A1 __ __ B2 D1 __", true, false, false, true, true, false}, + {"A1 __ __ B2 __ D2", true, false, false, true, false, true}, + {"A1 __ __ B2 D1 D2", true, false, false, true, true, true}, + + {"A1 __ B1 B2 D1 __", true, false, true, true, true, false}, + {"A1 __ B1 B2 __ D2", true, false, true, true, false, true}, + {"A1 __ B1 B2 D1 D2", true, false, true, true, true, true}, + + {"__ A2 B1 __ D1 __", false, true, true, false, true, false}, + {"__ A2 B1 __ __ D2", false, true, true, false, false, true}, + {"__ A2 B1 __ D1 D2", false, true, true, false, true, true}, + + {"__ A2 __ B2 D1 __", false, true, false, true, true, false}, + {"__ A2 __ B2 __ D2", false, true, false, true, false, true}, + {"__ A2 __ B2 D1 D2", false, true, false, true, true, true}, + + {"__ A2 B1 B2 D1 __", false, true, true, true, true, false}, + {"__ A2 B1 B2 __ D2", false, true, true, true, false, true}, + {"__ A2 B1 B2 D1 D2", false, true, true, true, true, true}, + + {"A1 A2 B1 __ D1 __", true, true, true, false, true, false}, + {"A1 A2 B1 __ __ D2", true, true, true, false, false, true}, + {"A1 A2 B1 __ D1 D2", true, true, true, false, true, true}, + + {"A1 A2 __ B2 D1 __", true, true, false, true, true, false}, + {"A1 A2 __ B2 __ D2", true, true, false, true, false, true}, + {"A1 A2 __ B2 D1 D2", true, true, false, true, true, true}, + + {"A1 A2 B1 B2 D1 __", true, true, true, true, true, false}, + {"A1 A2 B1 B2 __ D2", true, true, true, true, false, true}, + {"A1 A2 B1 B2 D1 D2", true, true, true, true, true, true}, + + // Now without any QSub in B cluster (so just A and D) + {"A1 __ __ __ D1 __", true, false, false, false, true, false}, + {"A1 __ __ __ __ D2", true, false, false, false, false, true}, + {"A1 __ __ __ D1 D2", true, false, false, false, true, true}, + + {"__ A2 __ __ D1 __", false, true, false, false, true, false}, + {"__ A2 __ __ __ D2", false, true, false, false, false, true}, + {"__ A2 __ __ D1 D2", false, true, false, false, true, true}, + + {"A1 A2 __ __ D1 __", true, true, false, false, true, false}, + {"A1 A2 __ __ __ D2", true, true, false, false, false, true}, + {"A1 A2 __ __ D1 D2", true, true, false, false, true, true}, + } { + t.Run(test.name, func(t *testing.T) { + subj := fmt.Sprintf("foo.%d", i+1) + var aCount, bCount, dCount atomic.Int32 + if test.a1 { + qsA1 := natsQueueSub(t, ncA1, subj, queue, func(_ *nats.Msg) { + aCount.Add(1) + }) + defer qsA1.Unsubscribe() + } + if test.a2 { + qsA2 := natsQueueSub(t, ncA2, subj, queue, func(_ *nats.Msg) { + aCount.Add(1) + }) + defer qsA2.Unsubscribe() + } + if test.b1 { + qsB1 := natsQueueSub(t, ncB1, subj, queue, func(_ *nats.Msg) { + bCount.Add(1) + }) + defer qsB1.Unsubscribe() + } + if test.b2 { + qsB2 := natsQueueSub(t, ncB2, subj, queue, func(_ *nats.Msg) { + bCount.Add(1) + }) + defer qsB2.Unsubscribe() + } + if test.d1 { + qsD1 := natsQueueSub(t, ncD1, subj, queue, func(_ *nats.Msg) { + dCount.Add(1) + }) + defer qsD1.Unsubscribe() + } + if test.d2 { + qsD2 := natsQueueSub(t, ncD2, subj, queue, func(_ *nats.Msg) { + dCount.Add(1) + }) + defer qsD2.Unsubscribe() + } + checkInterest(t, subj) + + // Now send messages + send(t, subj) + + // Check that appropriate queue subs receive all messages. + checkFor(t, 2*time.Second, 10*time.Millisecond, func() error { + // When there is (are) qsub(s) on b, then only A and B should + // get the messages. Otherwise, it should be between A and D + n := aCount.Load() + if test.b1 || test.b2 { + n += bCount.Load() + } else { + n += dCount.Load() + } + if n == int32(total) { + return nil + } + return fmt.Errorf("Got only %v/%v messages (a=%v b=%v d=%v)", n, total, aCount.Load(), bCount.Load(), dCount.Load()) + }) + // For this specific case, make sure that D did not receive any. + if test.b1 || test.b2 { + require_LessThan(t, dCount.Load(), 1) + } + }) + } +} + func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) { /*