Skip to content

Commit

Permalink
When LoadNextMsg misses, make sure to consult psim but conservatively.
Browse files Browse the repository at this point in the history
We had a use case with millions of subjects and the last sequence checked being in the next to the last block.
The consumer had a wildcard that matched lots of entries that were behind where we were.
This would burn alot of cpu and when a stream had lots of consumers and they shift leadership this would introduce some instability due to all the cpu cycles.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored and bruth committed Jul 29, 2024
1 parent 7e986c0 commit ef6815c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
5 changes: 4 additions & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6429,7 +6429,10 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Nothing found in this block. We missed, if first block (bi) check psim.
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
// We should not do this at all if we are already on the last block.
// Also if we are a wildcard do not check if large subject space.
const wcMaxSizeToCheck = 64 * 1024
if i == bi && i < len(fs.blks)-1 && (!wc || fs.psim.Size() < wcMaxSizeToCheck) {
nbi, lbi := fs.checkSkipFirstBlock(filter, wc)
// Nothing available.
if nbi < 0 || lbi <= bi {
Expand Down
31 changes: 31 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7528,6 +7528,37 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin
}
}

func Benchmark_FileStoreLoadNextManySubjectsWithWildcardNearLastBlock(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(b, err)
defer fs.Stop()

// Small om purpose.
msg := []byte("ok")

// Make first msg one that would match as well.
fs.StoreMsg("foo.1.baz", nil, msg)
// Add in a bunch of msgs.
// We need to make sure we have a range of subjects that could kick in a linear scan.
for i := 0; i < 1_000_000; i++ {
subj := fmt.Sprintf("foo.%d.bar", rand.Intn(100_000)+2)
fs.StoreMsg(subj, nil, msg)
}
// Make last msg one that would match as well.
fs.StoreMsg("foo.1.baz", nil, msg)

b.ResetTimer()

var smv StoreMsg
for i := 0; i < b.N; i++ {
// Make sure not first seq.
_, _, err := fs.LoadNextMsg("foo.*.baz", true, 999_990, &smv)
require_NoError(b, err)
}
}

func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir()},
Expand Down

0 comments on commit ef6815c

Please sign in to comment.