diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index 8f4e545d7..f7a30a476 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -158,6 +158,7 @@ type ( } streamInfoResponse struct { apiResponse + apiPaged *StreamInfo } diff --git a/jetstream/options.go b/jetstream/options.go index ae2941082..52f9054bd 100644 --- a/jetstream/options.go +++ b/jetstream/options.go @@ -264,7 +264,11 @@ func WithDeletedDetails(deletedDetails bool) StreamInfoOpt { } } -// WithSubjectFilter can be used to display the information about messages stored on given subjects +// WithSubjectFilter can be used to display the information about messages +// stored on given subjects. +// NOTE: if the subject filter matches over 100k +// subjects, this will result in multiple requests to the server to retrieve all +// the information, and all of the returned subjects will be kept in memory. func WithSubjectFilter(subject string) StreamInfoOpt { return func(req *streamInfoRequest) error { req.SubjectFilter = subject diff --git a/jetstream/stream.go b/jetstream/stream.go index e864cf164..a5c6492e8 100644 --- a/jetstream/stream.go +++ b/jetstream/stream.go @@ -102,6 +102,7 @@ type ( StreamInfoOpt func(*streamInfoRequest) error streamInfoRequest struct { + apiPaged DeletedDetails bool `json:"deleted_details,omitempty"` SubjectFilter string `json:"subjects_filter,omitempty"` } @@ -259,28 +260,56 @@ func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, } var req []byte var err error - if infoReq != nil { - req, err = json.Marshal(infoReq) - if err != nil { - return nil, err - } - } + var subjectMap map[string]uint64 + var offset int infoSubject := apiSubj(s.jetStream.apiPrefix, fmt.Sprintf(apiStreamInfoT, s.name)) - var resp streamInfoResponse - - if _, err = s.jetStream.apiRequestJSON(ctx, infoSubject, &resp, req); err != nil { - return nil, err - } - if resp.Error != nil { - if resp.Error.ErrorCode == JSErrCodeConsumerNotFound { - return nil, ErrStreamNotFound + var info *StreamInfo + for { + if infoReq != nil { + if infoReq.SubjectFilter != "" { + if subjectMap == nil { + subjectMap = make(map[string]uint64) + } + infoReq.Offset = offset + } + req, err = json.Marshal(infoReq) + if err != nil { + return nil, err + } + } + var resp streamInfoResponse + if _, err = s.jetStream.apiRequestJSON(ctx, infoSubject, &resp, req); err != nil { + return nil, err + } + if resp.Error != nil { + if resp.Error.ErrorCode == JSErrCodeStreamNotFound { + return nil, ErrStreamNotFound + } + return nil, resp.Error + } + info = resp.StreamInfo + var total int + if resp.Total != 0 { + total = resp.Total + } + if len(resp.StreamInfo.State.Subjects) > 0 { + for subj, msgs := range resp.StreamInfo.State.Subjects { + subjectMap[subj] = msgs + } + offset = len(subjectMap) + } + if total == 0 || total <= offset { + info.State.Subjects = nil + // we don't want to store subjects in cache + cached := *info + s.info = &cached + info.State.Subjects = subjectMap + break } - return nil, resp.Error } - s.info = resp.StreamInfo - return resp.StreamInfo, nil + return info, nil } // CachedInfo returns *StreamInfo cached on a stream struct diff --git a/jetstream/test/stream_test.go b/jetstream/test/stream_test.go index bb85f67d9..ca1009084 100644 --- a/jetstream/test/stream_test.go +++ b/jetstream/test/stream_test.go @@ -610,6 +610,48 @@ func TestStreamInfo(t *testing.T) { } } +func TestSubjectsFilterPaging(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + for i := 0; i < 110000; i++ { + if _, err := js.PublishAsync(fmt.Sprintf("FOO.%d", i), nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatal("PublishAsyncComplete timeout") + } + + info, err := s.Info(context.Background(), jetstream.WithSubjectFilter("FOO.*")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if len(info.State.Subjects) != 110000 { + t.Fatalf("Unexpected number of subjects; want: 110000; got: %d", len(info.State.Subjects)) + } + cInfo := s.CachedInfo() + if len(cInfo.State.Subjects) != 0 { + t.Fatalf("Unexpected number of subjects; want: 0; got: %d", len(cInfo.State.Subjects)) + } +} + func TestStreamCachedInfo(t *testing.T) { srv := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, srv)