Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose raftz and ipqueuesz via system account #6439

Merged
merged 1 commit into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,14 @@ func (s *Server) initEventTracking() {
optz := &ExpvarzEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.expvarz(optz), nil })
},
"IPQUEUESZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &IpqueueszEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.Ipqueuesz(&optz.IpqueueszOptions), nil })
},
"RAFTZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &RaftzEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.Raftz(&optz.RaftzOptions), nil })
},
}
profilez := func(_ *subscription, c *client, _ *Account, _, rply string, rmsg []byte) {
hdr, msg := c.msgParts(rmsg)
Expand Down Expand Up @@ -1993,6 +2001,18 @@ type ExpvarzEventOptions struct {
EventFilterOptions
}

// In the context of system events, IpqueueszEventOptions are options passed to Ipqueuesz
type IpqueueszEventOptions struct {
EventFilterOptions
IpqueueszOptions
}

// In the context of system events, RaftzEventOptions are options passed to Raftz
type RaftzEventOptions struct {
EventFilterOptions
RaftzOptions
}

// returns true if the request does NOT apply to this server and can be ignored.
// DO NOT hold the server lock when
func (s *Server) filterRequest(fOpts *EventFilterOptions) bool {
Expand Down Expand Up @@ -2115,6 +2135,20 @@ type ServerAPIExpvarzResponse struct {
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIpqueueszResponse is the response type for ipqueuesz
type ServerAPIpqueueszResponse struct {
Server *ServerInfo `json:"server"`
Data *IpqueueszStatus `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIRaftzResponse is the response type for raftz
type ServerAPIRaftzResponse struct {
Server *ServerInfo `json:"server"`
Data *RaftzStatus `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// statszReq is a request for us to respond with current statsz.
func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
if !s.EventsEnabled() {
Expand Down
2 changes: 1 addition & 1 deletion server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,7 @@ func TestSystemAccountWithGateways(t *testing.T) {

// If this tests fails with wrong number after 10 seconds we may have
// added a new initial subscription for the eventing system.
checkExpectedSubs(t, 58, sa)
checkExpectedSubs(t, 62, sa)

// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
Expand Down
79 changes: 55 additions & 24 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,20 +1123,16 @@ func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request) {
ResponseHandler(w, r, buf[:n])
}

type monitorIPQueue struct {
type IpqueueszStatusIPQ struct {
Pending int `json:"pending"`
InProgress int `json:"in_progress,omitempty"`
}

func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) {
all, err := decodeBool(w, r, "all")
if err != nil {
return
}
qfilter := r.URL.Query().Get("queues")

queues := map[string]monitorIPQueue{}
type IpqueueszStatus map[string]IpqueueszStatusIPQ

func (s *Server) Ipqueuesz(opts *IpqueueszOptions) *IpqueueszStatus {
all, qfilter := opts.All, opts.Filter
queues := IpqueueszStatus{}
s.ipQueues.Range(func(k, v any) bool {
var pending, inProgress int
name := k.(string)
Expand All @@ -1153,9 +1149,23 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) {
} else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) {
return true
}
queues[name] = monitorIPQueue{Pending: pending, InProgress: inProgress}
queues[name] = IpqueueszStatusIPQ{Pending: pending, InProgress: inProgress}
return true
})
return &queues
}

func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) {
all, err := decodeBool(w, r, "all")
if err != nil {
return
}
qfilter := r.URL.Query().Get("queues")

queues := s.Ipqueuesz(&IpqueueszOptions{
All: all,
Filter: qfilter,
})

b, _ := json.MarshalIndent(queues, "", " ")
ResponseHandler(w, r, b)
Expand Down Expand Up @@ -2771,6 +2781,18 @@ type ProfilezOptions struct {
Duration time.Duration `json:"duration,omitempty"`
}

// IpqueueszOptions are options passed to Ipqueuesz
type IpqueueszOptions struct {
All bool `json:"all"`
Filter string `json:"filter"`
}

// RaftzOptions are options passed to Raftz
type RaftzOptions struct {
AccountFilter string `json:"account"`
GroupFilter string `json:"group"`
}

// StreamDetail shows information about the stream state and its consumers.
type StreamDetail struct {
Name string `json:"name"`
Expand Down Expand Up @@ -3827,27 +3849,43 @@ type RaftzGroupPeer struct {
LastSeen string `json:"last_seen,omitempty"`
}

type RaftzStatus map[string]map[string]RaftzGroup

func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) {
if s.raftNodes == nil {
w.WriteHeader(404)
w.Write([]byte("No Raft nodes registered"))
return
}

gfilter := r.URL.Query().Get("group")
afilter := r.URL.Query().Get("acc")
groups := s.Raftz(&RaftzOptions{
AccountFilter: r.URL.Query().Get("acc"),
GroupFilter: r.URL.Query().Get("group"),
})

if groups == nil {
w.WriteHeader(404)
w.Write([]byte("No Raft nodes returned, check supplied filters"))
return
}

b, _ := json.MarshalIndent(groups, "", " ")
ResponseHandler(w, r, b)
}

func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus {
afilter, gfilter := opts.AccountFilter, opts.GroupFilter

if afilter == _EMPTY_ {
if sys := s.SystemAccount(); sys != nil {
afilter = sys.Name
} else {
w.WriteHeader(404)
w.Write([]byte("System account not found, the server may be shutting down"))
return
return nil
}
}

groups := map[string]RaftNode{}
infos := map[string]map[string]RaftzGroup{} // account -> group ID
infos := RaftzStatus{} // account -> group ID

s.rnMu.RLock()
if gfilter != _EMPTY_ {
Expand All @@ -3873,12 +3911,6 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) {
}
s.rnMu.RUnlock()

if len(groups) == 0 {
w.WriteHeader(404)
w.Write([]byte("No Raft nodes found, does the specified account/group exist?"))
return
}

for name, rg := range groups {
n, ok := rg.(*raft)
if n == nil || !ok {
Expand Down Expand Up @@ -3932,6 +3964,5 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) {
infos[n.accName][name] = info
}

b, _ := json.MarshalIndent(infos, "", " ")
ResponseHandler(w, r, b)
return &infos
}
5 changes: 3 additions & 2 deletions server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5902,10 +5902,11 @@ func TestMonitorIpqzWithGenerics(t *testing.T) {
body := readBody(t, url)
require_True(t, len(body) > 0)

queues := map[string]*monitorIPQueue{}
queues := IpqueueszStatus{}
require_NoError(t, json.Unmarshal(body, &queues))
require_True(t, len(queues) >= 4)
require_True(t, queues["SendQ"] != nil)
_, ok := queues["SendQ"]
require_True(t, ok)
}

func TestMonitorVarzSyncInterval(t *testing.T) {
Expand Down