Skip to content

Commit

Permalink
Remove EnforceTTLs from filestore config; when MaxAge is set, use…
Browse files Browse the repository at this point in the history
… that when checking for next expiry

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Dec 24, 2024
1 parent d517d30 commit 6ea9fe0
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 21 deletions.
14 changes: 9 additions & 5 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ type FileStoreConfig struct {
Cipher StoreCipher
// Compression is the algorithm to use when compressing.
Compression StoreCompression
// EnforceTTLs decides whether or not to enforce per-message TTLs.
EnforceTTLs bool

// Internal reference to our server.
srv *Server
Expand Down Expand Up @@ -418,7 +416,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}

// Only create a THW if we're going to allow TTLs.
if fs.fcfg.EnforceTTLs {
if cfg.AllowMsgTTL {
fs.ttls = thw.NewHashWheel()
}

Expand Down Expand Up @@ -486,7 +484,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}

// See if we can bring back our TTL timed hash wheel state from disk.
if fcfg.EnforceTTLs {
if cfg.AllowMsgTTL {
if err = fs.recoverTTLState(); err != nil && !os.IsNotExist(err) {
fs.warn("Recovering TTL state from index errored: %v", err)
}
Expand Down Expand Up @@ -5324,7 +5322,13 @@ func (fs *fileStore) expireMsgs() {
fs.ttls.ExpireTasks(func(seq uint64, ts int64) {
fs.removeMsgViaLimits(seq)
})
nextTTL = fs.ttls.GetNextExpiration(math.MaxInt64)
if maxAge > 0 {
// Only check if we're expiring something in the next MaxAge interval, saves us a bit
// of work if MaxAge will beat us to the next expiry anyway.
nextTTL = fs.ttls.GetNextExpiration(time.Now().Add(time.Duration(maxAge)).UnixNano())
} else {
nextTTL = fs.ttls.GetNextExpiration(math.MaxInt64)
}
}

// Onky cancel if no message left, not on potential lookup error that would result in sm == nil.
Expand Down
28 changes: 14 additions & 14 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8299,8 +8299,8 @@ func TestFileStoreNumPendingMulti(t *testing.T) {

func TestFileStoreMessageTTL(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir(), EnforceTTLs: true},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage})
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true})
require_NoError(t, err)
defer fs.Stop()

Expand Down Expand Up @@ -8330,8 +8330,8 @@ func TestFileStoreMessageTTLRestart(t *testing.T) {

t.Run("BeforeRestart", func(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: dir, EnforceTTLs: true},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage})
FileStoreConfig{StoreDir: dir},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true})
require_NoError(t, err)
defer fs.Stop()

Expand All @@ -8351,8 +8351,8 @@ func TestFileStoreMessageTTLRestart(t *testing.T) {

t.Run("AfterRestart", func(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: dir, EnforceTTLs: true},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage})
FileStoreConfig{StoreDir: dir},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true})
require_NoError(t, err)
defer fs.Stop()

Expand All @@ -8379,8 +8379,8 @@ func TestFileStoreMessageTTLRecovered(t *testing.T) {

t.Run("BeforeRestart", func(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: dir, EnforceTTLs: true, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage})
FileStoreConfig{StoreDir: dir, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true})
require_NoError(t, err)
defer fs.Stop()

Expand Down Expand Up @@ -8409,8 +8409,8 @@ func TestFileStoreMessageTTLRecovered(t *testing.T) {
require_NoError(t, os.RemoveAll(fn))

fs, err := newFileStore(
FileStoreConfig{StoreDir: dir, EnforceTTLs: true, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage})
FileStoreConfig{StoreDir: dir, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true})
require_NoError(t, err)
defer fs.Stop()

Expand All @@ -8437,8 +8437,8 @@ func TestFileStoreMessageTTLRecoveredSingleMessageWithoutStreamState(t *testing.

t.Run("BeforeRestart", func(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: dir, EnforceTTLs: true, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage})
FileStoreConfig{StoreDir: dir, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true})
require_NoError(t, err)
defer fs.Stop()

Expand All @@ -8464,8 +8464,8 @@ func TestFileStoreMessageTTLRecoveredSingleMessageWithoutStreamState(t *testing.
require_NoError(t, os.RemoveAll(fn))

fs, err := newFileStore(
FileStoreConfig{StoreDir: dir, EnforceTTLs: true, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage})
FileStoreConfig{StoreDir: dir, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage, AllowMsgTTL: true})
require_NoError(t, err)
defer fs.Stop()

Expand Down
2 changes: 0 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
fsCfg.SyncInterval = s.getOpts().SyncInterval
fsCfg.SyncAlways = s.getOpts().SyncAlways
fsCfg.Compression = config.Compression
fsCfg.EnforceTTLs = config.AllowMsgTTL

if err := mset.setupStore(fsCfg); err != nil {
mset.stop(true, false)
Expand Down Expand Up @@ -4209,7 +4208,6 @@ func getExpectedLastSeqPerSubjectForSubject(hdr []byte) string {
// Fast lookup of the message TTL:
// - Positive return value: duration in seconds.
// - Zero return value: no TTL or parse error.
// - Negative return value: don't expire.
func getMessageTTL(hdr []byte) (int64, error) {
ttl := getHeader(JSMessageTTL, hdr)
if len(ttl) == 0 {
Expand Down

0 comments on commit 6ea9fe0

Please sign in to comment.