
Cherry picks for v2.10.17-RC.8 #5590

Merged
6 commits merged on Jun 25, 2024
66 changes: 37 additions & 29 deletions server/accounts.go
@@ -2822,9 +2822,12 @@ func (a *Account) isIssuerClaimTrusted(claims *jwt.ActivationClaims) bool {
// check is done with the account's name, not the pointer. This is used
// during config reload where we are comparing current and new config
// in which pointers are different.
// No lock is acquired in this function, so it is assumed that the
// import maps are not changed while this executes.
// Acquires `a` read lock, but `b` is assumed to not be accessed
// by anyone but the caller (`b` is not registered anywhere).
func (a *Account) checkStreamImportsEqual(b *Account) bool {
a.mu.RLock()
defer a.mu.RUnlock()

if len(a.imports.streams) != len(b.imports.streams) {
return false
}
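As a reader aid, here is a minimal, self-contained sketch of the locking discipline the updated comment describes: only the receiver is locked, while the argument is treated as caller-owned because it is not registered anywhere yet. Types and field names are simplified placeholders, not the server's actual structures.

package sketch

import "sync"

// account is a simplified stand-in for the server's Account type.
type account struct {
	mu      sync.RWMutex
	streams map[string]string // import subject -> source account name
}

// streamImportsEqual compares imports under a's read lock only. b is
// assumed to be a freshly built account that nothing else references,
// so taking its lock is unnecessary.
func (a *account) streamImportsEqual(b *account) bool {
	a.mu.RLock()
	defer a.mu.RUnlock()

	if len(a.streams) != len(b.streams) {
		return false
	}
	for subj, src := range a.streams {
		if other, ok := b.streams[subj]; !ok || other != src {
			return false
		}
	}
	return true
}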
@@ -3192,6 +3195,9 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
a.nameTag = ac.Name
a.tags = ac.Tags

// Grab trace label under lock.
tl := a.traceLabel()

// Check for external authorization.
if ac.HasExternalAuthorization() {
a.extAuth = &jwt.ExternalAuthorization{}
@@ -3212,10 +3218,10 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
}
if a.imports.services != nil {
old.imports.services = make(map[string]*serviceImport, len(a.imports.services))
}
for k, v := range a.imports.services {
old.imports.services[k] = v
delete(a.imports.services, k)
for k, v := range a.imports.services {
old.imports.services[k] = v
delete(a.imports.services, k)
}
}

alteredScope := map[string]struct{}{}
@@ -3285,13 +3291,13 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
for _, e := range ac.Exports {
switch e.Type {
case jwt.Stream:
s.Debugf("Adding stream export %q for %s", e.Subject, a.traceLabel())
s.Debugf("Adding stream export %q for %s", e.Subject, tl)
if err := a.addStreamExportWithAccountPos(
string(e.Subject), authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil {
s.Debugf("Error adding stream export to account [%s]: %v", a.traceLabel(), err.Error())
s.Debugf("Error adding stream export to account [%s]: %v", tl, err.Error())
}
case jwt.Service:
s.Debugf("Adding service export %q for %s", e.Subject, a.traceLabel())
s.Debugf("Adding service export %q for %s", e.Subject, tl)
rt := Singleton
switch e.ResponseType {
case jwt.ResponseTypeStream:
@@ -3301,7 +3307,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
}
if err := a.addServiceExportWithResponseAndAccountPos(
string(e.Subject), rt, authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil {
s.Debugf("Error adding service export to account [%s]: %v", a.traceLabel(), err)
s.Debugf("Error adding service export to account [%s]: %v", tl, err)
continue
}
sub := string(e.Subject)
@@ -3311,13 +3317,13 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
if e.Latency.Sampling == jwt.Headers {
hdrNote = " (using headers)"
}
s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, a.traceLabel(), err)
s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, tl, err)
}
}
if e.ResponseThreshold != 0 {
// Response threshold was set in options.
if err := a.SetServiceExportResponseThreshold(sub, e.ResponseThreshold); err != nil {
s.Debugf("Error adding service export response threshold for [%s]: %v", a.traceLabel(), err)
s.Debugf("Error adding service export response threshold for [%s]: %v", tl, err)
}
}
}
@@ -3362,44 +3368,41 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
}
var incompleteImports []*jwt.Import
for _, i := range ac.Imports {
// check tmpAccounts with priority
var acc *Account
var err error
if v, ok := s.tmpAccounts.Load(i.Account); ok {
acc = v.(*Account)
} else {
acc, err = s.lookupAccount(i.Account)
}
acc, err := s.lookupAccount(i.Account)
if acc == nil || err != nil {
s.Errorf("Can't locate account [%s] for import of [%v] %s (err=%v)", i.Account, i.Subject, i.Type, err)
incompleteImports = append(incompleteImports, i)
continue
}
from := string(i.Subject)
to := i.GetTo()
// Capture trace labels.
acc.mu.RLock()
atl := acc.traceLabel()
acc.mu.RUnlock()
// Grab from and to
from, to := string(i.Subject), i.GetTo()
switch i.Type {
case jwt.Stream:
if i.LocalSubject != _EMPTY_ {
// set local subject implies to is empty
to = string(i.LocalSubject)
s.Debugf("Adding stream import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
s.Debugf("Adding stream import %s:%q for %s:%q", atl, from, tl, to)
err = a.AddMappedStreamImportWithClaim(acc, from, to, i)
} else {
s.Debugf("Adding stream import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
s.Debugf("Adding stream import %s:%q for %s:%q", atl, from, tl, to)
err = a.AddStreamImportWithClaim(acc, from, to, i)
}
if err != nil {
s.Debugf("Error adding stream import to account [%s]: %v", a.traceLabel(), err.Error())
s.Debugf("Error adding stream import to account [%s]: %v", tl, err.Error())
incompleteImports = append(incompleteImports, i)
}
case jwt.Service:
if i.LocalSubject != _EMPTY_ {
from = string(i.LocalSubject)
to = string(i.Subject)
}
s.Debugf("Adding service import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
s.Debugf("Adding service import %s:%q for %s:%q", atl, from, tl, to)
if err := a.AddServiceImportWithClaim(acc, from, to, i); err != nil {
s.Debugf("Error adding service import to account [%s]: %v", a.traceLabel(), err.Error())
s.Debugf("Error adding service import to account [%s]: %v", tl, err.Error())
incompleteImports = append(incompleteImports, i)
}
}
@@ -3570,7 +3573,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
// regardless of enabled or disabled. It handles both cases.
if jsEnabled {
if err := s.configJetStream(a); err != nil {
s.Errorf("Error configuring jetstream for account [%s]: %v", a.traceLabel(), err.Error())
s.Errorf("Error configuring jetstream for account [%s]: %v", tl, err.Error())
a.mu.Lock()
// Absent reload of js server cfg, this is going to be broken until js is disabled
a.incomplete = true
Expand Down Expand Up @@ -3707,8 +3710,13 @@ func (s *Server) buildInternalAccount(ac *jwt.AccountClaims) *Account {
// We don't want to register an account that is in the process of
// being built, however, to solve circular import dependencies, we
// need to store it here.
s.tmpAccounts.Store(ac.Subject, acc)
if v, loaded := s.tmpAccounts.LoadOrStore(ac.Subject, acc); loaded {
return v.(*Account)
}

// Update based on claims.
s.UpdateAccountClaims(acc, ac)

return acc
}
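The comment above carries the reasoning for this change: the half-built account must be stored in tmpAccounts so circular imports can resolve, but two goroutines may race to build the same account, so LoadOrStore lets the loser defer to the winner. A rough sketch of that pattern with placeholder types (not the server's real API):

package sketch

import "sync"

type account struct{ name string }

var tmpAccounts sync.Map // account subject -> *account

// buildAccount publishes the half-built account for circular-import
// resolution, but yields to an instance another goroutine stored first
// so claims are not applied to two competing copies.
func buildAccount(subject string) *account {
	acc := &account{name: subject}
	if v, loaded := tmpAccounts.LoadOrStore(subject, acc); loaded {
		return v.(*account) // lost the race; reuse the existing account
	}
	// Won the race: finish building acc (apply claims, etc.) here.
	return acc
}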

5 changes: 4 additions & 1 deletion server/client.go
@@ -2914,8 +2914,11 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription, enact b

// Add in the shadow subscription.
func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscription, error) {
im := ime.im
c.mu.Lock()
nsub := *sub // copy
c.mu.Unlock()

im := ime.im
nsub.im = im

if !im.usePub && ime.dyn && im.tr != nil {
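For context, the reordering above takes the client lock before copying *sub so the copy cannot race with concurrent writers. A minimal illustration of that copy-under-lock pattern, using placeholder types rather than the server's client and subscription structs:

package sketch

import "sync"

type subscription struct {
	subject string
	queue   string
}

type client struct {
	mu sync.Mutex
}

// snapshotSub returns a private copy of sub taken while holding the
// client lock, so later use of the copy cannot race with the owner.
func (c *client) snapshotSub(sub *subscription) subscription {
	c.mu.Lock()
	nsub := *sub // copy under the lock
	c.mu.Unlock()
	return nsub
}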
30 changes: 20 additions & 10 deletions server/filestore.go
@@ -2596,28 +2596,38 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {

// This is used to see if we can selectively jump start blocks based on filter subject and a floor block index.
// Will return -1 if no matches at all.
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) int {
start := uint32(math.MaxUint32)
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) {
start, stop := uint32(math.MaxUint32), uint32(0)
if wc {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
if psi.fblk < start {
start = psi.fblk
}
if psi.lblk > stop {
stop = psi.lblk
}
})
} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
start = psi.fblk
start, stop = psi.fblk, psi.lblk
}
// Nothing found.
if start == uint32(math.MaxUint32) {
return -1
return -1, -1
}
// Here we need to translate this to index into fs.blks.
// Here we need to translate this to index into fs.blks properly.
mb := fs.bim[start]
if mb == nil {
return -1
return -1, -1
}
bi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
return bi
fi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))

mb = fs.bim[stop]
if mb == nil {
return -1, -1
}
li, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))

return fi, li
}

// Optimized way for getting all num pending matching a filter subject.
@@ -6362,9 +6372,9 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
nbi := fs.checkSkipFirstBlock(filter, wc)
nbi, lbi := fs.checkSkipFirstBlock(filter, wc)
// Nothing available.
if nbi < 0 {
if nbi < 0 || lbi <= bi {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
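The new (first, last) return from checkSkipFirstBlock and the caller check just above (nbi < 0 || lbi <= bi) are the heart of this fix. Below is a hedged sketch of how a caller can use the two block indexes: jump ahead to the first block that can match, and return EOF immediately when the last possible block is at or before the current position. Names and types are simplified placeholders, not the filestore's real structures.

package sketch

import "errors"

var errStoreEOF = errors.New("store EOF")

// store tracks, per filter subject, the first and last block indexes that
// can contain a match (a simplified stand-in for the psim/bim bookkeeping).
type store struct {
	firstBlk map[string]int
	lastBlk  map[string]int
}

// skipRange mirrors checkSkipFirstBlock: (-1, -1) means no matches at all.
func (s *store) skipRange(filter string) (int, int) {
	fi, ok := s.firstBlk[filter]
	if !ok {
		return -1, -1
	}
	return fi, s.lastBlk[filter]
}

// nextBlock mirrors the caller logic in LoadNextMsg: if the subject's last
// matching block is not past the current index bi, nothing newer can match
// and we can return EOF without loading any of the tail blocks.
func (s *store) nextBlock(filter string, bi int) (int, error) {
	fi, li := s.skipRange(filter)
	if fi < 0 || li <= bi {
		return 0, errStoreEOF
	}
	if fi > bi {
		bi = fi // jump ahead to the first block that can match
	}
	return bi, nil
}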
81 changes: 81 additions & 0 deletions server/filestore_test.go
@@ -7162,6 +7162,58 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
require_Equal(t, psi.lblk, 4)
}

func TestFileStoreLargeSparseMsgsDoNotLoadAfterLast(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 128},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("hello")
// Create 2 blocks, each block holding 2 msgs.
for i := 0; i < 2; i++ {
fs.StoreMsg("foo.22.bar", nil, msg)
fs.StoreMsg("foo.22.baz", nil, msg)
}
// Now create 8 more blocks with just baz. So no matches for these 8 blocks
// for "foo.22.bar".
for i := 0; i < 8; i++ {
fs.StoreMsg("foo.22.baz", nil, msg)
fs.StoreMsg("foo.22.baz", nil, msg)
}
require_Equal(t, fs.numMsgBlocks(), 10)

// Remove all blk cache and fss.
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.Lock()
mb.fss, mb.cache = nil, nil
mb.mu.Unlock()
}
fs.mu.RUnlock()

// "foo.22.bar" is at sequence 1 and 3.
// Make sure if we do a LoadNextMsg() starting at 4 that we do not load
// all the tail blocks.
_, _, err = fs.LoadNextMsg("foo.*.bar", true, 4, nil)
require_Error(t, err, ErrStoreEOF)

// Now make sure we did not load fss and cache.
var loaded int
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.RLock()
if mb.cache != nil || mb.fss != nil {
loaded++
}
mb.mu.RUnlock()
}
fs.mu.RUnlock()
// We will load first block for starting seq 4, but no others should have loaded.
require_Equal(t, loaded, 1)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
@@ -7419,3 +7471,32 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin
require_NoError(b, err)
}
}

func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(b, err)
defer fs.Stop()

// Small on purpose.
msg := []byte("ok")

// Make first msg one that would match as well.
fs.StoreMsg("foo.1.baz", nil, msg)
// Add in a bunch of msgs.
// We need to make sure we have a range of subjects that could kick in a linear scan.
for i := 0; i < 1_000_000; i++ {
subj := fmt.Sprintf("foo.%d.bar", rand.Intn(100_000)+2)
fs.StoreMsg(subj, nil, msg)
}

b.ResetTimer()

var smv StoreMsg
for i := 0; i < b.N; i++ {
// Make sure not first seq.
_, _, err := fs.LoadNextMsg("foo.*.baz", true, 2, &smv)
require_Error(b, err, ErrStoreEOF)
}
}