diff --git a/server/filestore.go b/server/filestore.go index d48ee088a0c..622341970cd 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1458,15 +1458,14 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { if seq == 0 || seq&ebit != 0 || seq < fseq { seq = seq &^ ebit if seq >= fseq { - // Only add to dmap if past recorded first seq and non-zero. - if seq != 0 { - addToDmap(seq) - } atomic.StoreUint64(&mb.last.seq, seq) mb.last.ts = ts if mb.msgs == 0 { atomic.StoreUint64(&mb.first.seq, seq+1) mb.first.ts = 0 + } else if seq != 0 { + // Only add to dmap if past recorded first seq and non-zero. + addToDmap(seq) } } index += rl @@ -7425,11 +7424,7 @@ func (fs *fileStore) State() StreamState { } // Add in deleted. mb.dmap.Range(func(seq uint64) bool { - if seq < fseq { - mb.dmap.Delete(seq) - } else { - state.Deleted = append(state.Deleted, seq) - } + state.Deleted = append(state.Deleted, seq) return true }) mb.mu.Unlock() diff --git a/server/filestore_test.go b/server/filestore_test.go index 00f58423956..ae671a03b28 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -8887,3 +8887,58 @@ func changeDirectoryPermission(directory string, mode fs.FileMode) error { }) return err } + +func TestFileStoreLeftoverSkipMsgInDmap(t *testing.T) { + storeDir := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"test.*"}, Storage: FileStorage, MaxMsgsPer: 1}, + ) + require_NoError(t, err) + defer fs.Stop() + + getLmbState := func(fs *fileStore) (uint64, uint64, int) { + fs.mu.RLock() + lmb := fs.lmb + fs.mu.RUnlock() + lmb.mu.RLock() + fseq := atomic.LoadUint64(&lmb.first.seq) + lseq := atomic.LoadUint64(&lmb.last.seq) + dmaps := lmb.dmap.Size() + lmb.mu.RUnlock() + return fseq, lseq, dmaps + } + + // Only skip a message. + fs.SkipMsg() + + // Confirm state. + state := fs.State() + require_Equal(t, state.FirstSeq, 2) + require_Equal(t, state.LastSeq, 1) + require_Equal(t, state.NumDeleted, 0) + fseq, lseq, dmaps := getLmbState(fs) + require_Equal(t, fseq, 2) + require_Equal(t, lseq, 1) + require_Len(t, dmaps, 0) + + // Stop without writing index.db so we recover based on just the blk file. + require_NoError(t, fs.stop(false, false)) + + fs, err = newFileStore( + FileStoreConfig{StoreDir: storeDir}, + StreamConfig{Name: "zzz", Subjects: []string{"test.*"}, Storage: FileStorage, MaxMsgsPer: 1}, + ) + require_NoError(t, err) + defer fs.Stop() + + // Confirm the skipped message is not included in the deletes. + state = fs.State() + require_Equal(t, state.FirstSeq, 2) + require_Equal(t, state.LastSeq, 1) + require_Equal(t, state.NumDeleted, 0) + fseq, lseq, dmaps = getLmbState(fs) + require_Equal(t, fseq, 2) + require_Equal(t, lseq, 1) + require_Len(t, dmaps, 0) +}