From 001ea30acbbaa0304a308c336150c0680a5059b2 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 18 Jun 2024 11:37:15 +0100 Subject: [PATCH] Use stree for message block `fss` Should provide some deduplication of long subjects in memory. Signed-off-by: Neil Twigg --- server/filestore.go | 178 +++++++++++++++++++++------------------ server/filestore_test.go | 8 +- 2 files changed, 101 insertions(+), 85 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index f76271ce11c..e33446fb609 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -214,7 +214,7 @@ type msgBlock struct { bytes uint64 // User visible bytes count. rbytes uint64 // Total bytes (raw) including deleted. Used for rolling to new blk. msgs uint64 // User visible message count. - fss map[string]*SimpleState + fss *stree.SubjectTree[SimpleState] kfn string lwts int64 llts int64 @@ -2063,11 +2063,13 @@ func (fs *fileStore) expireMsgsOnRecover() { } // Make sure we do subject cleanup as well. mb.ensurePerSubjectInfoLoaded() - for subj, ss := range mb.fss { + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + subj := bytesToString(bsubj) for i := uint64(0); i < ss.Msgs; i++ { fs.removePerSubject(subj) } - } + return true + }) mb.dirtyCloseWithRemove(true) deleted++ } @@ -2315,8 +2317,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor mb.lsts = time.Now().UnixNano() // If we only have 1 subject currently and it matches our filter we can also set isAll. - if !isAll && len(mb.fss) == 1 { - _, isAll = mb.fss[filter] + if !isAll && mb.fss.Size() == 1 { + _, isAll = mb.fss.Find(stringToBytes(filter)) } // Make sure to start at mb.first.seq if fseq < mb.first.seq if seq := atomic.LoadUint64(&mb.first.seq); seq > fseq { @@ -2345,18 +2347,18 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor // 25th quantile of a match in a linear walk. Filter should be a wildcard. // We should consult fss if our cache is not loaded and we only have fss loaded. if !doLinearScan && wc && mb.cacheAlreadyLoaded() { - doLinearScan = len(mb.fss)*4 > int(lseq-fseq) + doLinearScan = mb.fss.Size()*4 > int(lseq-fseq) } if !doLinearScan { // If we have a wildcard match against all tracked subjects we know about. if wc { subs = subs[:0] - for subj := range mb.fss { - if isMatch(subj) { - subs = append(subs, subj) + mb.fss.Match(stringToBytes(filter), func(bsubj []byte, _ *SimpleState) { + if subj := bytesToString(bsubj); isMatch(subj) { + subs = append(subs, string(bsubj)) } - } + }) // Check if we matched anything if len(subs) == 0 { return nil, didLoad, ErrStoreMsgNotFound @@ -2364,7 +2366,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor } fseq = lseq + 1 for _, subj := range subs { - ss := mb.fss[subj] + ss, _ := mb.fss.Find(stringToBytes(subj)) if ss != nil && ss.firstNeedsUpdate { mb.recalculateFirstForSubj(subj, ss.First, ss) } @@ -2455,6 +2457,10 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( } } + if filter == _EMPTY_ { + filter = fwcs + } + update := func(ss *SimpleState) { total += ss.Msgs if first == 0 || ss.First < first { @@ -2485,8 +2491,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( } var havePartial bool - for subj, ss := range mb.fss { - if isAll || isMatch(subj) { + mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) { + if subj := bytesToString(bsubj); isAll || isMatch(subj) { if ss.firstNeedsUpdate { mb.recalculateFirstForSubj(subj, ss.First, ss) } @@ -2495,10 +2501,9 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( } else if sseq <= ss.Last { // We matched but its a partial. havePartial = true - break } } - } + }) // If we did not encounter any partials we can return here. if !havePartial { @@ -2522,14 +2527,12 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( if sm == nil { continue } - if isAll || isMatch(sm.subj) { - total++ - if first == 0 || seq < first { - first = seq - } - if seq > last { - last = seq - } + total++ + if first == 0 || seq < first { + first = seq + } + if seq > last { + last = seq } } // If we loaded this block for this operation go ahead and expire it here. @@ -2643,6 +2646,10 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si // Always reset. ss.First, ss.Last, ss.Msgs = 0, 0, 0 + if filter == _EMPTY_ { + filter = fwcs + } + // We do need to figure out the first and last sequences. wc := subjectHasWildcard(filter) start, stop := uint32(math.MaxUint32), uint32(0) @@ -2722,6 +2729,10 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { return nil } + if subject == _EMPTY_ { + subject = fwcs + } + start, stop := fs.blks[0], fs.lmb // We can short circuit if not a wildcard using psim for start and stop. if !subjectHasWildcard(subject) { @@ -2753,21 +2764,20 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { } // Mark fss activity. mb.lsts = time.Now().UnixNano() - for subj, ss := range mb.fss { - if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) { - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) - } - oss := fss[subj] - if oss.First == 0 { // New - fss[subj] = *ss - } else { - // Merge here. - oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs - fss[subj] = oss - } + mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) { + subj := string(bsubj) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } - } + oss := fss[subj] + if oss.First == 0 { // New + fss[subj] = *ss + } else { + // Merge here. + oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs + fss[subj] = oss + } + }) if shouldExpire { // Expire this cache before moving on. mb.tryForceExpireCacheLocked() @@ -2852,7 +2862,7 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i mb.mu.Lock() mb.ensurePerSubjectInfoLoaded() for subj, psi := range subs { - if ss := mb.fss[subj]; ss != nil { + if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil { if ss.Last <= maxSeq { seqs = append(seqs, ss.Last) delete(subs, subj) @@ -3050,20 +3060,18 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) mb.lsts = time.Now().UnixNano() var havePartial bool - for subj, ss := range mb.fss { - if isMatch(subj) { - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) - } - if sseq <= ss.First { - t += ss.Msgs - } else if sseq <= ss.Last { - // We matched but its a partial. - havePartial = true - break - } + mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) { + subj := bytesToString(bsubj) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } - } + if sseq <= ss.First { + t += ss.Msgs + } else if sseq <= ss.Last { + // We matched but its a partial. + havePartial = true + } + }) // See if we need to scan msgs here. if havePartial { @@ -3141,11 +3149,12 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) // Mark fss activity. mb.lsts = time.Now().UnixNano() - for subj, ss := range mb.fss { - if isMatch(subj) { + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + if subj := bytesToString(bsubj); isMatch(subj) { adjust += ss.Msgs } - } + return true + }) } } else { // This is the last block. We need to scan per message here. @@ -3266,7 +3275,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { // Lock should be held to quiet race detector. mb.mu.Lock() mb.setupWriteCache(rbuf) - mb.fss = make(map[string]*SimpleState) + mb.fss = stree.NewSubjectTree[SimpleState]() // Set cache time to creation time to start. ts := time.Now().UnixNano() @@ -3718,10 +3727,11 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { // Mark fss activity. mb.lsts = time.Now().UnixNano() - if ss := mb.fss[subj]; ss != nil { + bsubj := stringToBytes(subj) + if ss, ok := mb.fss.Find(bsubj); ok && ss != nil { // Adjust first if it was not where we thought it should be. if i != start { - if info, ok := fs.psim.Find(stringToBytes(subj)); ok { + if info, ok := fs.psim.Find(bsubj); ok { info.fblk = i } } @@ -3854,8 +3864,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { // Grab the ss entry for this subject in case sparse. mb.mu.Lock() mb.ensurePerSubjectInfoLoaded() - ss := mb.fss[subj] - if ss != nil && ss.firstNeedsUpdate { + ss, ok := mb.fss.Find(stringToBytes(subj)) + if ok && ss != nil && ss.firstNeedsUpdate { mb.recalculateFirstForSubj(subj, ss.First, ss) } mb.mu.Unlock() @@ -4950,11 +4960,11 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte } // Mark fss activity. mb.lsts = time.Now().UnixNano() - if ss := mb.fss[subj]; ss != nil { + if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil { ss.Msgs++ ss.Last = seq } else { - mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq} + mb.fss.Insert(stringToBytes(subj), SimpleState{Msgs: 1, First: seq, Last: seq}) } } @@ -5555,7 +5565,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // Create FSS if we should track. var popFss bool if mb.fssNotLoaded() { - mb.fss = make(map[string]*SimpleState) + mb.fss = stree.NewSubjectTree[SimpleState]() popFss = true } // Mark fss activity. @@ -5622,15 +5632,15 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // Handle FSS inline here. if popFss && slen > 0 && !mb.noTrack && !erased && !mb.dmap.Exists(seq) { bsubj := buf[index+msgHdrSize : index+msgHdrSize+uint32(slen)] - if ss := mb.fss[string(bsubj)]; ss != nil { + if ss, ok := mb.fss.Find(bsubj); ok && ss != nil { ss.Msgs++ ss.Last = seq } else { - mb.fss[string(bsubj)] = &SimpleState{ + mb.fss.Insert(bsubj, SimpleState{ Msgs: 1, First: seq, Last: seq, - } + }) } } } @@ -6344,7 +6354,7 @@ func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err err var l uint64 // Optimize if subject is not a wildcard. if !wc { - if ss := mb.fss[subj]; ss != nil { + if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil { l = ss.Last } } @@ -7059,11 +7069,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { bytes += mb.bytes // Make sure we do subject cleanup as well. mb.ensurePerSubjectInfoLoaded() - for subj, ss := range mb.fss { + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + subj := bytesToString(bsubj) for i := uint64(0); i < ss.Msgs; i++ { fs.removePerSubject(subj) } - } + return true + }) // Now close. mb.dirtyCloseWithRemove(true) mb.mu.Unlock() @@ -7464,13 +7476,17 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { // Lock should be held. func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { mb.ensurePerSubjectInfoLoaded() - ss := mb.fss[subj] - if ss == nil { + if mb.fss == nil { + return + } + bsubj := stringToBytes(subj) + ss, ok := mb.fss.Find(bsubj) + if !ok || ss == nil { return } if ss.Msgs == 1 { - delete(mb.fss, subj) + mb.fss.Delete(bsubj) return } @@ -7572,7 +7588,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error { } // Create new one regardless. - mb.fss = make(map[string]*SimpleState) + mb.fss = stree.NewSubjectTree[SimpleState]() var smv StoreMsg fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq) @@ -7589,16 +7605,16 @@ func (mb *msgBlock) generatePerSubjectInfo() error { return err } if sm != nil && len(sm.subj) > 0 { - if ss := mb.fss[sm.subj]; ss != nil { + if ss, ok := mb.fss.Find(stringToBytes(sm.subj)); ok && ss != nil { ss.Msgs++ ss.Last = seq } else { - mb.fss[sm.subj] = &SimpleState{Msgs: 1, First: seq, Last: seq} + mb.fss.Insert(stringToBytes(sm.subj), SimpleState{Msgs: 1, First: seq, Last: seq}) } } } - if len(mb.fss) > 0 { + if mb.fss.Size() > 0 { // Make sure we run the cache expire timer. mb.llts = time.Now().UnixNano() // Mark fss activity. @@ -7619,7 +7635,7 @@ func (mb *msgBlock) ensurePerSubjectInfoLoaded() error { return nil } if mb.msgs == 0 { - mb.fss = make(map[string]*SimpleState) + mb.fss = stree.NewSubjectTree[SimpleState]() return nil } return mb.generatePerSubjectInfo() @@ -7636,9 +7652,8 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) { } // Now populate psim. - for subj, ss := range mb.fss { - if len(subj) > 0 { - bsubj := stringToBytes(subj) + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + if len(bsubj) > 0 { if info, ok := fs.psim.Find(bsubj); ok { info.total += ss.Msgs if mb.index > info.lblk { @@ -7646,10 +7661,11 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) { } } else { fs.psim.Insert(bsubj, psi{total: ss.Msgs, fblk: mb.index, lblk: mb.index}) - fs.tsl += len(subj) + fs.tsl += len(bsubj) } } - } + return true + }) } // Close the message block. diff --git a/server/filestore_test.go b/server/filestore_test.go index 3238c296a78..5caa762a0ec 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -4098,10 +4098,10 @@ func TestFileStoreNoFSSBugAfterRemoveFirst(t *testing.T) { mb := fs.blks[0] fs.mu.Unlock() mb.mu.RLock() - ss := mb.fss["foo.bar.0"] + ss, ok := mb.fss.Find([]byte("foo.bar.0")) mb.mu.RUnlock() - if ss != nil { + if ok && ss != nil { t.Fatalf("Expected no state for %q, but got %+v\n", "foo.bar.0", ss) } }) @@ -6883,7 +6883,7 @@ func TestFileStoreFSSExpireNumPending(t *testing.T) { require_True(t, elapsed > time.Since(start)) // Sleep enough so that all mb.fss should expire, which is 2s above. - time.Sleep(3 * time.Second) + time.Sleep(4 * time.Second) fs.mu.RLock() for i, mb := range fs.blks { mb.mu.RLock() @@ -6891,7 +6891,7 @@ func TestFileStoreFSSExpireNumPending(t *testing.T) { mb.mu.RUnlock() if fss != nil { fs.mu.RUnlock() - t.Fatalf("Detected loaded fss for mb %d", i) + t.Fatalf("Detected loaded fss for mb %d (size %d)", i, fss.Size()) } } fs.mu.RUnlock()