Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stree: Split Iter into IterFast and IterOrdered #6458

Merged
merged 1 commit into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2117,7 +2117,7 @@ func (fs *fileStore) expireMsgsOnRecover() error {
}
// Make sure we do subject cleanup as well.
mb.ensurePerSubjectInfoLoaded()
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
mb.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool {
subj := bytesToString(bsubj)
for i := uint64(0); i < ss.Msgs; i++ {
fs.removePerSubject(subj, false)
Expand Down Expand Up @@ -3446,7 +3446,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo

// See if filter was provided but its the only subject.
if !isAll && fs.psim.Size() == 1 {
fs.psim.Iter(func(subject []byte, _ *psi) bool {
fs.psim.IterFast(func(subject []byte, _ *psi) bool {
isAll = sl.HasInterest(bytesToString(subject))
return true
})
Expand Down Expand Up @@ -4381,7 +4381,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {

// collect all that are not correct.
needAttention := make(map[string]*psi)
fs.psim.Iter(func(subj []byte, psi *psi) bool {
fs.psim.IterFast(func(subj []byte, psi *psi) bool {
numMsgs += psi.total
if psi.total > maxMsgsPer {
needAttention[string(subj)] = psi
Expand All @@ -4406,7 +4406,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
fs.rebuildStateLocked(nil)
// Need to redo blocks that need attention.
needAttention = make(map[string]*psi)
fs.psim.Iter(func(subj []byte, psi *psi) bool {
fs.psim.IterFast(func(subj []byte, psi *psi) bool {
if psi.total > maxMsgsPer {
needAttention[string(subj)] = psi
}
Expand Down Expand Up @@ -7798,7 +7798,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
fs.bim = make(map[uint32]*msgBlock)
// Subject delete markers if needed.
if fs.cfg.SubjectDeleteMarkerTTL > 0 {
fs.psim.Iter(func(subject []byte, _ *psi) bool {
fs.psim.IterOrdered(func(subject []byte, _ *psi) bool {
fs.markers = append(fs.markers, string(subject))
return true
})
Expand Down Expand Up @@ -7911,7 +7911,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
bytes += mb.bytes
// Make sure we do subject cleanup as well.
mb.ensurePerSubjectInfoLoaded()
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
mb.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool {
subj := bytesToString(bsubj)
for i := uint64(0); i < ss.Msgs; i++ {
fs.removePerSubject(subj, fs.cfg.SubjectDeleteMarkerTTL > 0)
Expand Down Expand Up @@ -8633,7 +8633,7 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) {
}

// Now populate psim.
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
mb.fss.IterFast(func(bsubj []byte, ss *SimpleState) bool {
if len(bsubj) > 0 {
if info, ok := fs.psim.Find(bsubj); ok {
info.total += ss.Msgs
Expand Down
6 changes: 3 additions & 3 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
// If the value is smaller, or was unset before, we need to enforce that.
if ms.maxp > 0 && (maxp == 0 || ms.maxp < maxp) {
lm := uint64(ms.maxp)
ms.fss.Iter(func(subj []byte, ss *SimpleState) bool {
ms.fss.IterFast(func(subj []byte, ss *SimpleState) bool {
if ss.Msgs > lm {
ms.enforcePerSubjectLimit(bytesToString(subj), ss)
}
Expand Down Expand Up @@ -1161,7 +1161,7 @@ func (ms *memStore) purge(fseq uint64) (uint64, error) {
ms.msgs = make(map[uint64]*StoreMsg)
// Subject delete markers if needed.
if ms.cfg.SubjectDeleteMarkerTTL > 0 {
ms.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
ms.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool {
ms.markers = append(ms.markers, string(bsubj))
return true
})
Expand Down Expand Up @@ -1231,7 +1231,7 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
ms.state.LastSeq = seq - 1
// Subject delete markers if needed.
if ms.cfg.SubjectDeleteMarkerTTL > 0 {
ms.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
ms.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool {
ms.markers = append(ms.markers, string(bsubj))
return true
})
Expand Down
32 changes: 27 additions & 5 deletions server/stree/stree.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,22 @@ func (t *SubjectTree[T]) Match(filter []byte, cb func(subject []byte, val *T)) {
t.match(t.root, parts, _pre[:0], cb)
}

// Iter will walk all entries in the SubjectTree lexographically. The callback can return false to terminate the walk.
func (t *SubjectTree[T]) Iter(cb func(subject []byte, val *T) bool) {
// IterOrdered will walk all entries in the SubjectTree lexographically. The callback can return false to terminate the walk.
func (t *SubjectTree[T]) IterOrdered(cb func(subject []byte, val *T) bool) {
if t == nil || t.root == nil {
return
}
var _pre [256]byte
t.iter(t.root, _pre[:0], cb)
t.iter(t.root, _pre[:0], true, cb)
}

// IterFast will walk all entries in the SubjectTree with no guarantees of ordering. The callback can return false to terminate the walk.
func (t *SubjectTree[T]) IterFast(cb func(subject []byte, val *T) bool) {
if t == nil || t.root == nil {
return
}
var _pre [256]byte
t.iter(t.root, _pre[:0], false, cb)
}

// Internal methods
Expand Down Expand Up @@ -369,7 +378,7 @@ func (t *SubjectTree[T]) match(n node, parts [][]byte, pre []byte, cb func(subje
}

// Interal iter function to walk nodes in lexigraphical order.
func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T) bool) bool {
func (t *SubjectTree[T]) iter(n node, pre []byte, ordered bool, cb func(subject []byte, val *T) bool) bool {
if n.isLeaf() {
ln := n.(*leaf[T])
return cb(append(pre, ln.suffix...), &ln.value)
Expand All @@ -378,6 +387,19 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T
bn := n.base()
// Note that this append may reallocate, but it doesn't modify "pre" at the "iter" callsite.
pre = append(pre, bn.prefix...)
// Not everything requires lexicographical sorting, so support a fast path for iterating in
// whatever order the stree has things stored instead.
if !ordered {
for _, cn := range n.children() {
if cn == nil {
continue
}
if !t.iter(cn, pre, false, cb) {
return false
}
}
return true
}
// Collect nodes since unsorted.
var _nodes [256]node
nodes := _nodes[:0]
Expand All @@ -390,7 +412,7 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T
slices.SortStableFunc(nodes, func(a, b node) int { return bytes.Compare(a.path(), b.path()) })
// Now walk the nodes in order and call into next iter.
for i := range nodes {
if !t.iter(nodes[i], pre, cb) {
if !t.iter(nodes[i], pre, true, cb) {
return false
}
}
Expand Down
53 changes: 47 additions & 6 deletions server/stree/stree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func TestSubjectTreeMatchRandomDoublePWC(t *testing.T) {
seen++
})
// Now check via walk to make sure we are right.
st.Iter(func(subject []byte, v *int) bool {
st.IterOrdered(func(subject []byte, v *int) bool {
tokens := strings.Split(string(subject), ".")
require_Equal(t, len(tokens), 3)
if tokens[1] == "2" {
Expand All @@ -479,7 +479,7 @@ func TestSubjectTreeMatchRandomDoublePWC(t *testing.T) {
st.Match(b("*.*.222"), func(_ []byte, _ *int) {
seen++
})
st.Iter(func(subject []byte, v *int) bool {
st.IterOrdered(func(subject []byte, v *int) bool {
tokens := strings.Split(string(subject), ".")
require_Equal(t, len(tokens), 3)
if tokens[2] == "222" {
Expand All @@ -490,7 +490,7 @@ func TestSubjectTreeMatchRandomDoublePWC(t *testing.T) {
require_Equal(t, seen, verified)
}

func TestSubjectTreeIter(t *testing.T) {
func TestSubjectTreeIterOrdered(t *testing.T) {
st := NewSubjectTree[int]()
st.Insert(b("foo.bar.A"), 1)
st.Insert(b("foo.bar.B"), 2)
Expand Down Expand Up @@ -531,12 +531,53 @@ func TestSubjectTreeIter(t *testing.T) {
return true
}
// Kick in the iter.
st.Iter(walk)
st.IterOrdered(walk)
require_Equal(t, received, len(checkOrder))

// Make sure we can terminate properly.
received = 0
st.Iter(func(subject []byte, v *int) bool {
st.IterOrdered(func(subject []byte, v *int) bool {
received++
return received != 4
})
require_Equal(t, received, 4)
}

func TestSubjectTreeIterFast(t *testing.T) {
st := NewSubjectTree[int]()
st.Insert(b("foo.bar.A"), 1)
st.Insert(b("foo.bar.B"), 2)
st.Insert(b("foo.bar.C"), 3)
st.Insert(b("foo.baz.A"), 11)
st.Insert(b("foo.baz.B"), 22)
st.Insert(b("foo.baz.C"), 33)
st.Insert(b("foo.bar"), 42)

checkValMap := map[string]int{
"foo.bar.A": 1,
"foo.bar.B": 2,
"foo.bar.C": 3,
"foo.baz.A": 11,
"foo.baz.B": 22,
"foo.baz.C": 33,
"foo.bar": 42,
}
var received int
walk := func(subject []byte, v *int) bool {
received++
require_True(t, v != nil)
if expected := checkValMap[string(subject)]; expected != *v {
t.Fatalf("Expected %q to have value of %d, but got %d", subject, expected, *v)
}
return true
}
// Kick in the iter.
st.IterFast(walk)
require_Equal(t, received, len(checkValMap))

// Make sure we can terminate properly.
received = 0
st.IterFast(func(subject []byte, v *int) bool {
received++
return received != 4
})
Expand Down Expand Up @@ -710,7 +751,7 @@ func TestSubjectTreeIterPerf(t *testing.T) {

start := time.Now()
count := 0
st.Iter(func(_ []byte, _ *int) bool {
st.IterOrdered(func(_ []byte, _ *int) bool {
count++
return true
})
Expand Down
8 changes: 0 additions & 8 deletions server/stree/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,3 @@ func pivot[N position](subject []byte, pos N) byte {
}
return subject[pos]
}

// TODO(dlc) - Can be removed with Go 1.21 once server is on Go 1.22.
func min(a, b int) int {
if a < b {
return a
}
return b
}