-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial support for per-message TTLs #6272
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Found and fixed two bugs; if there was no index.db
the TTLs would not be recovered (with or without thw.db
), and if a single message needed to be recovered (when no thw.db
) it would not due to an off-by-one.
Still work to be done in future PRs, but think this one is good. 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general looks good.
Let's move hdr to a parseable duration (see inline comments). Also we should reject messages when the stream has not opted in to per msg TTLs.
server/filestore.go
Outdated
fs.ttls.ExpireTasks(func(seq uint64, ts int64) { | ||
fs.removeMsgViaLimits(seq) | ||
}) | ||
nextTTL = fs.ttls.GetNextExpiration(math.MaxInt64) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should only pass max int if we are not doing max age, if max age then we should capture the next fireIn from the loop above and pass that here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we get this changed as discussed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not yet, I'm still fiddling with this and want to write a proper test for it. Happy to do in either this PR or in a follow-up one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's update this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was done.
server/stream.go
Outdated
@@ -4996,17 +5011,20 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, | |||
} | |||
} | |||
|
|||
// Find the message TTL if any. | |||
ttl := getMessageTTL(hdr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reject if ttl present and stream is not opted in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we can do this, otherwise we can run into a problem when we source/mirror messages with a TTL into another stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need special processing for streams and mirrors anyway that ignores (for ingest criteria) certain headers. This would be one added to the list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add that to the list of follow-up changes to make.
4b5b21e
to
55537d2
Compare
server/filestore.go
Outdated
fs.ttls.ExpireTasks(func(seq uint64, ts int64) { | ||
fs.removeMsgViaLimits(seq) | ||
}) | ||
nextTTL = fs.ttls.GetNextExpiration(math.MaxInt64) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we get this changed as discussed?
server/filestore_test.go
Outdated
|
||
func TestFileStoreMessageTTL(t *testing.T) { | ||
fs, err := newFileStore( | ||
FileStoreConfig{StoreDir: t.TempDir(), EnforceTTLs: true}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we not inherit enforeTTLS from the stream config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In stream.go
when creating the store for streams, we do indeed set it from the AllowMsgTTLs
value from the stream config. In the filestore-specific tests, like here, we set it to true directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we pass in a stream config in the tests we could do it there, or you feel we should keep the redundancy and have it in two places?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not attached to the duplication at all, will remove it and use the passed-in StreamConfig
one.
require_Equal(t, ss.LastSeq, 1) | ||
require_Equal(t, ss.Msgs, 0) | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to add in test with max age but with msgs with never expire and make sure they do not get expired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the moment this PR doesn't have "never expire" behaviour for MaxAge
, that's one of the things I have queued up for a follow-up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok maybe remove the header then from this PR. Confusing as just a placeholder.
@@ -100,6 +100,10 @@ type StreamConfig struct { | |||
// TODO(nat): Can/should we name these better? | |||
ConsumerLimits StreamConsumerLimits `json:"consumer_limits"` | |||
|
|||
// AllowMsgTTL allows header initiated per-message TTLs. If disabled, | |||
// then the `NATS-TTL` header will be ignored. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update comment if we reject these vs ignore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the moment the comment is correct, as of right now the header is passed through and ignored. We probably want to figure out the sourcing/mirroring behaviour at the same time as changing this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw the thread between you and R.I. and Byron. From an app perspective if I set a header and get no feedback it is not honored that is not good IMO.
Yes we need to fixup the sourcing and mirroring to ignore certain headers. It does so already on some but we need a better solution here.
621a876
to
4d4187b
Compare
Have rebased the PR, I think it's best to deal with rejecting the TTL'd messages in a separate PR at the same time as stripping them from sourcing/mirroring. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Signed-off-by: Neil Twigg <neil@nats.io>
… that when checking for next expiry Signed-off-by: Neil Twigg <neil@nats.io>
4d4187b
to
6ea9fe0
Compare
Fixed merge conflicts. |
This is an incomplete but somewhat-working implementations of per-message TTLs.
Notes:
AllowMsgTTL
to stream config;Nats-TTL
message header as a parsable duration, or if not, second precision;index.db
magic version so that we can add a new field for tracking how many TTL'd messages are in each message block (means thatindex.db
needs to be rebuilt if downgrading);thw.db
to store the timed hash wheel state, and tries to rebuild it from a linear scan if it is missing or out-of-date.Future work:
Signed-off-by: Neil Twigg neil@nats.io