Skip to content

Commit

Permalink
Cherry-picks for 2.10.24-RC.1 (#6251)
Browse files Browse the repository at this point in the history
Includes the following:

- #6226
- #6232
- #6235
- #6064
- #6244
- #6246
- #6247
- #6248
- #6250

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander authored Dec 13, 2024
2 parents 145e44d + aca44bf commit 64392c2
Show file tree
Hide file tree
Showing 18 changed files with 601 additions and 301 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/nats-io/nkeys v0.4.8
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.30.0
golang.org/x/crypto v0.31.0
golang.org/x/sys v0.28.0
golang.org/x/time v0.8.0
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY=
golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
35 changes: 15 additions & 20 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ type client struct {
last time.Time
lastIn time.Time

repliesSincePrune uint16
lastReplyPrune time.Time

headers bool

rtt time.Duration
Expand Down Expand Up @@ -420,6 +423,7 @@ const (
pruneSize = 32
routeTargetInit = 8
replyPermLimit = 4096
replyPruneTime = time.Second
)

// Represent read cache booleans with a bitmask
Expand Down Expand Up @@ -3526,9 +3530,11 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su

// If we are tracking dynamic publish permissions that track reply subjects,
// do that accounting here. We only look at client.replies which will be non-nil.
if client.replies != nil && len(reply) > 0 {
// Only reply subject permissions if the client is not already allowed to publish to the reply subject.
if client.replies != nil && len(reply) > 0 && !client.pubAllowedFullCheck(string(reply), true, true) {
client.replies[string(reply)] = &resp{time.Now(), 0}
if len(client.replies) > replyPermLimit {
client.repliesSincePrune++
if client.repliesSincePrune > replyPermLimit || time.Since(client.lastReplyPrune) > replyPruneTime {
client.pruneReplyPerms()
}
}
Expand Down Expand Up @@ -3652,6 +3658,9 @@ func (c *client) pruneReplyPerms() {
delete(c.replies, k)
}
}

c.repliesSincePrune = 0
c.lastReplyPrune = now
}

// pruneDenyCache will prune the deny cache via randomly
Expand Down Expand Up @@ -3720,7 +3729,7 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo
allowed = np == 0
}

// If we are currently not allowed but we are tracking reply subjects
// If we are tracking reply subjects
// dynamically, check to see if we are allowed here but avoid pcache.
// We need to acquire the lock though.
if !allowed && fullCheck && c.perms.resp != nil {
Expand Down Expand Up @@ -4160,9 +4169,8 @@ func getHeader(key string, hdr []byte) []byte {

// For bytes.HasPrefix below.
var (
jsRequestNextPreB = []byte(jsRequestNextPre)
jsDirectGetPreB = []byte(jsDirectGetPre)
jsConsumerInfoPreB = []byte(JSApiConsumerInfoPre)
jsRequestNextPreB = []byte(jsRequestNextPre)
jsDirectGetPreB = []byte(jsDirectGetPre)
)

// processServiceImport is an internal callback when a subscription matches an imported service
Expand All @@ -4182,16 +4190,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
}
}

var checkJS, checkConsumerInfo bool

acc.mu.RLock()
var checkJS bool
shouldReturn := si.invalid || acc.sl == nil
if !shouldReturn && !isResponse && si.to == jsAllAPI {
if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) {
checkJS = true
} else if len(c.pa.psi) == 0 && bytes.HasPrefix(c.pa.subject, jsConsumerInfoPreB) {
// Only check if we are clustered and expecting a reply.
checkConsumerInfo = len(c.pa.reply) > 0 && c.srv.JetStreamIsClustered()
}
}
siAcc := si.acc
Expand All @@ -4205,15 +4209,6 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
return
}

// Here we will do a fast check for consumer info only to check if it does not exists. This will spread the
// load to all servers with connected clients since service imports are processed at point of entry.
// Only call for clustered setups.
if checkConsumerInfo && si.se != nil && si.se.acc == c.srv.SystemAccount() {
if c.srv.jsConsumerProcessMissing(c, acc) {
return
}
}

var nrr []byte
var rsi *serviceImport

Expand Down
17 changes: 7 additions & 10 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2667,23 +2667,20 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
TimeStamp: time.Now().UTC(),
}

// If we are replicated and we are not the leader or we are filtered, we need to pull certain data from our store.
isLeader := o.isLeader()
if rg != nil && rg.node != nil && o.store != nil && (!isLeader || o.isFiltered()) {
// If we are replicated, we need to pull certain data from our store.
if rg != nil && rg.node != nil && o.store != nil {
state, err := o.store.BorrowState()
if err != nil {
o.mu.Unlock()
return nil
}
if !isLeader {
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
// If we are the leader we could have o.sseq that is skipped ahead.
// To maintain consistency in reporting (e.g. jsz) we always take the state for our delivered/ackfloor stream sequence.
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
if !o.isLeader() {
info.NumAckPending = len(state.Pending)
info.NumRedelivered = len(state.Redelivered)
} else {
// Since we are filtered and we are the leader we could have o.sseq that is skipped ahead.
// To maintain consistency in reporting (e.g. jsz) we take the state for our delivered stream sequence.
info.Delivered.Stream = state.Delivered.Stream
}
}

Expand Down
Loading

0 comments on commit 64392c2

Please sign in to comment.