diff --git a/server/events.go b/server/events.go index 695cb6b1e0f..51620cbc07e 100644 --- a/server/events.go +++ b/server/events.go @@ -1256,6 +1256,14 @@ func (s *Server) initEventTracking() { optz := &ExpvarzEventOptions{} s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.expvarz(optz), nil }) }, + "IPQUEUESZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { + optz := &IpqueueszEventOptions{} + s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.Ipqueuesz(&optz.IpqueueszOptions), nil }) + }, + "RAFTZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { + optz := &RaftzEventOptions{} + s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.Raftz(&optz.RaftzOptions), nil }) + }, } profilez := func(_ *subscription, c *client, _ *Account, _, rply string, rmsg []byte) { hdr, msg := c.msgParts(rmsg) @@ -1993,6 +2001,18 @@ type ExpvarzEventOptions struct { EventFilterOptions } +// In the context of system events, IpqueueszEventOptions are options passed to Ipqueuesz +type IpqueueszEventOptions struct { + EventFilterOptions + IpqueueszOptions +} + +// In the context of system events, RaftzEventOptions are options passed to Raftz +type RaftzEventOptions struct { + EventFilterOptions + RaftzOptions +} + // returns true if the request does NOT apply to this server and can be ignored. // DO NOT hold the server lock when func (s *Server) filterRequest(fOpts *EventFilterOptions) bool { @@ -2115,6 +2135,20 @@ type ServerAPIExpvarzResponse struct { Error *ApiError `json:"error,omitempty"` } +// ServerAPIpqueueszResponse is the response type for ipqueuesz +type ServerAPIpqueueszResponse struct { + Server *ServerInfo `json:"server"` + Data *IpqueueszStatus `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + +// ServerAPIRaftzResponse is the response type for raftz +type ServerAPIRaftzResponse struct { + Server *ServerInfo `json:"server"` + Data *RaftzStatus `json:"data,omitempty"` + Error *ApiError `json:"error,omitempty"` +} + // statszReq is a request for us to respond with current statsz. func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) { if !s.EventsEnabled() { diff --git a/server/events_test.go b/server/events_test.go index b3c7a1ef604..71447512939 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1675,7 +1675,7 @@ func TestSystemAccountWithGateways(t *testing.T) { // If this tests fails with wrong number after 10 seconds we may have // added a new initial subscription for the eventing system. - checkExpectedSubs(t, 58, sa) + checkExpectedSubs(t, 62, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) diff --git a/server/monitor.go b/server/monitor.go index df14a22c7c3..b38a07ebb6d 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1123,20 +1123,16 @@ func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request) { ResponseHandler(w, r, buf[:n]) } -type monitorIPQueue struct { +type IpqueueszStatusIPQ struct { Pending int `json:"pending"` InProgress int `json:"in_progress,omitempty"` } -func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { - all, err := decodeBool(w, r, "all") - if err != nil { - return - } - qfilter := r.URL.Query().Get("queues") - - queues := map[string]monitorIPQueue{} +type IpqueueszStatus map[string]IpqueueszStatusIPQ +func (s *Server) Ipqueuesz(opts *IpqueueszOptions) *IpqueueszStatus { + all, qfilter := opts.All, opts.Filter + queues := IpqueueszStatus{} s.ipQueues.Range(func(k, v any) bool { var pending, inProgress int name := k.(string) @@ -1153,9 +1149,23 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { } else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) { return true } - queues[name] = monitorIPQueue{Pending: pending, InProgress: inProgress} + queues[name] = IpqueueszStatusIPQ{Pending: pending, InProgress: inProgress} return true }) + return &queues +} + +func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { + all, err := decodeBool(w, r, "all") + if err != nil { + return + } + qfilter := r.URL.Query().Get("queues") + + queues := s.Ipqueuesz(&IpqueueszOptions{ + All: all, + Filter: qfilter, + }) b, _ := json.MarshalIndent(queues, "", " ") ResponseHandler(w, r, b) @@ -2771,6 +2781,18 @@ type ProfilezOptions struct { Duration time.Duration `json:"duration,omitempty"` } +// IpqueueszOptions are options passed to Ipqueuesz +type IpqueueszOptions struct { + All bool `json:"all"` + Filter string `json:"filter"` +} + +// RaftzOptions are options passed to Raftz +type RaftzOptions struct { + AccountFilter string `json:"account"` + GroupFilter string `json:"group"` +} + // StreamDetail shows information about the stream state and its consumers. type StreamDetail struct { Name string `json:"name"` @@ -3827,6 +3849,8 @@ type RaftzGroupPeer struct { LastSeen string `json:"last_seen,omitempty"` } +type RaftzStatus map[string]map[string]RaftzGroup + func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) { if s.raftNodes == nil { w.WriteHeader(404) @@ -3834,20 +3858,34 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) { return } - gfilter := r.URL.Query().Get("group") - afilter := r.URL.Query().Get("acc") + groups := s.Raftz(&RaftzOptions{ + AccountFilter: r.URL.Query().Get("acc"), + GroupFilter: r.URL.Query().Get("group"), + }) + + if groups == nil { + w.WriteHeader(404) + w.Write([]byte("No Raft nodes returned, check supplied filters")) + return + } + + b, _ := json.MarshalIndent(groups, "", " ") + ResponseHandler(w, r, b) +} + +func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus { + afilter, gfilter := opts.AccountFilter, opts.GroupFilter + if afilter == _EMPTY_ { if sys := s.SystemAccount(); sys != nil { afilter = sys.Name } else { - w.WriteHeader(404) - w.Write([]byte("System account not found, the server may be shutting down")) - return + return nil } } groups := map[string]RaftNode{} - infos := map[string]map[string]RaftzGroup{} // account -> group ID + infos := RaftzStatus{} // account -> group ID s.rnMu.RLock() if gfilter != _EMPTY_ { @@ -3873,12 +3911,6 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) { } s.rnMu.RUnlock() - if len(groups) == 0 { - w.WriteHeader(404) - w.Write([]byte("No Raft nodes found, does the specified account/group exist?")) - return - } - for name, rg := range groups { n, ok := rg.(*raft) if n == nil || !ok { @@ -3932,6 +3964,5 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) { infos[n.accName][name] = info } - b, _ := json.MarshalIndent(infos, "", " ") - ResponseHandler(w, r, b) + return &infos } diff --git a/server/monitor_test.go b/server/monitor_test.go index 0d6209a082c..3b9b84ff091 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -5902,10 +5902,11 @@ func TestMonitorIpqzWithGenerics(t *testing.T) { body := readBody(t, url) require_True(t, len(body) > 0) - queues := map[string]*monitorIPQueue{} + queues := IpqueueszStatus{} require_NoError(t, json.Unmarshal(body, &queues)) require_True(t, len(queues) >= 4) - require_True(t, queues["SendQ"] != nil) + _, ok := queues["SendQ"] + require_True(t, ok) } func TestMonitorVarzSyncInterval(t *testing.T) {