Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added active servers to statsz. #2276

Merged
merged 4 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ type ServerStats struct {
SlowConsumers int64 `json:"slow_consumers"`
Routes []*RouteStat `json:"routes,omitempty"`
Gateways []*GatewayStat `json:"gateways,omitempty"`
ActiveServers int `json:"active_servers,omitempty"`
}

// RouteStat holds route statistics.
Expand Down Expand Up @@ -489,7 +490,6 @@ func (s *Server) checkRemoteServers() {
s.Debugf("Detected orphan remote server: %q", sid)
// Simulate it going away.
s.processRemoteServerShutdown(sid)
delete(s.sys.servers, sid)
}
}
if s.sys.sweeper != nil {
Expand Down Expand Up @@ -535,7 +535,7 @@ func routeStat(r *client) *RouteStat {
// Actual send method for statz updates.
// Lock should be held.
func (s *Server) sendStatsz(subj string) {
m := ServerStatsMsg{}
var m ServerStatsMsg
s.updateServerUsage(&m.Stats)
m.Stats.Start = s.start
m.Stats.Connections = len(s.clients)
Expand All @@ -547,10 +547,11 @@ func (s *Server) sendStatsz(subj string) {
m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes)
m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
m.Stats.NumSubs = s.numSubscriptions()

// Routes
for _, r := range s.routes {
m.Stats.Routes = append(m.Stats.Routes, routeStat(r))
}
// Gateways
if s.gateway.enabled {
gw := s.gateway
gw.RLock()
Expand Down Expand Up @@ -578,6 +579,11 @@ func (s *Server) sendStatsz(subj string) {
}
gw.RUnlock()
}
// Active Servers
m.Stats.ActiveServers = 1
if s.sys != nil {
m.Stats.ActiveServers += len(s.sys.servers)
}
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
}

Expand All @@ -598,8 +604,8 @@ func (s *Server) heartbeatStatsz() {
// This should be wrapChk() to setup common locking.
func (s *Server) startStatszTimer() {
// We will start by sending out more of these and trail off to the statsz being the max.
s.sys.cstatsz = time.Second
// Send out the first one only after a second.
s.sys.cstatsz = 250 * time.Millisecond
// Send out the first one after 250ms.
s.sys.stmr = time.AfterFunc(s.sys.cstatsz, s.wrapChk(s.heartbeatStatsz))
}

Expand Down Expand Up @@ -880,6 +886,7 @@ func (s *Server) processRemoteServerShutdown(sid string) {
}
return true
})
delete(s.sys.servers, sid)
}

// remoteServerShutdownEvent is called when we get an event from another server shutting down.
Expand Down Expand Up @@ -924,12 +931,6 @@ func (s *Server) remoteServerUpdate(sub *subscription, _ *client, subject, reply
}
si := ssm.Server
node := string(getHash(si.Name))
if _, ok := s.nodeToInfo.Load(node); !ok {
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
// Since we have not seen this one they probably have not seen us so send out our update.
s.mu.Lock()
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
s.mu.Unlock()
}
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.ID, false, si.JetStream})
}

Expand Down Expand Up @@ -962,6 +963,8 @@ func (s *Server) processNewServer(ms *ServerInfo) {
// Add to our nodeToName
node := string(getHash(ms.Name))
s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.ID, false, ms.JetStream})
// Announce ourselves..
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
}

// If GW is enabled on this server and there are any leaf node connections,
Expand Down
38 changes: 38 additions & 0 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7125,6 +7125,44 @@ func TestJetStreamClusterVarzReporting(t *testing.T) {
}
}

// TestJetStreamClusterStatszActiveServers verifies that the ActiveServers
// count reported in statsz tracks cluster membership: all servers at start,
// one fewer after a shutdown, and back to full after a restart.
func TestJetStreamClusterStatszActiveServers(t *testing.T) {
	sc := createJetStreamSuperCluster(t, 2, 2)
	defer sc.shutdown()

	// checkActive polls a random server's statsz ping endpoint until the
	// reported active-server count matches expected, or the wait times out.
	checkActive := func(expected int) {
		t.Helper()
		checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
			s := sc.randomServer()
			nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
			if err != nil {
				// Connects can transiently fail while servers are being
				// shut down or restarted; return the error so checkFor
				// retries instead of aborting the whole test.
				return fmt.Errorf("could not connect to %q: %v", s.ClientURL(), err)
			}
			defer nc.Close()

			resp, err := nc.Request(serverStatsPingReqSubj, nil, time.Second)
			if err != nil {
				// Same reasoning: a timed-out request during membership
				// churn should trigger a retry, not a test failure.
				return fmt.Errorf("statsz request failed: %v", err)
			}
			var ssm ServerStatsMsg
			if err := json.Unmarshal(resp.Data, &ssm); err != nil {
				return fmt.Errorf("unexpected statsz response: %v", err)
			}
			if ssm.Stats.ActiveServers != expected {
				return fmt.Errorf("wanted %d, got %d", expected, ssm.Stats.ActiveServers)
			}
			return nil
		})
	}

	checkActive(4)
	c := sc.randomCluster()
	ss := c.randomServer()
	ss.Shutdown()
	checkActive(3)
	c.restartServer(ss)
	checkActive(4)
}

// Support functions

// Used to setup superclusters for tests.
Expand Down
4 changes: 2 additions & 2 deletions test/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,11 @@ func TestNoRaceLargeClusterMem(t *testing.T) {
checkClusterFormed(t, servers...)

// Calculate in MB what we are using now.
const max = 64 * 1024 * 1024 // 64MB
const max = 80 * 1024 * 1024 // 80MB
runtime.ReadMemStats(&m)
used := m.TotalAlloc - pta
if used > max {
t.Fatalf("Cluster using too much memory, expect < 60MB, got %dMB", used/(1024*1024))
t.Fatalf("Cluster using too much memory, expect < 80MB, got %dMB", used/(1024*1024))
}

for _, s := range servers {
Expand Down