Skip to content

Commit

Permalink
[FIXED] Subject state consistency (#6226)
Browse files Browse the repository at this point in the history
Subject state would not always remain consistent given a specific
pattern of message removals.

There were three bugs:
- `recalculateFirstForSubj` in memstore would do `startSeq+1`, but
filestore would always just start at `mb.first.seq`. These are now
consistent.
- `recalculateFirstForSubj` was not called when `ss.Msgs == 1`, which
could mean we had a stale `ss.FirstSeq` if it needed to be recalculated.
- If after recalculation it turns out `ss.FirstSeq` equals the message
we're trying to remove, we need to `recalculateFirstForSubj` again,
since `ss.Last` is also lazy and could be incorrect.

Apart from that, filestore and memstore are now both equivalent when it
comes to first updating per-subject state and then removing the message,
as well as `removeSeqPerSubject` and how it updates the subject state.


Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Dec 6, 2024
2 parents f264fb3 + 283b607 commit 60e2982
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 19 deletions.
26 changes: 17 additions & 9 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2614,10 +2614,6 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si
// Always reset.
ss.First, ss.Last, ss.Msgs = 0, 0, 0

if filter == _EMPTY_ {
filter = fwcs
}

// We do need to figure out the first and last sequences.
wc := subjectHasWildcard(filter)
start, stop := uint32(math.MaxUint32), uint32(0)
Expand Down Expand Up @@ -7502,6 +7498,9 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
// Update fss
smb.removeSeqPerSubject(sm.subj, mseq)
fs.removePerSubject(sm.subj)
// Need to mark the sequence as deleted. Otherwise, recalculating ss.First
// for per-subject info would be able to find it still.
smb.dmap.Insert(mseq)
}
}

Expand Down Expand Up @@ -7943,11 +7942,16 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {

// Only one left.
if ss.Msgs == 1 {
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
// Update first if we need to, we must check if this removal is about what's going to be ss.First
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
// If we're removing the first message, we must recalculate again.
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
if ss.First == seq {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
ss.Last = ss.First
ss.firstNeedsUpdate = false
return
}
Expand Down Expand Up @@ -7977,8 +7981,12 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
startSlot = 0
}

fseq := startSeq + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
var le = binary.LittleEndian
for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ {
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
Expand Down
30 changes: 20 additions & 10 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,8 +1055,9 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
if sm := ms.msgs[seq]; sm != nil {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
delete(ms.msgs, seq)
ms.removeSeqPerSubject(sm.subj, seq)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)
}
}
if purged > ms.state.Msgs {
Expand Down Expand Up @@ -1144,8 +1145,9 @@ func (ms *memStore) Truncate(seq uint64) error {
if sm := ms.msgs[i]; sm != nil {
purged++
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
delete(ms.msgs, i)
ms.removeSeqPerSubject(sm.subj, i)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, i)
}
}
// Reset last.
Expand Down Expand Up @@ -1406,17 +1408,24 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
}
ss.Msgs--

// If we know we only have 1 msg left don't need to search for next first.
// Only one left.
if ss.Msgs == 1 {
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
// Update first if we need to, we must check if this removal is about what's going to be ss.First
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
// If we're removing the first message, we must recalculate again.
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
if ss.First == seq {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
ss.Last = ss.First
ss.firstNeedsUpdate = false
} else {
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
return
}

// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}

// Will recalculate the first sequence for this subject in this block.
Expand Down Expand Up @@ -1446,7 +1455,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {

ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)

delete(ms.msgs, seq)
if ms.state.Msgs > 0 {
ms.state.Msgs--
if ss > ms.state.Bytes {
Expand All @@ -1471,6 +1479,8 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {

// Remove any per subject tracking.
ms.removeSeqPerSubject(sm.subj, seq)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)

if ms.scb != nil {
// We do not want to hold any locks here.
Expand Down
94 changes: 94 additions & 0 deletions server/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,97 @@ func TestStoreDeleteRange(t *testing.T) {
require_Equal(t, last, 2)
require_Equal(t, num, 1)
}

func TestStoreSubjectStateConsistency(t *testing.T) {
testAllStoreAllPermutations(
t, false,
StreamConfig{Name: "TEST", Subjects: []string{"foo"}},
func(t *testing.T, fs StreamStore) {
getSubjectState := func() SimpleState {
t.Helper()
ss := fs.SubjectsState("foo")
return ss["foo"]
}

// Publish an initial batch of messages.
for i := 0; i < 4; i++ {
_, _, err := fs.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}

// Expect 4 msgs, with first=1, last=4.
ss := getSubjectState()
require_Equal(t, ss.Msgs, 4)
require_Equal(t, ss.First, 1)
require_Equal(t, ss.Last, 4)

// Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate.
removed, err := fs.RemoveMsg(1)
require_NoError(t, err)
require_True(t, removed)

// Will update first, so corrects to seq 2.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 3)
require_Equal(t, ss.First, 2)
require_Equal(t, ss.Last, 4)

// Remove last message.
removed, err = fs.RemoveMsg(4)
require_NoError(t, err)
require_True(t, removed)

// ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 2)
require_Equal(t, ss.First, 2)
require_Equal(t, ss.Last, 4)

// Remove first message again.
removed, err = fs.RemoveMsg(2)
require_NoError(t, err)
require_True(t, removed)

// Since we only have one message left, must update ss.First and set ss.Last to equal.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 1)
require_Equal(t, ss.First, 3)
require_Equal(t, ss.Last, 3)

// Publish some more messages so we can test another scenario.
for i := 0; i < 3; i++ {
_, _, err := fs.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}

// Just check the state is complete again.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 4)
require_Equal(t, ss.First, 3)
require_Equal(t, ss.Last, 7)

// Remove last sequence, ss.Last is lazy so doesn't get updated.
removed, err = fs.RemoveMsg(7)
require_NoError(t, err)
require_True(t, removed)

// Remove first sequence, ss.First is lazy so doesn't get updated.
removed, err = fs.RemoveMsg(3)
require_NoError(t, err)
require_True(t, removed)

// Remove (now) first sequence, but because ss.First is lazy we first need to recalculate
// to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First
// yet again, since ss.Last is lazy and is not correct.
removed, err = fs.RemoveMsg(5)
require_NoError(t, err)
require_True(t, removed)

// ss.First should equal ss.Last, last should have been updated now.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 1)
require_Equal(t, ss.First, 6)
require_Equal(t, ss.Last, 6)
},
)
}

0 comments on commit 60e2982

Please sign in to comment.