Skip to content

Commit 84474c6

Browse files
derekcollisonneilalexander
authored andcommitted
Added LoadPrevMsg to optimize when we walk backwards looking for source sequences and we have lots of interior deletes.
Signed-off-by: Derek Collison <derek@nats.io>
1 parent 299a411 commit 84474c6

File tree

5 files changed

+164
-8
lines changed

5 files changed

+164
-8
lines changed

server/filestore.go

+51
Original file line numberDiff line numberDiff line change
@@ -6898,6 +6898,57 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
68986898
return nil, fs.state.LastSeq, ErrStoreEOF
68996899
}
69006900

6901+
// Will load the next non-deleted msg starting at the start sequence and walking backwards.
6902+
func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) {
6903+
fs.mu.RLock()
6904+
defer fs.mu.RUnlock()
6905+
6906+
if fs.closed {
6907+
return nil, ErrStoreClosed
6908+
}
6909+
if fs.state.Msgs == 0 || start < fs.state.FirstSeq {
6910+
return nil, ErrStoreEOF
6911+
}
6912+
6913+
if start > fs.state.LastSeq {
6914+
start = fs.state.LastSeq
6915+
}
6916+
if smp == nil {
6917+
smp = new(StoreMsg)
6918+
}
6919+
6920+
if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 {
6921+
for i := bi; i >= 0; i-- {
6922+
mb := fs.blks[i]
6923+
mb.mu.Lock()
6924+
// Need messages loaded from here on out.
6925+
if mb.cacheNotLoaded() {
6926+
if err := mb.loadMsgsWithLock(); err != nil {
6927+
mb.mu.Unlock()
6928+
return nil, err
6929+
}
6930+
}
6931+
6932+
lseq, fseq := atomic.LoadUint64(&mb.last.seq), atomic.LoadUint64(&mb.first.seq)
6933+
if start > lseq {
6934+
start = lseq
6935+
}
6936+
for seq := start; seq >= fseq; seq-- {
6937+
if mb.dmap.Exists(seq) {
6938+
continue
6939+
}
6940+
if sm, err := mb.cacheLookup(seq, smp); err == nil {
6941+
mb.mu.Unlock()
6942+
return sm, nil
6943+
}
6944+
}
6945+
mb.mu.Unlock()
6946+
}
6947+
}
6948+
6949+
return nil, ErrStoreEOF
6950+
}
6951+
69016952
// Type returns the type of the underlying store.
69026953
func (fs *fileStore) Type() StorageType {
69036954
return FileStorage

server/memstore.go

+27
Original file line numberDiff line numberDiff line change
@@ -1299,6 +1299,33 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
12991299
return nil, ms.state.LastSeq, ErrStoreEOF
13001300
}
13011301

1302+
// Will load the next non-deleted msg starting at the start sequence and walking backwards.
1303+
func (ms *memStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) {
1304+
ms.mu.RLock()
1305+
defer ms.mu.RUnlock()
1306+
1307+
if ms.msgs == nil {
1308+
return nil, ErrStoreClosed
1309+
}
1310+
if ms.state.Msgs == 0 || start < ms.state.FirstSeq {
1311+
return nil, ErrStoreEOF
1312+
}
1313+
if start > ms.state.LastSeq {
1314+
start = ms.state.LastSeq
1315+
}
1316+
1317+
for seq := start; seq >= ms.state.FirstSeq; seq-- {
1318+
if sm, ok := ms.msgs[seq]; ok {
1319+
if smp == nil {
1320+
smp = new(StoreMsg)
1321+
}
1322+
sm.copy(smp)
1323+
return smp, nil
1324+
}
1325+
}
1326+
return nil, ErrStoreEOF
1327+
}
1328+
13021329
// RemoveMsg will remove the message from this store.
13031330
// Will return the number of bytes removed.
13041331
func (ms *memStore) RemoveMsg(seq uint64) (bool, error) {

server/norace_test.go

+68
Original file line numberDiff line numberDiff line change
@@ -9004,6 +9004,7 @@ func TestNoRaceStoreStreamEncoderDecoder(t *testing.T) {
90049004
}
90059005
ms, err := newMemStore(cfg)
90069006
require_NoError(t, err)
9007+
defer ms.Stop()
90079008

90089009
fs, err := newFileStore(
90099010
FileStoreConfig{StoreDir: t.TempDir()},
@@ -11288,3 +11289,70 @@ func TestNoRaceJetStreamClusterLargeMetaSnapshotTiming(t *testing.T) {
1128811289
require_NoError(t, n.InstallSnapshot(snap))
1128911290
t.Logf("Took %v to snap meta with size of %v\n", time.Since(start), friendlyBytes(len(snap)))
1129011291
}
11292+
11293+
func TestNoRaceStoreReverseWalkWithDeletesPerf(t *testing.T) {
11294+
cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}
11295+
11296+
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg)
11297+
require_NoError(t, err)
11298+
defer fs.Stop()
11299+
11300+
cfg.Storage = MemoryStorage
11301+
ms, err := newMemStore(&cfg)
11302+
require_NoError(t, err)
11303+
defer ms.Stop()
11304+
11305+
msg := []byte("Hello")
11306+
11307+
for _, store := range []StreamStore{fs, ms} {
11308+
store.StoreMsg("foo.A", nil, msg)
11309+
for i := 0; i < 1_000_000; i++ {
11310+
store.StoreMsg("foo.B", nil, msg)
11311+
}
11312+
store.StoreMsg("foo.C", nil, msg)
11313+
11314+
var ss StreamState
11315+
store.FastState(&ss)
11316+
require_Equal(t, ss.Msgs, 1_000_002)
11317+
11318+
// Create a bunch of interior deletes.
11319+
p, err := store.PurgeEx("foo.B", 1, 0)
11320+
require_NoError(t, err)
11321+
require_Equal(t, p, 1_000_000)
11322+
11323+
// Now simulate a walk backwards as we currently do when searching for starting sequence numbers in sourced streams.
11324+
start := time.Now()
11325+
var smv StoreMsg
11326+
for seq := ss.LastSeq; seq > 0; seq-- {
11327+
_, err := store.LoadMsg(seq, &smv)
11328+
if err == errDeletedMsg || err == ErrStoreMsgNotFound {
11329+
continue
11330+
}
11331+
require_NoError(t, err)
11332+
}
11333+
elapsed := time.Since(start)
11334+
11335+
// Now use the optimized load prev.
11336+
seq, seen := ss.LastSeq, 0
11337+
start = time.Now()
11338+
for {
11339+
sm, err := store.LoadPrevMsg(seq, &smv)
11340+
if err == ErrStoreEOF {
11341+
break
11342+
}
11343+
require_NoError(t, err)
11344+
seq = sm.seq - 1
11345+
seen++
11346+
}
11347+
elapsedNew := time.Since(start)
11348+
require_Equal(t, seen, 2)
11349+
11350+
switch store.(type) {
11351+
case *memStore:
11352+
require_True(t, elapsedNew < elapsed)
11353+
case *fileStore:
11354+
// Bigger gains for filestore, 10x
11355+
require_True(t, elapsedNew*10 < elapsed)
11356+
}
11357+
}
11358+
}

server/store.go

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ type StreamStore interface {
9191
LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
9292
LoadNextMsgMulti(sl *Sublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
9393
LoadLastMsg(subject string, sm *StoreMsg) (*StoreMsg, error)
94+
LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error)
9495
RemoveMsg(seq uint64) (bool, error)
9596
EraseMsg(seq uint64) (bool, error)
9697
Purge() (uint64, error)

server/stream.go

+17-8
Original file line numberDiff line numberDiff line change
@@ -3498,16 +3498,21 @@ func (mset *stream) setStartingSequenceForSources(iNames map[string]struct{}) {
34983498
}
34993499

35003500
var smv StoreMsg
3501-
for seq := state.LastSeq; seq >= state.FirstSeq; seq-- {
3502-
sm, err := mset.store.LoadMsg(seq, &smv)
3503-
if err != nil || len(sm.hdr) == 0 {
3501+
for seq := state.LastSeq; seq >= state.FirstSeq; {
3502+
sm, err := mset.store.LoadPrevMsg(seq, &smv)
3503+
if err == ErrStoreEOF || err != nil {
3504+
break
3505+
}
3506+
seq = sm.seq - 1
3507+
if len(sm.hdr) == 0 {
35043508
continue
35053509
}
3510+
35063511
ss := getHeader(JSStreamSource, sm.hdr)
35073512
if len(ss) == 0 {
35083513
continue
35093514
}
3510-
streamName, indexName, sseq := streamAndSeq(string(ss))
3515+
streamName, indexName, sseq := streamAndSeq(bytesToString(ss))
35113516

35123517
if _, ok := iNames[indexName]; ok {
35133518
si := mset.sources[indexName]
@@ -3613,17 +3618,21 @@ func (mset *stream) startingSequenceForSources() {
36133618
}
36143619

36153620
var smv StoreMsg
3616-
for seq := state.LastSeq; seq >= state.FirstSeq; seq-- {
3617-
sm, err := mset.store.LoadMsg(seq, &smv)
3618-
if err != nil || sm == nil || len(sm.hdr) == 0 {
3621+
for seq := state.LastSeq; ; {
3622+
sm, err := mset.store.LoadPrevMsg(seq, &smv)
3623+
if err == ErrStoreEOF || err != nil {
3624+
break
3625+
}
3626+
seq = sm.seq - 1
3627+
if len(sm.hdr) == 0 {
36193628
continue
36203629
}
36213630
ss := getHeader(JSStreamSource, sm.hdr)
36223631
if len(ss) == 0 {
36233632
continue
36243633
}
36253634

3626-
streamName, iName, sseq := streamAndSeq(string(ss))
3635+
streamName, iName, sseq := streamAndSeq(bytesToString(ss))
36273636
if iName == _EMPTY_ { // Pre-2.10 message header means it's a match for any source using that stream name
36283637
for _, ssi := range mset.cfg.Sources {
36293638
if streamName == ssi.Name || (ssi.External != nil && streamName == ssi.Name+":"+getHash(ssi.External.ApiPrefix)) {

0 commit comments

Comments
 (0)