From 283b6078da353d45fae875d7ee811b50c55e2e55 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 6 Dec 2024 15:32:36 +0100 Subject: [PATCH] [FIXED] Subject state consistency Signed-off-by: Maurice van Veen --- server/filestore.go | 26 +++++++----- server/memstore.go | 30 +++++++++----- server/store_test.go | 94 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 19 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 713b317eff4..b1c4c6e4ec4 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2614,10 +2614,6 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si // Always reset. ss.First, ss.Last, ss.Msgs = 0, 0, 0 - if filter == _EMPTY_ { - filter = fwcs - } - // We do need to figure out the first and last sequences. wc := subjectHasWildcard(filter) start, stop := uint32(math.MaxUint32), uint32(0) @@ -7502,6 +7498,9 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { // Update fss smb.removeSeqPerSubject(sm.subj, mseq) fs.removePerSubject(sm.subj) + // Need to mark the sequence as deleted. Otherwise, recalculating ss.First + // for per-subject info would be able to find it still. + smb.dmap.Insert(mseq) } } @@ -7943,11 +7942,16 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { // Only one left. if ss.Msgs == 1 { - if seq == ss.Last { - ss.Last = ss.First - } else { - ss.First = ss.Last + // Update first if we need to, we must check if this removal is about what's going to be ss.First + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } + // If we're removing the first message, we must recalculate again. + // ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it. + if ss.First == seq { + mb.recalculateFirstForSubj(subj, ss.First, ss) + } + ss.Last = ss.First ss.firstNeedsUpdate = false return } @@ -7977,8 +7981,12 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si startSlot = 0 } + fseq := startSeq + 1 + if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { + fseq = mbFseq + } var le = binary.LittleEndian - for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ { + for slot := startSlot; slot < len(mb.cache.idx); slot++ { bi := mb.cache.idx[slot] &^ hbit if bi == dbit { // delete marker so skip. diff --git a/server/memstore.go b/server/memstore.go index 040704f2599..b183d117da5 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1055,8 +1055,9 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { if sm := ms.msgs[seq]; sm != nil { bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) purged++ - delete(ms.msgs, seq) ms.removeSeqPerSubject(sm.subj, seq) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, seq) } } if purged > ms.state.Msgs { @@ -1144,8 +1145,9 @@ func (ms *memStore) Truncate(seq uint64) error { if sm := ms.msgs[i]; sm != nil { purged++ bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) - delete(ms.msgs, i) ms.removeSeqPerSubject(sm.subj, i) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, i) } } // Reset last. @@ -1406,17 +1408,24 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // If we know we only have 1 msg left don't need to search for next first. + // Only one left. if ss.Msgs == 1 { - if seq == ss.Last { - ss.Last = ss.First - } else { - ss.First = ss.Last + // Update first if we need to, we must check if this removal is about what's going to be ss.First + if ss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subj, ss.First, ss) } + // If we're removing the first message, we must recalculate again. + // ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it. + if ss.First == seq { + ms.recalculateFirstForSubj(subj, ss.First, ss) + } + ss.Last = ss.First ss.firstNeedsUpdate = false - } else { - ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + return } + + // We can lazily calculate the first sequence when needed. + ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate } // Will recalculate the first sequence for this subject in this block. @@ -1446,7 +1455,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg) - delete(ms.msgs, seq) if ms.state.Msgs > 0 { ms.state.Msgs-- if ss > ms.state.Bytes { @@ -1471,6 +1479,8 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { // Remove any per subject tracking. ms.removeSeqPerSubject(sm.subj, seq) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, seq) if ms.scb != nil { // We do not want to hold any locks here. diff --git a/server/store_test.go b/server/store_test.go index f7832974b5b..d2c3481a1a5 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -141,3 +141,97 @@ func TestStoreDeleteRange(t *testing.T) { require_Equal(t, last, 2) require_Equal(t, num, 1) } + +func TestStoreSubjectStateConsistency(t *testing.T) { + testAllStoreAllPermutations( + t, false, + StreamConfig{Name: "TEST", Subjects: []string{"foo"}}, + func(t *testing.T, fs StreamStore) { + getSubjectState := func() SimpleState { + t.Helper() + ss := fs.SubjectsState("foo") + return ss["foo"] + } + + // Publish an initial batch of messages. + for i := 0; i < 4; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + // Expect 4 msgs, with first=1, last=4. + ss := getSubjectState() + require_Equal(t, ss.Msgs, 4) + require_Equal(t, ss.First, 1) + require_Equal(t, ss.Last, 4) + + // Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate. + removed, err := fs.RemoveMsg(1) + require_NoError(t, err) + require_True(t, removed) + + // Will update first, so corrects to seq 2. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 3) + require_Equal(t, ss.First, 2) + require_Equal(t, ss.Last, 4) + + // Remove last message. + removed, err = fs.RemoveMsg(4) + require_NoError(t, err) + require_True(t, removed) + + // ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 2) + require_Equal(t, ss.First, 2) + require_Equal(t, ss.Last, 4) + + // Remove first message again. + removed, err = fs.RemoveMsg(2) + require_NoError(t, err) + require_True(t, removed) + + // Since we only have one message left, must update ss.First and set ss.Last to equal. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.First, 3) + require_Equal(t, ss.Last, 3) + + // Publish some more messages so we can test another scenario. + for i := 0; i < 3; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + // Just check the state is complete again. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 4) + require_Equal(t, ss.First, 3) + require_Equal(t, ss.Last, 7) + + // Remove last sequence, ss.Last is lazy so doesn't get updated. + removed, err = fs.RemoveMsg(7) + require_NoError(t, err) + require_True(t, removed) + + // Remove first sequence, ss.First is lazy so doesn't get updated. + removed, err = fs.RemoveMsg(3) + require_NoError(t, err) + require_True(t, removed) + + // Remove (now) first sequence, but because ss.First is lazy we first need to recalculate + // to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First + // yet again, since ss.Last is lazy and is not correct. + removed, err = fs.RemoveMsg(5) + require_NoError(t, err) + require_True(t, removed) + + // ss.First should equal ss.Last, last should have been updated now. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.First, 6) + require_Equal(t, ss.Last, 6) + }, + ) +}