Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] LeafNode: queue distribution with daisy-chain and gateway #6126

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 34 additions & 13 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4752,6 +4752,21 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Declared here because of goto.
var queues [][]byte

var leafOrigin string
switch c.kind {
case ROUTER:
if len(c.pa.origin) > 0 {
// Picture a message sent from a leafnode to a server that then routes
// this message: CluserA -leaf-> HUB1 -route-> HUB2
// Here we are in HUB2, so c.kind is a ROUTER, but the message will
// contain a c.pa.origin set to "ClusterA" to indicate that this message
// originated from that leafnode cluster.
leafOrigin = bytesToString(c.pa.origin)
}
case LEAF:
leafOrigin = c.remoteCluster()
}

// For all routes/leaf/gateway connections, we may still want to send messages to
// leaf nodes or routes even if there are no queue filters since we collect
// them above and do not process inline like normal clients.
Expand Down Expand Up @@ -4791,7 +4806,13 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
for i := 0; i < len(qsubs); i++ {
sub = qsubs[i]
if dst := sub.client.kind; dst == LEAF || dst == ROUTER {
// If we have assigned an ROUTER rsub already, replace if
// If the destination is a LEAF, we first need to make sure
// that we would not pick one that was the origin of this
// message.
if dst == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() {
continue
}
// If we have assigned a ROUTER rsub already, replace if
// the destination is a LEAF since we want to favor that.
if rsub == nil || (rsub.client.kind == ROUTER && dst == LEAF) {
rsub = sub
Expand All @@ -4817,6 +4838,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
}

// Find a subscription that is able to deliver this message starting at a random index.
// Note that if the message came from a ROUTER, we will only have CLIENT or LEAF
// queue subs here, otherwise we can have all types.
for i := 0; i < lqs; i++ {
if sindex+i < lqs {
sub = qsubs[sindex+i]
Expand All @@ -4837,6 +4860,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
if (src == LEAF || src == CLIENT) && dst == LEAF {
// If we come from a LEAF and are about to pick a LEAF connection,
// make sure this is not the same leaf cluster.
if src == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() {
continue
}
// Remember that leaf in case we don't find any other candidate.
if rsub == nil {
rsub = sub
Expand Down Expand Up @@ -4980,18 +5008,11 @@ sendToRoutesOrLeafs:
// If so make sure we do not send it back to the same cluster for a different
// leafnode. Cluster wide no echo.
if dc.kind == LEAF {
// Check two scenarios. One is inbound from a route (c.pa.origin)
if c.kind == ROUTER && len(c.pa.origin) > 0 {
if bytesToString(c.pa.origin) == dc.remoteCluster() {
continue
}
}
// The other is leaf to leaf.
if c.kind == LEAF {
src, dest := c.remoteCluster(), dc.remoteCluster()
if src != _EMPTY_ && src == dest {
continue
}
// Check two scenarios. One is inbound from a route (c.pa.origin),
// and the other is leaf to leaf. In both case, leafOrigin is the one
// to use for the comparison.
if leafOrigin != _EMPTY_ && leafOrigin == dc.remoteCluster() {
continue
}

// We need to check if this is a request that has a stamped client information header.
Expand Down
Loading