Skip to content

Commit

Permalink
Removed NumSubjects, too costly for default
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 1, 2022
1 parent 4057363 commit 6bbe75e
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 80 deletions.
27 changes: 6 additions & 21 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3274,18 +3274,18 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
mb.llseq = seq
}

// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
if !hashChecked {
hh = mb.hh // This will force the hash check in msgFromBuf.
}

li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
return nil, errPartialCache
}
buf := mb.cache.buf[li:]

// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
if !hashChecked {
hh = mb.hh // This will force the hash check in msgFromBuf.
}

// Parse from the raw buffer.
subj, hdr, msg, mseq, ts, err := msgFromBuf(buf, hh)
if err != nil {
Expand Down Expand Up @@ -3456,16 +3456,6 @@ func (fs *fileStore) FastState(state *StreamState) {
}
state.Consumers = len(fs.cfs)

fss := make(map[string]struct{})
for _, mb := range fs.blks {
mb.mu.Lock()
for subj := range mb.fss {
fss[subj] = struct{}{}
}
mb.mu.Unlock()
}
state.NumSubjects = len(fss)

fs.mu.RUnlock()
}

Expand All @@ -3476,7 +3466,6 @@ func (fs *fileStore) State() StreamState {
state.Consumers = len(fs.cfs)
state.Deleted = nil // make sure.

fss := make(map[string]struct{})
for _, mb := range fs.blks {
mb.mu.Lock()
fseq := mb.first.seq
Expand All @@ -3487,15 +3476,11 @@ func (fs *fileStore) State() StreamState {
state.Deleted = append(state.Deleted, seq)
}
}
for subj := range mb.fss {
fss[subj] = struct{}{}
}
mb.mu.Unlock()
}
fs.mu.RUnlock()

state.Lost = fs.lostData()
state.NumSubjects = len(fss)

// Can not be guaranteed to be sorted.
if len(state.Deleted) > 0 {
Expand Down
45 changes: 0 additions & 45 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14645,51 +14645,6 @@ func TestJetStreamMaxMsgsPerSubjectWithDiscardNew(t *testing.T) {
}
}

func TestJetStreamStreamInfoNumSubjects(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

nc, js := jsClientConnect(t, s)
defer nc.Close()

testNumSubjects := func(t *testing.T, st nats.StorageType) {
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Storage: st,
})
require_NoError(t, err)
defer js.DeleteStream("TEST")

counts, msg := []int{22, 33, 44}, []byte("ok")
// Now place msgs, foo-22, bar-33 and baz-44.
for i, subj := range []string{"foo", "bar", "baz"} {
for n := 0; n < counts[i]; n++ {
_, err = js.Publish(subj, msg)
require_NoError(t, err)
}
}

// Need to grab StreamInfo by hand for now.
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), nil, time.Second)
require_NoError(t, err)
var si StreamInfo
err = json.Unmarshal(resp.Data, &si)
require_NoError(t, err)

if si.State.NumSubjects != 3 {
t.Fatalf("Expected NumSubjects to be %d, but got %d", 3, si.State.NumSubjects)
}
}

t.Run("MemoryStore", func(t *testing.T) { testNumSubjects(t, nats.MemoryStorage) })
t.Run("FileStore", func(t *testing.T) { testNumSubjects(t, nats.FileStorage) })
}

func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
Expand Down
2 changes: 0 additions & 2 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,6 @@ func (ms *memStore) FastState(state *StreamState) {
state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1)
}
state.Consumers = ms.consumers
state.NumSubjects = len(ms.fss)
ms.mu.RUnlock()
}

Expand All @@ -779,7 +778,6 @@ func (ms *memStore) State() StreamState {
}
}
}
state.NumSubjects = len(ms.fss)

return state
}
Expand Down
23 changes: 11 additions & 12 deletions server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,17 @@ const (

// StreamState is information about the given stream.
type StreamState struct {
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
NumSubjects int `json:"num_subjects,omitempty"`
Subjects map[string]uint64 `json:"subjects,omitempty"`
NumDeleted int `json:"num_deleted,omitempty"`
Deleted []uint64 `json:"deleted,omitempty"`
Lost *LostStreamData `json:"lost,omitempty"`
Consumers int `json:"consumer_count"`
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Subjects map[string]uint64 `json:"subjects,omitempty"`
NumDeleted int `json:"num_deleted,omitempty"`
Deleted []uint64 `json:"deleted,omitempty"`
Lost *LostStreamData `json:"lost,omitempty"`
Consumers int `json:"consumer_count"`
}

// SimpleState for filtered subject specific state.
Expand Down

0 comments on commit 6bbe75e

Please sign in to comment.