From e659de1b0c5e570b8458cd33f62303380ffb4400 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 11 Dec 2024 16:30:12 +0100 Subject: [PATCH] [FIXED] MaxMsgsPerSubject limit not applied when updating from no value Signed-off-by: Maurice van Veen --- server/filestore.go | 5 ++++- server/memstore.go | 7 +++++-- server/store_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 20f6657917..67c0932485 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -581,6 +581,9 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { if cfg.Storage != FileStorage { return fmt.Errorf("fileStore requires file storage type in config") } + if cfg.MaxMsgsPer < -1 { + cfg.MaxMsgsPer = -1 + } fs.mu.Lock() new_cfg := FileStreamInfo{Created: fs.cfg.Created, StreamConfig: *cfg} @@ -611,7 +614,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { fs.ageChk = nil } - if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer { + if fs.cfg.MaxMsgsPer > 0 && (old_cfg.MaxMsgsPer == 0 || fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer) { fs.enforceMsgPerSubjectLimit(true) } fs.mu.Unlock() diff --git a/server/memstore.go b/server/memstore.go index b183d117da..b83f62535f 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -85,10 +85,13 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { ms.ageChk = nil } // Make sure to update MaxMsgsPer + if cfg.MaxMsgsPer < -1 { + cfg.MaxMsgsPer = -1 + } maxp := ms.maxp ms.maxp = cfg.MaxMsgsPer - // If the value is smaller we need to enforce that. - if ms.maxp != 0 && ms.maxp < maxp { + // If the value is smaller, or was unset before, we need to enforce that. + if ms.maxp > 0 && (maxp == 0 || ms.maxp < maxp) { lm := uint64(ms.maxp) ms.fss.Iter(func(subj []byte, ss *SimpleState) bool { if ss.Msgs > lm { diff --git a/server/store_test.go b/server/store_test.go index d2c3481a1a..19d4c0251f 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -235,3 +235,47 @@ func TestStoreSubjectStateConsistency(t *testing.T) { }, ) } + +func TestStoreMaxMsgsPerUpdateBug(t *testing.T) { + config := func() StreamConfig { + return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0} + } + testAllStoreAllPermutations( + t, false, config(), + func(t *testing.T, fs StreamStore) { + for i := 0; i < 5; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + ss := fs.State() + require_Equal(t, ss.Msgs, 5) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 5) + + // Update max messages per-subject from 0 (infinite) to 1. + // Since the per-subject limit was not specified before, messages should be removed upon config update. + cfg := config() + if _, ok := fs.(*fileStore); ok { + cfg.Storage = FileStorage + } else { + cfg.Storage = MemoryStorage + } + cfg.MaxMsgsPer = 1 + err := fs.UpdateConfig(&cfg) + require_NoError(t, err) + + // Only one message should remain. + ss = fs.State() + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.FirstSeq, 5) + require_Equal(t, ss.LastSeq, 5) + + // Update max messages per-subject from 0 (infinite) to an invalid value (< -1). + cfg.MaxMsgsPer = -2 + err = fs.UpdateConfig(&cfg) + require_NoError(t, err) + require_Equal(t, cfg.MaxMsgsPer, -1) + }, + ) +}