Skip to content

Commit aa8a594

Browse files
committed
stree: Split Iter into IterFast and IterOrdered
Signed-off-by: Neil Twigg <neil@nats.io>
1 parent 54f3076 commit aa8a594

File tree

5 files changed

+84
-29
lines changed

5 files changed

+84
-29
lines changed

server/filestore.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -2117,7 +2117,7 @@ func (fs *fileStore) expireMsgsOnRecover() error {
21172117
}
21182118
// Make sure we do subject cleanup as well.
21192119
mb.ensurePerSubjectInfoLoaded()
2120-
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
2120+
mb.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool {
21212121
subj := bytesToString(bsubj)
21222122
for i := uint64(0); i < ss.Msgs; i++ {
21232123
fs.removePerSubject(subj, false)
@@ -3446,7 +3446,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
34463446

34473447
// See if filter was provided but its the only subject.
34483448
if !isAll && fs.psim.Size() == 1 {
3449-
fs.psim.Iter(func(subject []byte, _ *psi) bool {
3449+
fs.psim.IterFast(func(subject []byte, _ *psi) bool {
34503450
isAll = sl.HasInterest(bytesToString(subject))
34513451
return true
34523452
})
@@ -4381,7 +4381,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
43814381

43824382
// collect all that are not correct.
43834383
needAttention := make(map[string]*psi)
4384-
fs.psim.Iter(func(subj []byte, psi *psi) bool {
4384+
fs.psim.IterFast(func(subj []byte, psi *psi) bool {
43854385
numMsgs += psi.total
43864386
if psi.total > maxMsgsPer {
43874387
needAttention[string(subj)] = psi
@@ -4406,7 +4406,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
44064406
fs.rebuildStateLocked(nil)
44074407
// Need to redo blocks that need attention.
44084408
needAttention = make(map[string]*psi)
4409-
fs.psim.Iter(func(subj []byte, psi *psi) bool {
4409+
fs.psim.IterFast(func(subj []byte, psi *psi) bool {
44104410
if psi.total > maxMsgsPer {
44114411
needAttention[string(subj)] = psi
44124412
}
@@ -7798,7 +7798,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
77987798
fs.bim = make(map[uint32]*msgBlock)
77997799
// Subject delete markers if needed.
78007800
if fs.cfg.SubjectDeleteMarkerTTL > 0 {
7801-
fs.psim.Iter(func(subject []byte, _ *psi) bool {
7801+
fs.psim.IterOrdered(func(subject []byte, _ *psi) bool {
78027802
fs.markers = append(fs.markers, string(subject))
78037803
return true
78047804
})
@@ -7911,7 +7911,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
79117911
bytes += mb.bytes
79127912
// Make sure we do subject cleanup as well.
79137913
mb.ensurePerSubjectInfoLoaded()
7914-
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
7914+
mb.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool {
79157915
subj := bytesToString(bsubj)
79167916
for i := uint64(0); i < ss.Msgs; i++ {
79177917
fs.removePerSubject(subj, fs.cfg.SubjectDeleteMarkerTTL > 0)
@@ -8633,7 +8633,7 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) {
86338633
}
86348634

86358635
// Now populate psim.
8636-
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
8636+
mb.fss.IterFast(func(bsubj []byte, ss *SimpleState) bool {
86378637
if len(bsubj) > 0 {
86388638
if info, ok := fs.psim.Find(bsubj); ok {
86398639
info.total += ss.Msgs

server/memstore.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
102102
// If the value is smaller, or was unset before, we need to enforce that.
103103
if ms.maxp > 0 && (maxp == 0 || ms.maxp < maxp) {
104104
lm := uint64(ms.maxp)
105-
ms.fss.Iter(func(subj []byte, ss *SimpleState) bool {
105+
ms.fss.IterFast(func(subj []byte, ss *SimpleState) bool {
106106
if ss.Msgs > lm {
107107
ms.enforcePerSubjectLimit(bytesToString(subj), ss)
108108
}
@@ -1161,7 +1161,7 @@ func (ms *memStore) purge(fseq uint64) (uint64, error) {
11611161
ms.msgs = make(map[uint64]*StoreMsg)
11621162
// Subject delete markers if needed.
11631163
if ms.cfg.SubjectDeleteMarkerTTL > 0 {
1164-
ms.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
1164+
ms.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool {
11651165
ms.markers = append(ms.markers, string(bsubj))
11661166
return true
11671167
})
@@ -1231,7 +1231,7 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
12311231
ms.state.LastSeq = seq - 1
12321232
// Subject delete markers if needed.
12331233
if ms.cfg.SubjectDeleteMarkerTTL > 0 {
1234-
ms.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
1234+
ms.fss.IterOrdered(func(bsubj []byte, ss *SimpleState) bool {
12351235
ms.markers = append(ms.markers, string(bsubj))
12361236
return true
12371237
})

server/stree/stree.go

+27-5
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,22 @@ func (t *SubjectTree[T]) Match(filter []byte, cb func(subject []byte, val *T)) {
124124
t.match(t.root, parts, _pre[:0], cb)
125125
}
126126

127-
// Iter will walk all entries in the SubjectTree lexographically. The callback can return false to terminate the walk.
128-
func (t *SubjectTree[T]) Iter(cb func(subject []byte, val *T) bool) {
127+
// IterOrdered will walk all entries in the SubjectTree lexographically. The callback can return false to terminate the walk.
128+
func (t *SubjectTree[T]) IterOrdered(cb func(subject []byte, val *T) bool) {
129129
if t == nil || t.root == nil {
130130
return
131131
}
132132
var _pre [256]byte
133-
t.iter(t.root, _pre[:0], cb)
133+
t.iter(t.root, _pre[:0], true, cb)
134+
}
135+
136+
// IterFast will walk all entries in the SubjectTree with no guarantees of ordering. The callback can return false to terminate the walk.
137+
func (t *SubjectTree[T]) IterFast(cb func(subject []byte, val *T) bool) {
138+
if t == nil || t.root == nil {
139+
return
140+
}
141+
var _pre [256]byte
142+
t.iter(t.root, _pre[:0], false, cb)
134143
}
135144

136145
// Internal methods
@@ -369,7 +378,7 @@ func (t *SubjectTree[T]) match(n node, parts [][]byte, pre []byte, cb func(subje
369378
}
370379

371380
// Interal iter function to walk nodes in lexigraphical order.
372-
func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T) bool) bool {
381+
func (t *SubjectTree[T]) iter(n node, pre []byte, ordered bool, cb func(subject []byte, val *T) bool) bool {
373382
if n.isLeaf() {
374383
ln := n.(*leaf[T])
375384
return cb(append(pre, ln.suffix...), &ln.value)
@@ -378,6 +387,19 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T
378387
bn := n.base()
379388
// Note that this append may reallocate, but it doesn't modify "pre" at the "iter" callsite.
380389
pre = append(pre, bn.prefix...)
390+
// Not everything requires lexicographical sorting, so support a fast path for iterating in
391+
// whatever order the stree has things stored instead.
392+
if !ordered {
393+
for _, cn := range n.children() {
394+
if cn == nil {
395+
continue
396+
}
397+
if !t.iter(cn, pre, false, cb) {
398+
return false
399+
}
400+
}
401+
return true
402+
}
381403
// Collect nodes since unsorted.
382404
var _nodes [256]node
383405
nodes := _nodes[:0]
@@ -390,7 +412,7 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T
390412
slices.SortStableFunc(nodes, func(a, b node) int { return bytes.Compare(a.path(), b.path()) })
391413
// Now walk the nodes in order and call into next iter.
392414
for i := range nodes {
393-
if !t.iter(nodes[i], pre, cb) {
415+
if !t.iter(nodes[i], pre, true, cb) {
394416
return false
395417
}
396418
}

server/stree/stree_test.go

+47-6
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ func TestSubjectTreeMatchRandomDoublePWC(t *testing.T) {
465465
seen++
466466
})
467467
// Now check via walk to make sure we are right.
468-
st.Iter(func(subject []byte, v *int) bool {
468+
st.IterOrdered(func(subject []byte, v *int) bool {
469469
tokens := strings.Split(string(subject), ".")
470470
require_Equal(t, len(tokens), 3)
471471
if tokens[1] == "2" {
@@ -479,7 +479,7 @@ func TestSubjectTreeMatchRandomDoublePWC(t *testing.T) {
479479
st.Match(b("*.*.222"), func(_ []byte, _ *int) {
480480
seen++
481481
})
482-
st.Iter(func(subject []byte, v *int) bool {
482+
st.IterOrdered(func(subject []byte, v *int) bool {
483483
tokens := strings.Split(string(subject), ".")
484484
require_Equal(t, len(tokens), 3)
485485
if tokens[2] == "222" {
@@ -490,7 +490,7 @@ func TestSubjectTreeMatchRandomDoublePWC(t *testing.T) {
490490
require_Equal(t, seen, verified)
491491
}
492492

493-
func TestSubjectTreeIter(t *testing.T) {
493+
func TestSubjectTreeIterOrdered(t *testing.T) {
494494
st := NewSubjectTree[int]()
495495
st.Insert(b("foo.bar.A"), 1)
496496
st.Insert(b("foo.bar.B"), 2)
@@ -531,12 +531,53 @@ func TestSubjectTreeIter(t *testing.T) {
531531
return true
532532
}
533533
// Kick in the iter.
534-
st.Iter(walk)
534+
st.IterOrdered(walk)
535535
require_Equal(t, received, len(checkOrder))
536536

537537
// Make sure we can terminate properly.
538538
received = 0
539-
st.Iter(func(subject []byte, v *int) bool {
539+
st.IterOrdered(func(subject []byte, v *int) bool {
540+
received++
541+
return received != 4
542+
})
543+
require_Equal(t, received, 4)
544+
}
545+
546+
func TestSubjectTreeIterFast(t *testing.T) {
547+
st := NewSubjectTree[int]()
548+
st.Insert(b("foo.bar.A"), 1)
549+
st.Insert(b("foo.bar.B"), 2)
550+
st.Insert(b("foo.bar.C"), 3)
551+
st.Insert(b("foo.baz.A"), 11)
552+
st.Insert(b("foo.baz.B"), 22)
553+
st.Insert(b("foo.baz.C"), 33)
554+
st.Insert(b("foo.bar"), 42)
555+
556+
checkValMap := map[string]int{
557+
"foo.bar.A": 1,
558+
"foo.bar.B": 2,
559+
"foo.bar.C": 3,
560+
"foo.baz.A": 11,
561+
"foo.baz.B": 22,
562+
"foo.baz.C": 33,
563+
"foo.bar": 42,
564+
}
565+
var received int
566+
walk := func(subject []byte, v *int) bool {
567+
received++
568+
require_True(t, v != nil)
569+
if expected := checkValMap[string(subject)]; expected != *v {
570+
t.Fatalf("Expected %q to have value of %d, but got %d", subject, expected, *v)
571+
}
572+
return true
573+
}
574+
// Kick in the iter.
575+
st.IterFast(walk)
576+
require_Equal(t, received, len(checkValMap))
577+
578+
// Make sure we can terminate properly.
579+
received = 0
580+
st.IterFast(func(subject []byte, v *int) bool {
540581
received++
541582
return received != 4
542583
})
@@ -710,7 +751,7 @@ func TestSubjectTreeIterPerf(t *testing.T) {
710751

711752
start := time.Now()
712753
count := 0
713-
st.Iter(func(_ []byte, _ *int) bool {
754+
st.IterOrdered(func(_ []byte, _ *int) bool {
714755
count++
715756
return true
716757
})

server/stree/util.go

-8
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,3 @@ func pivot[N position](subject []byte, pos N) byte {
5555
}
5656
return subject[pos]
5757
}
58-
59-
// TODO(dlc) - Can be removed with Go 1.21 once server is on Go 1.22.
60-
func min(a, b int) int {
61-
if a < b {
62-
return a
63-
}
64-
return b
65-
}

0 commit comments

Comments
 (0)