From 6bbe75e3236d37bd7e3390c545061677b5ed2d50 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 31 Jan 2022 18:46:12 -0800 Subject: [PATCH] Removed NumSubjects, too costly for default Signed-off-by: Derek Collison --- server/filestore.go | 27 ++++++------------------ server/jetstream_test.go | 45 ---------------------------------------- server/memstore.go | 2 -- server/store.go | 23 ++++++++++---------- 4 files changed, 17 insertions(+), 80 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index d0b00a2cee6..485f8e399a9 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -3274,18 +3274,18 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) { mb.llseq = seq } - // We use the high bit to denote we have already checked the checksum. - var hh hash.Hash64 - if !hashChecked { - hh = mb.hh // This will force the hash check in msgFromBuf. - } - li := int(bi) - mb.cache.off if li >= len(mb.cache.buf) { return nil, errPartialCache } buf := mb.cache.buf[li:] + // We use the high bit to denote we have already checked the checksum. + var hh hash.Hash64 + if !hashChecked { + hh = mb.hh // This will force the hash check in msgFromBuf. + } + // Parse from the raw buffer. subj, hdr, msg, mseq, ts, err := msgFromBuf(buf, hh) if err != nil { @@ -3456,16 +3456,6 @@ func (fs *fileStore) FastState(state *StreamState) { } state.Consumers = len(fs.cfs) - fss := make(map[string]struct{}) - for _, mb := range fs.blks { - mb.mu.Lock() - for subj := range mb.fss { - fss[subj] = struct{}{} - } - mb.mu.Unlock() - } - state.NumSubjects = len(fss) - fs.mu.RUnlock() } @@ -3476,7 +3466,6 @@ func (fs *fileStore) State() StreamState { state.Consumers = len(fs.cfs) state.Deleted = nil // make sure. - fss := make(map[string]struct{}) for _, mb := range fs.blks { mb.mu.Lock() fseq := mb.first.seq @@ -3487,15 +3476,11 @@ func (fs *fileStore) State() StreamState { state.Deleted = append(state.Deleted, seq) } } - for subj := range mb.fss { - fss[subj] = struct{}{} - } mb.mu.Unlock() } fs.mu.RUnlock() state.Lost = fs.lostData() - state.NumSubjects = len(fss) // Can not be guaranteed to be sorted. if len(state.Deleted) > 0 { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index a6b84b409c8..71064b4bf22 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -14645,51 +14645,6 @@ func TestJetStreamMaxMsgsPerSubjectWithDiscardNew(t *testing.T) { } } -func TestJetStreamStreamInfoNumSubjects(t *testing.T) { - s := RunBasicJetStreamServer() - defer s.Shutdown() - - if config := s.JetStreamConfig(); config != nil { - defer removeDir(t, config.StoreDir) - } - - nc, js := jsClientConnect(t, s) - defer nc.Close() - - testNumSubjects := func(t *testing.T, st nats.StorageType) { - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"*"}, - Storage: st, - }) - require_NoError(t, err) - defer js.DeleteStream("TEST") - - counts, msg := []int{22, 33, 44}, []byte("ok") - // Now place msgs, foo-22, bar-33 and baz-44. - for i, subj := range []string{"foo", "bar", "baz"} { - for n := 0; n < counts[i]; n++ { - _, err = js.Publish(subj, msg) - require_NoError(t, err) - } - } - - // Need to grab StreamInfo by hand for now. - resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "TEST"), nil, time.Second) - require_NoError(t, err) - var si StreamInfo - err = json.Unmarshal(resp.Data, &si) - require_NoError(t, err) - - if si.State.NumSubjects != 3 { - t.Fatalf("Expected NumSubjects to be %d, but got %d", 3, si.State.NumSubjects) - } - } - - t.Run("MemoryStore", func(t *testing.T) { testNumSubjects(t, nats.MemoryStorage) }) - t.Run("FileStore", func(t *testing.T) { testNumSubjects(t, nats.FileStorage) }) -} - func TestJetStreamStreamInfoSubjectsDetails(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() diff --git a/server/memstore.go b/server/memstore.go index b465e5a03d5..ccb10eba2f2 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -754,7 +754,6 @@ func (ms *memStore) FastState(state *StreamState) { state.NumDeleted = int((state.LastSeq - state.FirstSeq) - state.Msgs + 1) } state.Consumers = ms.consumers - state.NumSubjects = len(ms.fss) ms.mu.RUnlock() } @@ -779,7 +778,6 @@ func (ms *memStore) State() StreamState { } } } - state.NumSubjects = len(ms.fss) return state } diff --git a/server/store.go b/server/store.go index d472286eb40..5d0e38563e1 100644 --- a/server/store.go +++ b/server/store.go @@ -119,18 +119,17 @@ const ( // StreamState is information about the given stream. type StreamState struct { - Msgs uint64 `json:"messages"` - Bytes uint64 `json:"bytes"` - FirstSeq uint64 `json:"first_seq"` - FirstTime time.Time `json:"first_ts"` - LastSeq uint64 `json:"last_seq"` - LastTime time.Time `json:"last_ts"` - NumSubjects int `json:"num_subjects,omitempty"` - Subjects map[string]uint64 `json:"subjects,omitempty"` - NumDeleted int `json:"num_deleted,omitempty"` - Deleted []uint64 `json:"deleted,omitempty"` - Lost *LostStreamData `json:"lost,omitempty"` - Consumers int `json:"consumer_count"` + Msgs uint64 `json:"messages"` + Bytes uint64 `json:"bytes"` + FirstSeq uint64 `json:"first_seq"` + FirstTime time.Time `json:"first_ts"` + LastSeq uint64 `json:"last_seq"` + LastTime time.Time `json:"last_ts"` + Subjects map[string]uint64 `json:"subjects,omitempty"` + NumDeleted int `json:"num_deleted,omitempty"` + Deleted []uint64 `json:"deleted,omitempty"` + Lost *LostStreamData `json:"lost,omitempty"` + Consumers int `json:"consumer_count"` } // SimpleState for filtered subject specific state.