Skip to content

Commit

Permalink
Added in ability to get number of subjects from StreamInfo, and optio…
Browse files Browse the repository at this point in the history
…nally details per subject on how many messages each subject has.

This can also be filtered, meaning you can filter out the subjects when asking for details.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 1, 2022
1 parent 0d15872 commit c3d2c2d
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 21 deletions.
12 changes: 11 additions & 1 deletion server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1148,5 +1148,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamInfoMaxSubjectsErr",
"code": 500,
"error_code": 10117,
"description": "subject details would exceed maximum allowed",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
]
21 changes: 14 additions & 7 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3274,19 +3274,18 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
mb.llseq = seq
}

// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
if !hashChecked {
hh = mb.hh // This will force the hash check in msgFromBuf.
mb.cache.idx[seq-mb.cache.fseq] = (bi | hbit)
}

li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
return nil, errPartialCache
}
buf := mb.cache.buf[li:]

// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
if !hashChecked {
hh = mb.hh // This will force the hash check in msgFromBuf.
}

// Parse from the raw buffer.
subj, hdr, msg, mseq, ts, err := msgFromBuf(buf, hh)
if err != nil {
Expand All @@ -3297,6 +3296,11 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, mseq)
}

// Clear the check bit here after we know all is good.
if !hashChecked {
mb.cache.idx[seq-mb.cache.fseq] = (bi | hbit)
}

return &fileStoredMsg{subj, hdr, msg, seq, ts, mb, int64(bi)}, nil
}

Expand Down Expand Up @@ -3451,6 +3455,7 @@ func (fs *fileStore) FastState(state *StreamState) {
}
}
state.Consumers = len(fs.cfs)

fs.mu.RUnlock()
}

Expand All @@ -3460,6 +3465,7 @@ func (fs *fileStore) State() StreamState {
state := fs.state
state.Consumers = len(fs.cfs)
state.Deleted = nil // make sure.

for _, mb := range fs.blks {
mb.mu.Lock()
fseq := mb.first.seq
Expand Down Expand Up @@ -4243,6 +4249,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
if mb.fss == nil {
mb.fss = make(map[string]*SimpleState)
}

fseq, lseq := mb.first.seq, mb.last.seq
for seq := fseq; seq <= lseq; seq++ {
if sm, _ := mb.cacheLookup(seq); sm != nil && len(sm.subj) > 0 {
Expand Down
26 changes: 24 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,12 @@ type JSApiStreamDeleteResponse struct {

const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"

// Maximum number of subject details we will send in the stream info.
const JSMaxSubjectDetails = 100_000

type JSApiStreamInfoRequest struct {
DeletedDetails bool `json:"deleted_details,omitempty"`
DeletedDetails bool `json:"deleted_details,omitempty"`
SubjectsFilter string `json:"subjects_filter,omitempty"`
}

type JSApiStreamInfoResponse struct {
Expand Down Expand Up @@ -1697,14 +1701,15 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
}

var details bool
var subjects string
if !isEmptyRequest(msg) {
var req JSApiStreamInfoRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
details = req.DeletedDetails
details, subjects = req.DeletedDetails, req.SubjectsFilter
}

mset, err := acc.lookupStream(streamName)
Expand All @@ -1730,6 +1735,23 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
resp.StreamInfo.Sources = mset.sourcesInfo()
}

// Check if they have asked for subject details.
if subjects != _EMPTY_ {
if mss := mset.store.SubjectsState(subjects); len(mss) > 0 {
if len(mss) > JSMaxSubjectDetails {
resp.StreamInfo = nil
resp.Error = NewJSStreamInfoMaxSubjectsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
sd := make(map[string]uint64, len(mss))
for subj, ss := range mss {
sd[subj] = ss.Msgs
}
resp.StreamInfo.State.Subjects = sd
}

}
// Check for out of band catchups.
if mset.hasCatchupPeers() {
mset.checkClusterInfo(resp.StreamInfo)
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ const (
// JSStreamHeaderExceedsMaximumErr header size exceeds maximum allowed of 64k
JSStreamHeaderExceedsMaximumErr ErrorIdentifier = 10097

// JSStreamInfoMaxSubjectsErr subject details would exceed maximum allowed
JSStreamInfoMaxSubjectsErr ErrorIdentifier = 10117

// JSStreamInvalidConfigF Stream configuration validation error string ({err})
JSStreamInvalidConfigF ErrorIdentifier = 10052

Expand Down Expand Up @@ -438,6 +441,7 @@ var (
JSStreamExternalDelPrefixOverlapsErrF: {Code: 400, ErrCode: 10022, Description: "stream external delivery prefix {prefix} overlaps with stream subject {subject}"},
JSStreamGeneralErrorF: {Code: 500, ErrCode: 10051, Description: "{err}"},
JSStreamHeaderExceedsMaximumErr: {Code: 400, ErrCode: 10097, Description: "header size exceeds maximum allowed of 64k"},
JSStreamInfoMaxSubjectsErr: {Code: 500, ErrCode: 10117, Description: "subject details would exceed maximum allowed"},
JSStreamInvalidConfigF: {Code: 500, ErrCode: 10052, Description: "{err}"},
JSStreamInvalidErr: {Code: 500, ErrCode: 10096, Description: "stream not valid"},
JSStreamInvalidExternalDeliverySubjErrF: {Code: 400, ErrCode: 10024, Description: "stream external delivery prefix {prefix} must not contain wildcards"},
Expand Down Expand Up @@ -1445,6 +1449,16 @@ func NewJSStreamHeaderExceedsMaximumError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSStreamHeaderExceedsMaximumErr]
}

// NewJSStreamInfoMaxSubjectsError creates a new JSStreamInfoMaxSubjectsErr error: "subject details would exceed maximum allowed"
func NewJSStreamInfoMaxSubjectsError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSStreamInfoMaxSubjectsErr]
}

// NewJSStreamInvalidConfigError creates a new JSStreamInvalidConfigF error: "{err}"
func NewJSStreamInvalidConfigError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
64 changes: 64 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4387,6 +4387,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
if rresp.Error != nil {
t.Fatalf("Got an unexpected error response: %+v", rresp.Error)
}

// Can be any size message.
var chunk [512]byte
for r := bytes.NewReader(snapshot); ; {
Expand Down Expand Up @@ -4478,6 +4479,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
t.Fatalf("Expected restore subscription to be closed")
}

req, _ = json.Marshal(rreq)
rmsg, err = nc2.Request(strings.ReplaceAll(JSApiStreamRestoreT, JSApiPrefix, "$JS.domain.API"), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error on snapshot request: %v", err)
Expand All @@ -4499,6 +4501,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
t.Fatalf("Restore not honoring reply subjects for ack flow")
}
}

// For EOF this will send back stream info or an error.
si, err := nc2.Request(rresp.DeliverSubject, nil, time.Second)
if err != nil {
Expand Down Expand Up @@ -14642,6 +14645,67 @@ func TestJetStreamMaxMsgsPerSubjectWithDiscardNew(t *testing.T) {
}
}

func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

nc, js := jsClientConnect(t, s)
defer nc.Close()

getInfo := func(filter string) *StreamInfo {
t.Helper()
// Need to grab StreamInfo by hand for now.
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
require_NoError(t, err)
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, time.Second)
require_NoError(t, err)
var si StreamInfo
err = json.Unmarshal(resp.Data, &si)
require_NoError(t, err)
return &si
}

testSubjects := func(t *testing.T, st nats.StorageType) {
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Storage: st,
})
require_NoError(t, err)
defer js.DeleteStream("TEST")

counts, msg := []int{22, 33, 44}, []byte("ok")
// Now place msgs, foo-22, bar-33 and baz-44.
for i, subj := range []string{"foo", "bar", "baz"} {
for n := 0; n < counts[i]; n++ {
_, err = js.Publish(subj, msg)
require_NoError(t, err)
}
}

// Test all subjects first.
expected := map[string]uint64{"foo": 22, "bar": 33, "baz": 44}
if si := getInfo(nats.AllKeys); !reflect.DeepEqual(si.State.Subjects, expected) {
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
}
if si := getInfo("*"); !reflect.DeepEqual(si.State.Subjects, expected) {
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
}
// Filtered to 1.
expected = map[string]uint64{"foo": 22}
if si := getInfo("foo"); !reflect.DeepEqual(si.State.Subjects, expected) {
t.Fatalf("Expected subjects of %+v, but got %+v", expected, si.State.Subjects)
}
}

t.Run("MemoryStore", func(t *testing.T) { testSubjects(t, nats.MemoryStorage) })
t.Run("FileStore", func(t *testing.T) { testSubjects(t, nats.FileStorage) })
}

///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
88 changes: 87 additions & 1 deletion server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3459,7 +3459,7 @@ func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) {
}

// This will cause us to stepdown and truncate our WAL.
fetchMsgs(t, sub, 100, 5*time.Second)
fetchMsgs(t, sub, 100, 20*time.Second)

checkFor(t, 20*time.Second, 500*time.Millisecond, func() error {
// Make sure we changed leaders.
Expand Down Expand Up @@ -4108,3 +4108,89 @@ func TestNoRaceJetStreamClusterHealthz(t *testing.T) {
t.Fatalf("Expected to have some errors until we became current, got none")
}
}

// Test that we can receive larger messages with stream subject details.
// Also test that we will fail at some point and the user can fall back to
// an orderedconsumer like we do with watch for KV Keys() call.
func TestNoRaceJetStreamStreamInfoSubjectDetailsLimits(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
jetstream: enabled
accounts: {
default: {
jetstream: true
users: [ {user: me, password: pwd} ]
limits { max_payload: 256 }
}
}
`))
defer removeFile(t, conf)

s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

nc, js := jsClientConnect(t, s, nats.UserInfo("me", "pwd"))
defer nc.Close()

// Make sure we cannot send larger than 256 bytes.
// But we can receive larger.
sub, err := nc.SubscribeSync("foo")
require_NoError(t, err)
err = nc.Publish("foo", []byte(strings.Repeat("A", 300)))
require_Error(t, err, nats.ErrMaxPayload)
sub.Unsubscribe()

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*", "X.*"},
})
require_NoError(t, err)

n := JSMaxSubjectDetails
for i := 0; i < n; i++ {
_, err := js.PublishAsync(fmt.Sprintf("X.%d", i), []byte("OK"))
require_NoError(t, err)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

getInfo := func(filter string) *StreamInfo {
t.Helper()
// Need to grab StreamInfo by hand for now.
req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: filter})
require_NoError(t, err)
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second)
require_NoError(t, err)
var si StreamInfo
err = json.Unmarshal(resp.Data, &si)
require_NoError(t, err)
return &si
}

si := getInfo("X.*")
if len(si.State.Subjects) != n {
t.Fatalf("Expected to get %d subject details, got %d", n, len(si.State.Subjects))
}

// Now add one more message in which will exceed our internal limits for subject details.
_, err = js.Publish("foo", []byte("TOO MUCH"))
require_NoError(t, err)

req, err := json.Marshal(&JSApiStreamInfoRequest{SubjectsFilter: nats.AllKeys})
require_NoError(t, err)
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), req, 5*time.Second)
require_NoError(t, err)
var sir JSApiStreamInfoResponse
err = json.Unmarshal(resp.Data, &sir)
require_NoError(t, err)
if sir.Error == nil || sir.Error.Code != 500 || sir.Error.Description != "subject details would exceed maximum allowed" {
t.Fatalf("Did not get correct error response: %+v", sir.Error)
}
}
21 changes: 11 additions & 10 deletions server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,17 @@ const (

// StreamState is information about the given stream.
type StreamState struct {
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
NumDeleted int `json:"num_deleted,omitempty"`
Deleted []uint64 `json:"deleted,omitempty"`
Lost *LostStreamData `json:"lost,omitempty"`
Consumers int `json:"consumer_count"`
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Subjects map[string]uint64 `json:"subjects,omitempty"`
NumDeleted int `json:"num_deleted,omitempty"`
Deleted []uint64 `json:"deleted,omitempty"`
Lost *LostStreamData `json:"lost,omitempty"`
Consumers int `json:"consumer_count"`
}

// SimpleState for filtered subject specific state.
Expand Down

0 comments on commit c3d2c2d

Please sign in to comment.