Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.26-RC.4 #6520

Merged
merged 9 commits into from
Feb 18, 2025
Prev Previous commit
Next Next commit
[FIXED] LeafNode: possible delivery to several DQ members across Gateway
If the same queue group has members running on different leafnodes
connected through a gateway, it was possible for a message to be
delivered to several members running on different leaf nodes if
there was an interest (either plain subscription or for other queue
groups) that made the produced message travel through the gateway.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic authored and neilalexander committed Feb 18, 2025
commit 19dc65d0718ce8dd82a050bfa84704fb7e1cc3af
6 changes: 3 additions & 3 deletions server/client.go
Original file line number Diff line number Diff line change
@@ -3970,7 +3970,7 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
reply = append(reply, '@')
reply = append(reply, c.pa.deliver...)
}
didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames) || didDeliver
didDeliver = c.sendMsgToGateways(acc, msg, c.pa.subject, reply, qnames, false) || didDeliver
}

// Check to see if we did not deliver to anyone and the client has a reply subject set
@@ -4017,7 +4017,7 @@ func (c *client) handleGWReplyMap(msg []byte) bool {
reply = append(reply, '@')
reply = append(reply, c.pa.deliver...)
}
c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil)
c.sendMsgToGateways(c.acc, msg, c.pa.subject, reply, nil, false)
}
return true
}
@@ -4366,7 +4366,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
flags |= pmrCollectQueueNames
var queues [][]byte
didDeliver, queues = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags)
didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues) || didDeliver
didDeliver = c.sendMsgToGateways(siAcc, msg, []byte(to), nrr, queues, false) || didDeliver
} else {
didDeliver, _ = c.processMsgResults(siAcc, rr, msg, c.pa.deliver, []byte(to), nrr, flags)
}
24 changes: 22 additions & 2 deletions server/gateway.go
Original file line number Diff line number Diff line change
@@ -2499,8 +2499,13 @@ var subPool = &sync.Pool{
// that the message is not sent to a given gateway if for instance
// it is known that this gateway has no interest in the account or
// subject, etc..
// When invoked from a LEAF connection, `checkLeafQF` should be passed as `true`
// so that we skip any queue subscription interest that is not part of the
// `c.pa.queues` filter (similar to what we do in `processMsgResults`). However,
// when processing service imports, this boolean should be passed as `false`,
// regardless of whether it is a LEAF connection or not.
// <Invoked from any client connection's readLoop>
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) bool {
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte, checkLeafQF bool) bool {
// We had some times when we were sending across a GW with no subject, and the other side would break
// due to parser error. These need to be fixed upstream but also double check here.
if len(subject) == 0 {
@@ -2577,6 +2582,21 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
qsubs := qr.qsubs[i]
if len(qsubs) > 0 {
queue := qsubs[0].queue
if checkLeafQF {
// Skip any queue that is not in the leaf's queue filter.
skip := true
for _, qn := range c.pa.queues {
if bytes.Equal(queue, qn) {
skip = false
break
}
}
if skip {
continue
}
// Now we still need to check that it was not delivered
// locally by checking the given `qgroups`.
}
add := true
for _, qn := range qgroups {
if bytes.Equal(queue, qn) {
@@ -2969,7 +2989,7 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) {
// we now need to send the message with the real subject to
// gateways in case they have interest on that reply subject.
if !isServiceReply {
c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues)
c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, queues, false)
}
} else if c.kind == GATEWAY {
// Only if we are a gateway connection should we try to route
2 changes: 1 addition & 1 deletion server/leafnode.go
Original file line number Diff line number Diff line change
@@ -2781,7 +2781,7 @@ func (c *client) processInboundLeafMsg(msg []byte) {

// Now deal with gateways
if c.srv.gateway.enabled {
c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames)
c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
}
}

287 changes: 287 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
@@ -4554,6 +4554,289 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) {
}
}

// TestLeafNodeAndGatewaysSingleMsgPerQueueGroup starts two gateway servers
// (US and EU), attaches one leafnode server to each, and always publishes on
// "foo.1" from a client connected to the US leaf. For each row of a table of
// subscription layouts it creates plain subscriptions ("foo.>") and queue
// subscriptions ("foo.*", group "bar", plus an optional group "baz" on the
// EU leaf) on the four servers, then verifies the per-subscription delivery
// counts. In every row the "bar" member on the EU leaf is expected to receive
// nothing, i.e. a message must reach only one member of the "bar" group.
func TestLeafNodeAndGatewaysSingleMsgPerQueueGroup(t *testing.T) {
SetGatewaysSolicitDelay(0)
defer ResetGatewaysSolicitDelay()

// Accounts section shared by all four server configurations below.
accs := `
accounts {
SYS: {users: [{user:sys, password: pwd}]}
USER: {users: [{user:user, password: pwd}]}
}
system_account: SYS
`
// US gateway server: accepts gateway and leafnode connections.
gwUSConfTmpl := `
%s
server_name: GW_US
listen: "127.0.0.1:-1"
gateway {
name: US
listen: "127.0.0.1:-1"
}
leafnodes {
listen: "127.0.0.1:-1"
}
`
gwUSConf := createConfFile(t, []byte(fmt.Sprintf(gwUSConfTmpl, accs)))
gwUS, gwUSOpts := RunServerWithConfig(gwUSConf)
defer gwUS.Shutdown()

// EU gateway server: same as US, but also configured with the US gateway
// URL (using the port the US server actually bound to).
gwEUConfTmpl := `
%s
server_name: GW_EU
listen: "127.0.0.1:-1"
gateway {
name: EU
listen: "127.0.0.1:-1"
gateways [
{
name: US
url: "nats://127.0.0.1:%d"
}
]
}
leafnodes {
listen: "127.0.0.1:-1"
}
`
gwEUConf := createConfFile(t, []byte(fmt.Sprintf(gwEUConfTmpl, accs, gwUSOpts.Gateway.Port)))
gwEU, gwEUOpts := RunServerWithConfig(gwEUConf)
defer gwEU.Shutdown()

// Wait until each gateway server has one outbound and one inbound
// gateway connection before attaching the leafnodes.
waitForOutboundGateways(t, gwUS, 1, time.Second)
waitForOutboundGateways(t, gwEU, 1, time.Second)
waitForInboundGateways(t, gwUS, 1, time.Second)
waitForInboundGateways(t, gwEU, 1, time.Second)

// Leaf server template: a single remote bound to the USER account, pointing
// at the leafnode port of the hub passed as the third format argument.
leafConfTmpl := `
%s
server_name: %s
listen: "127.0.0.1:-1"
leafnodes {
remotes [
{
url: "nats://user:pwd@127.0.0.1:%d"
account: USER
}
]
}
`
leafUSConf := createConfFile(t, []byte(fmt.Sprintf(leafConfTmpl, accs, "LEAF_US", gwUSOpts.LeafNode.Port)))
leafUS, _ := RunServerWithConfig(leafUSConf)
defer leafUS.Shutdown()
checkLeafNodeConnected(t, leafUS)

leafEUConf := createConfFile(t, []byte(fmt.Sprintf(leafConfTmpl, accs, "LEAF_EU", gwEUOpts.LeafNode.Port)))
leafEU, _ := RunServerWithConfig(leafEUConf)
defer leafEU.Shutdown()
checkLeafNodeConnected(t, leafEU)

// Order is important! The position of each counter in `counters` must match
// the position of the corresponding entry in `test.subs` and `test.expected`
// in the table below, since both are accessed by index.
var usLeafQ, usLeafPS, usGWQ, usGWPS, euGWQ, euGWPS, euLeafQ, euLeafPS, euLeafQBaz atomic.Int32
counters := []*atomic.Int32{&usLeafQ, &usLeafPS, &usGWQ, &usGWPS, &euGWQ, &euGWPS, &euLeafQ, &euLeafPS, &euLeafQBaz}
counterNames := []string{"usLeafQ", "usLeafPS", "usGWQ", "usGWPS", "euGWQ", "euGWPS", "euLeafQ", "euLeafPS", "euLeafQBaz"}
// Guard against the two parallel slices above getting out of sync.
if len(counters) != len(counterNames) {
panic("Fix test!")
}
// resetCounters zeroes every delivery counter.
resetCounters := func() {
for _, a := range counters {
a.Store(0)
}
}

// This test will always produce from the US leaf.
ncProd := natsConnect(t, leafUS.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncProd.Close()

// Number of messages published per table row.
total := int32(1)
// check resets the counters, publishes `total` messages on "foo.1" and then
// polls until every counter equals the matching entry of `expected`.
check := func(t *testing.T, expected []int32) {
// Presumably gives the subscription interest time to propagate across
// the leafnode and gateway connections before publishing.
time.Sleep(50 * time.Millisecond)
resetCounters()
for i := 0; i < int(total); i++ {
natsPub(t, ncProd, "foo.1", []byte("hello"))
}
// NOTE(review): if checkFor returns on the first poll where the callback
// returns nil, a duplicate delivery arriving after that poll would go
// unnoticed for counters expected to be 0 - confirm against checkFor.
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
for i := 0; i < len(expected); i++ {
if n := counters[i].Load(); n != expected[i] {
return fmt.Errorf("Expected counter %q to be %v, got %v", counterNames[i], expected[i], n)
}
}
return nil
})
}

// Column order for both `subs` and `expected`:
// "usLeafQ", "usLeafPS", "usGWQ", "usGWPS", "euGWQ", "euGWPS", "euLeafQ", "euLeafPS", "euLeafQBaz"
// A value > 0 in `subs` means that subscription is created for the row;
// `expected` holds the delivery count each subscription must end up with.
for _, test := range []struct {
subs []int
expected []int32
}{
// We will always have the qsub on leaf EU, and have some permutations
// of queue and plain subs on other server(s) and check we get the
// expected distribution.

// Simple test first, qsubs on leaf US and leaf EU, all messages stay in leaf US.
{
[]int{1, 0, 0, 0, 0, 0, 1, 0, 0},
[]int32{total, 0, 0, 0, 0, 0, 0, 0, 0},
},
// Move the queue sub from leaf US to GW US.
{
[]int{0, 0, 1, 0, 0, 0, 1, 0, 0},
[]int32{0, 0, total, 0, 0, 0, 0, 0, 0},
},
// Now move it to GW EU.
{
[]int{0, 0, 0, 0, 1, 0, 1, 0, 0},
[]int32{0, 0, 0, 0, total, 0, 0, 0, 0},
},

// More combinations...
{
[]int{1, 1, 0, 0, 0, 0, 1, 0, 0},
[]int32{total, total, 0, 0, 0, 0, 0, 0, 0},
},
{
[]int{0, 1, 1, 0, 0, 0, 1, 0, 0},
[]int32{0, total, total, 0, 0, 0, 0, 0, 0},
},
{
[]int{0, 1, 1, 1, 0, 0, 1, 0, 0},
[]int32{0, total, total, total, 0, 0, 0, 0, 0},
},
{
[]int{0, 1, 0, 1, 1, 0, 1, 0, 0},
[]int32{0, total, 0, total, total, 0, 0, 0, 0},
},
{
[]int{0, 1, 0, 1, 1, 1, 1, 0, 0},
[]int32{0, total, 0, total, total, total, 0, 0, 0},
},
// If we have the qsub in leaf US, does not matter if we have
// qsubs in GW US and EU, only leaf US should receive the messages,
// but plain sub in GW servers should get them too.
{
[]int{1, 1, 1, 0, 0, 0, 1, 0, 0},
[]int32{total, total, 0, 0, 0, 0, 0, 0, 0},
},
{
[]int{1, 1, 1, 1, 0, 0, 1, 0, 0},
[]int32{total, total, 0, total, 0, 0, 0, 0, 0},
},
{
[]int{1, 1, 1, 1, 1, 0, 1, 0, 0},
[]int32{total, total, 0, total, 0, 0, 0, 0, 0},
},
{
[]int{1, 1, 1, 1, 1, 1, 1, 0, 0},
[]int32{total, total, 0, total, 0, total, 0, 0, 0},
},
// Now back to a qsub on leaf US and leaf EU, but introduce plain sub
// interest in leaf EU
{
[]int{1, 0, 0, 0, 0, 0, 1, 1, 0},
[]int32{total, 0, 0, 0, 0, 0, 0, total, 0},
},
// And add a different queue group in leaf EU and it should get the messages too.
{
[]int{1, 0, 0, 0, 0, 0, 1, 1, 1},
[]int32{total, 0, 0, 0, 0, 0, 0, total, total},
},
// Keep plain and baz queue sub interests in leaf EU and add more combinations.
{
[]int{1, 1, 0, 0, 0, 0, 1, 1, 1},
[]int32{total, total, 0, 0, 0, 0, 0, total, total},
},
{
[]int{1, 1, 1, 0, 0, 0, 1, 1, 1},
[]int32{total, total, 0, 0, 0, 0, 0, total, total},
},
{
[]int{1, 1, 1, 1, 0, 0, 1, 1, 1},
[]int32{total, total, 0, total, 0, 0, 0, total, total},
},
{
[]int{1, 1, 1, 1, 1, 0, 1, 1, 1},
[]int32{total, total, 0, total, 0, 0, 0, total, total},
},
{
[]int{1, 1, 1, 1, 1, 1, 1, 1, 1},
[]int32{total, total, 0, total, 0, total, 0, total, total},
},
} {
// Each row runs as its own subtest with fresh client connections, which
// are closed (via defer) when the subtest returns.
t.Run(_EMPTY_, func(t *testing.T) {
// Guard against a table row whose slices do not match the counters.
if len(test.subs) != len(counters) || len(test.expected) != len(counters) {
panic("Fix test")
}

ncUS := natsConnect(t, leafUS.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncUS.Close()

ncGWUS := natsConnect(t, gwUS.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncGWUS.Close()

ncGWEU := natsConnect(t, gwEU.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncGWEU.Close()

ncEU := natsConnect(t, leafEU.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncEU.Close()

// Leaf US: queue sub on group "bar" (subs[0]) and plain sub (subs[1]).
if test.subs[0] > 0 {
natsQueueSub(t, ncUS, "foo.*", "bar", func(_ *nats.Msg) {
usLeafQ.Add(1)
})
}
if test.subs[1] > 0 {
natsSub(t, ncUS, "foo.>", func(_ *nats.Msg) {
usLeafPS.Add(1)
})
}
natsFlush(t, ncUS)
// GW US: queue sub on group "bar" (subs[2]) and plain sub (subs[3]).
if test.subs[2] > 0 {
natsQueueSub(t, ncGWUS, "foo.*", "bar", func(_ *nats.Msg) {
usGWQ.Add(1)
})
}
if test.subs[3] > 0 {
natsSub(t, ncGWUS, "foo.>", func(_ *nats.Msg) {
usGWPS.Add(1)
})
}
natsFlush(t, ncGWUS)
// GW EU: queue sub on group "bar" (subs[4]) and plain sub (subs[5]).
if test.subs[4] > 0 {
natsQueueSub(t, ncGWEU, "foo.*", "bar", func(_ *nats.Msg) {
euGWQ.Add(1)
})
}
if test.subs[5] > 0 {
natsSub(t, ncGWEU, "foo.>", func(_ *nats.Msg) {
euGWPS.Add(1)
})
}
natsFlush(t, ncGWEU)
// Leaf EU: queue sub on group "bar" (subs[6]), plain sub (subs[7]) and
// a queue sub on a second group "baz" (subs[8]).
if test.subs[6] > 0 {
natsQueueSub(t, ncEU, "foo.*", "bar", func(_ *nats.Msg) {
euLeafQ.Add(1)
})
}
if test.subs[7] > 0 {
natsSub(t, ncEU, "foo.>", func(_ *nats.Msg) {
euLeafPS.Add(1)
})
}
if test.subs[8] > 0 {
// Create on different group, called "baz"
natsQueueSub(t, ncEU, "foo.*", "baz", func(_ *nats.Msg) {
euLeafQBaz.Add(1)
})
}
natsFlush(t, ncEU)

// Check that we have what we expect.
check(t, test.expected)
})
}
}

func TestLeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster(t *testing.T) {
SetGatewaysSolicitDelay(0)
defer ResetGatewaysSolicitDelay()
@@ -8044,6 +8327,8 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t
subs2 = append(subs2, sub)
nc.Flush()
}
// Let them propagate
time.Sleep(100 * time.Millisecond)
defer closeSubs(subs1)
defer closeSubs(subs2)

@@ -8085,6 +8370,8 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t
nc.Flush()
rsubs = append(rsubs, sub)
}
// Let them propagate
time.Sleep(100 * time.Millisecond)

nc, _ = jsClientConnect(t, ln.randomServer())
defer nc.Close()