Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Subject state consistency #6226

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2614,10 +2614,6 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si
// Always reset.
ss.First, ss.Last, ss.Msgs = 0, 0, 0

if filter == _EMPTY_ {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant, if isAll := filter == _EMPTY_ || filter == fwcs we early return above.

filter = fwcs
}

// We do need to figure out the first and last sequences.
wc := subjectHasWildcard(filter)
start, stop := uint32(math.MaxUint32), uint32(0)
Expand Down Expand Up @@ -7502,6 +7498,9 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
// Update fss
smb.removeSeqPerSubject(sm.subj, mseq)
fs.removePerSubject(sm.subj)
// Need to mark the sequence as deleted. Otherwise, recalculating ss.First
// for per-subject info would be able to find it still.
smb.dmap.Insert(mseq)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another important addition; TestFileStoreExpireMsgsOnStart was failing as it relied on the bug existing. Now that the bug is gone, we need to actually mark the sequence as deleted. Otherwise, mb.recalculateFirstForSubj could still find and select that sequence.

}
}

Expand Down Expand Up @@ -7943,11 +7942,16 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {

// Only one left.
if ss.Msgs == 1 {
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
// Update first if we need to, we must check if this removal is about what's going to be ss.First
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
// If we're removing the first message, we must recalculate again.
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
if ss.First == seq {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
ss.Last = ss.First
ss.firstNeedsUpdate = false
return
}
Expand Down Expand Up @@ -7977,8 +7981,12 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
startSlot = 0
}

fseq := startSeq + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
var le = binary.LittleEndian
for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ {
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
Expand Down
30 changes: 20 additions & 10 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,8 +1055,9 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
if sm := ms.msgs[seq]; sm != nil {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
delete(ms.msgs, seq)
ms.removeSeqPerSubject(sm.subj, seq)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)
}
}
if purged > ms.state.Msgs {
Expand Down Expand Up @@ -1144,8 +1145,9 @@ func (ms *memStore) Truncate(seq uint64) error {
if sm := ms.msgs[i]; sm != nil {
purged++
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
delete(ms.msgs, i)
ms.removeSeqPerSubject(sm.subj, i)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, i)
}
}
// Reset last.
Expand Down Expand Up @@ -1406,17 +1408,24 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
}
ss.Msgs--

// If we know we only have 1 msg left don't need to search for next first.
// Only one left.
if ss.Msgs == 1 {
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
// Update first if we need to, we must check if this removal is about what's going to be ss.First
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
// If we're removing the first message, we must recalculate again.
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
if ss.First == seq {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
ss.Last = ss.First
ss.firstNeedsUpdate = false
} else {
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
return
}

// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}

// Will recalculate the first sequence for this subject in this block.
Expand Down Expand Up @@ -1446,7 +1455,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {

ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)

delete(ms.msgs, seq)
if ms.state.Msgs > 0 {
ms.state.Msgs--
if ss > ms.state.Bytes {
Expand All @@ -1471,6 +1479,8 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {

// Remove any per subject tracking.
ms.removeSeqPerSubject(sm.subj, seq)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)

if ms.scb != nil {
// We do not want to hold any locks here.
Expand Down
94 changes: 94 additions & 0 deletions server/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,97 @@ func TestStoreDeleteRange(t *testing.T) {
require_Equal(t, last, 2)
require_Equal(t, num, 1)
}

func TestStoreSubjectStateConsistency(t *testing.T) {
testAllStoreAllPermutations(
t, false,
StreamConfig{Name: "TEST", Subjects: []string{"foo"}},
func(t *testing.T, fs StreamStore) {
getSubjectState := func() SimpleState {
t.Helper()
ss := fs.SubjectsState("foo")
return ss["foo"]
}

// Publish an initial batch of messages.
for i := 0; i < 4; i++ {
_, _, err := fs.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}

// Expect 4 msgs, with first=1, last=4.
ss := getSubjectState()
require_Equal(t, ss.Msgs, 4)
require_Equal(t, ss.First, 1)
require_Equal(t, ss.Last, 4)

// Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate.
removed, err := fs.RemoveMsg(1)
require_NoError(t, err)
require_True(t, removed)

// Will update first, so corrects to seq 2.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 3)
require_Equal(t, ss.First, 2)
require_Equal(t, ss.Last, 4)

// Remove last message.
removed, err = fs.RemoveMsg(4)
require_NoError(t, err)
require_True(t, removed)

// ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 2)
require_Equal(t, ss.First, 2)
require_Equal(t, ss.Last, 4)

// Remove first message again.
removed, err = fs.RemoveMsg(2)
require_NoError(t, err)
require_True(t, removed)

// Since we only have one message left, must update ss.First and set ss.Last to equal.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 1)
require_Equal(t, ss.First, 3)
require_Equal(t, ss.Last, 3)

// Publish some more messages so we can test another scenario.
for i := 0; i < 3; i++ {
_, _, err := fs.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}

// Just check the state is complete again.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 4)
require_Equal(t, ss.First, 3)
require_Equal(t, ss.Last, 7)

// Remove last sequence, ss.Last is lazy so doesn't get updated.
removed, err = fs.RemoveMsg(7)
require_NoError(t, err)
require_True(t, removed)

// Remove first sequence, ss.First is lazy so doesn't get updated.
removed, err = fs.RemoveMsg(3)
require_NoError(t, err)
require_True(t, removed)

// Remove (now) first sequence, but because ss.First is lazy we first need to recalculate
// to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First
// yet again, since ss.Last is lazy and is not correct.
removed, err = fs.RemoveMsg(5)
require_NoError(t, err)
require_True(t, removed)

// ss.First should equal ss.Last, last should have been updated now.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 1)
require_Equal(t, ss.First, 6)
require_Equal(t, ss.Last, 6)
},
)
}
Loading