Skip to content

Commit

Permalink
Parsable duration for Nats-TTL header
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Dec 20, 2024
1 parent 4a50d57 commit 55537d2
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 10 deletions.
10 changes: 10 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1628,5 +1628,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSInvalidTTLErr",
"code": 400,
"error_code": 10165,
"description": "invalid per-message TTL",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
6 changes: 3 additions & 3 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
hasHeaders := rl&hbit != 0
var ttl int64
if len(hdr) > 0 {
ttl = getMessageTTL(hdr)
ttl, _ = getMessageTTL(hdr)
}
// Clear any headers bit that could be set.
rl &^= hbit
Expand Down Expand Up @@ -1885,7 +1885,7 @@ func (fs *fileStore) recoverTTLState() error {
if len(msg.hdr) == 0 {
continue
}
if ttl := getMessageTTL(msg.hdr); ttl > 0 {
if ttl, _ := getMessageTTL(msg.hdr); ttl > 0 {
expires := time.Duration(msg.ts) + (time.Second * time.Duration(ttl))
fs.ttls.Add(seq, int64(expires))
if seq > fs.ttlseq {
Expand Down Expand Up @@ -6166,7 +6166,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
// TODO(nat): Not terribly optimal...
if hasHeaders {
if fsm, err := mb.msgFromBuf(buf, &sm, nil); err == nil && fsm != nil {
if len(fsm.hdr) > 0 && getMessageTTL(fsm.hdr) > 0 {
if _, err = getMessageTTL(fsm.hdr); err == nil && len(fsm.hdr) > 0 {
ttls++
}
}
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ const (
// JSInvalidJSONErr invalid JSON: {err}
JSInvalidJSONErr ErrorIdentifier = 10025

// JSInvalidTTLErr invalid per-message TTL
JSInvalidTTLErr ErrorIdentifier = 10165

// JSMaximumConsumersLimitErr maximum consumers limit reached
JSMaximumConsumersLimitErr ErrorIdentifier = 10026

Expand Down Expand Up @@ -573,6 +576,7 @@ var (
JSConsumerWithFlowControlNeedsHeartbeats: {Code: 400, ErrCode: 10108, Description: "consumer with flow control also needs heartbeats"},
JSInsufficientResourcesErr: {Code: 503, ErrCode: 10023, Description: "insufficient resources"},
JSInvalidJSONErr: {Code: 400, ErrCode: 10025, Description: "invalid JSON: {err}"},
JSInvalidTTLErr: {Code: 400, ErrCode: 10165, Description: "invalid per-message TTL"},
JSMaximumConsumersLimitErr: {Code: 400, ErrCode: 10026, Description: "maximum consumers limit reached"},
JSMaximumStreamsLimitErr: {Code: 400, ErrCode: 10027, Description: "maximum number of streams reached"},
JSMemoryResourcesExceededErr: {Code: 500, ErrCode: 10028, Description: "insufficient memory resources available"},
Expand Down Expand Up @@ -1517,6 +1521,16 @@ func NewJSInvalidJSONError(err error, opts ...ErrorOption) *ApiError {
}
}

// NewJSInvalidTTLError creates a new JSInvalidTTLErr error: "invalid per-message TTL"
func NewJSInvalidTTLError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSInvalidTTLErr]
}

// NewJSMaximumConsumersLimitError creates a new JSMaximumConsumersLimitErr error: "maximum consumers limit reached"
func NewJSMaximumConsumersLimitError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
33 changes: 30 additions & 3 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24757,7 +24757,7 @@ func TestJetStreamMessageTTL(t *testing.T) {
}

for i := 1; i <= 10; i++ {
msg.Header.Set("Nats-TTL", "1")
msg.Header.Set("Nats-TTL", "1s")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}
Expand Down Expand Up @@ -24796,7 +24796,7 @@ func TestJetStreamMessageTTLRestart(t *testing.T) {
}

for i := 1; i <= 10; i++ {
msg.Header.Set("Nats-TTL", "1")
msg.Header.Set("Nats-TTL", "1s")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}
Expand Down Expand Up @@ -24850,7 +24850,7 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) {
}

for i := 1; i <= 10; i++ {
msg.Header.Set("Nats-TTL", "1")
msg.Header.Set("Nats-TTL", "1s")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}
Expand Down Expand Up @@ -24887,3 +24887,30 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) {
require_Equal(t, si.State.FirstSeq, 11)
require_Equal(t, si.State.LastSeq, 10)
}

func TestJetStreamMessageTTLInvalid(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
})
require_NoError(t, err)

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

msg.Header.Set("Nats-TTL", "500ms")
_, err = js.PublishMsg(msg)
require_Error(t, err)

msg.Header.Set("Nats-TTL", "something")
_, err = js.PublishMsg(msg)
require_Error(t, err)
}
33 changes: 29 additions & 4 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4203,10 +4203,25 @@ func getExpectedLastSeqPerSubjectForSubject(hdr []byte) string {
return string(getHeader(JSExpectedLastSubjSeqSubj, hdr))
}

// Fast lookup of the message TTL.
func getMessageTTL(hdr []byte) int64 {
// Fast lookup of the message TTL:
// - Positive return value: duration in seconds.
// - Zero return value: no TTL or parse error.
// - Negative return value: don't expire.
func getMessageTTL(hdr []byte) (int64, error) {
ttl := getHeader(JSMessageTTL, hdr)
return parseInt64(ttl)
sttl := bytesToString(ttl)
dur, err := time.ParseDuration(sttl)
if err == nil {
if dur < time.Second {
return 0, NewJSInvalidTTLError()
}
return int64(dur.Seconds()), nil
}
t := parseInt64(ttl)
if t < 0 {
return 0, NewJSInvalidTTLError()
}
return t * int64(time.Second), nil
}

// Signal if we are clustered. Will acquire rlock.
Expand Down Expand Up @@ -5017,7 +5032,17 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}

// Find the message TTL if any.
ttl := getMessageTTL(hdr)
ttl, err := getMessageTTL(hdr)
if err != nil {
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSInvalidTTLError()
response, _ = json.Marshal(resp)
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0))
}
mset.mu.Unlock()
return err
}

// Store actual msg.
if lseq == 0 && ts == 0 {
Expand Down

0 comments on commit 55537d2

Please sign in to comment.