Skip to content

Commit

Permalink
Revert "[FIXED] Subject state consistency (#6226)"
Browse files Browse the repository at this point in the history
This reverts commit 60e2982, reversing
changes made to f264fb3.
  • Loading branch information
MauriceVanVeen authored and wallyqs committed Dec 13, 2024
1 parent b606d5b commit 614e55d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 107 deletions.
21 changes: 16 additions & 5 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2616,6 +2616,10 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si
// Always reset.
ss.First, ss.Last, ss.Msgs = 0, 0, 0

if filter == _EMPTY_ {
filter = fwcs
}

// We do need to figure out the first and last sequences.
wc := subjectHasWildcard(filter)
start, stop := uint32(math.MaxUint32), uint32(0)
Expand Down Expand Up @@ -7832,6 +7836,17 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {

ss.Msgs--

// Only one left.
if ss.Msgs == 1 {
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
}
ss.firstNeedsUpdate = false
return
}

// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}
Expand All @@ -7857,12 +7872,8 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
startSlot = 0
}

fseq := startSeq + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
var le = binary.LittleEndian
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
Expand Down
22 changes: 14 additions & 8 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,9 +1009,8 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
if sm := ms.msgs[seq]; sm != nil {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
ms.removeSeqPerSubject(sm.subj, seq)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)
ms.removeSeqPerSubject(sm.subj, seq)
}
}
if purged > ms.state.Msgs {
Expand Down Expand Up @@ -1099,9 +1098,8 @@ func (ms *memStore) Truncate(seq uint64) error {
if sm := ms.msgs[i]; sm != nil {
purged++
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
ms.removeSeqPerSubject(sm.subj, i)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, i)
ms.removeSeqPerSubject(sm.subj, i)
}
}
// Reset last.
Expand Down Expand Up @@ -1362,8 +1360,17 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
}
ss.Msgs--

// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
// If we know we only have 1 msg left don't need to search for next first.
if ss.Msgs == 1 {
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
}
ss.firstNeedsUpdate = false
} else {
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}
}

// Will recalculate the first sequence for this subject in this block.
Expand Down Expand Up @@ -1396,6 +1403,7 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {

ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)

delete(ms.msgs, seq)
if ms.state.Msgs > 0 {
ms.state.Msgs--
if ss > ms.state.Bytes {
Expand All @@ -1420,8 +1428,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {

// Remove any per subject tracking.
ms.removeSeqPerSubject(sm.subj, seq)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)

if ms.scb != nil {
// We do not want to hold any locks here.
Expand Down
94 changes: 0 additions & 94 deletions server/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,100 +142,6 @@ func TestStoreDeleteRange(t *testing.T) {
require_Equal(t, num, 1)
}

func TestStoreSubjectStateConsistency(t *testing.T) {
testAllStoreAllPermutations(
t, false,
StreamConfig{Name: "TEST", Subjects: []string{"foo"}},
func(t *testing.T, fs StreamStore) {
getSubjectState := func() SimpleState {
t.Helper()
ss := fs.SubjectsState("foo")
return ss["foo"]
}

// Publish an initial batch of messages.
for i := 0; i < 4; i++ {
_, _, err := fs.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}

// Expect 4 msgs, with first=1, last=4.
ss := getSubjectState()
require_Equal(t, ss.Msgs, 4)
require_Equal(t, ss.First, 1)
require_Equal(t, ss.Last, 4)

// Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate.
removed, err := fs.RemoveMsg(1)
require_NoError(t, err)
require_True(t, removed)

// Will update first, so corrects to seq 2.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 3)
require_Equal(t, ss.First, 2)
require_Equal(t, ss.Last, 4)

// Remove last message.
removed, err = fs.RemoveMsg(4)
require_NoError(t, err)
require_True(t, removed)

// ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 2)
require_Equal(t, ss.First, 2)
require_Equal(t, ss.Last, 4)

// Remove first message again.
removed, err = fs.RemoveMsg(2)
require_NoError(t, err)
require_True(t, removed)

// Since we only have one message left, must update ss.First and set ss.Last to equal.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 1)
require_Equal(t, ss.First, 3)
require_Equal(t, ss.Last, 3)

// Publish some more messages so we can test another scenario.
for i := 0; i < 3; i++ {
_, _, err := fs.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}

// Just check the state is complete again.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 4)
require_Equal(t, ss.First, 3)
require_Equal(t, ss.Last, 7)

// Remove last sequence, ss.Last is lazy so doesn't get updated.
removed, err = fs.RemoveMsg(7)
require_NoError(t, err)
require_True(t, removed)

// Remove first sequence, ss.First is lazy so doesn't get updated.
removed, err = fs.RemoveMsg(3)
require_NoError(t, err)
require_True(t, removed)

// Remove (now) first sequence, but because ss.First is lazy we first need to recalculate
// to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First
// yet again, since ss.Last is lazy and is not correct.
removed, err = fs.RemoveMsg(5)
require_NoError(t, err)
require_True(t, removed)

// ss.First should equal ss.Last, last should have been updated now.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 1)
require_Equal(t, ss.First, 6)
require_Equal(t, ss.Last, 6)
},
)
}

func TestStoreMaxMsgsPerUpdateBug(t *testing.T) {
config := func() StreamConfig {
return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0}
Expand Down

0 comments on commit 614e55d

Please sign in to comment.