From 0a0787553afa80214bf96fb34fee7933e0498f87 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Wed, 24 Jul 2024 13:21:50 +0300 Subject: [PATCH] Export js limits in varz Signed-off-by: R.I.Pienaar --- server/events.go | 1 + server/monitor.go | 3 ++ server/monitor_test.go | 113 +++++++++++++++++++++++++---------------- 3 files changed, 73 insertions(+), 44 deletions(-) diff --git a/server/events.go b/server/events.go index 83f9a850310..d1ebeca2449 100644 --- a/server/events.go +++ b/server/events.go @@ -953,6 +953,7 @@ func (s *Server) sendStatsz(subj string) { } } } + jStat.Limits = &s.getOpts().JetStreamLimits m.Stats.JetStream = jStat s.mu.RLock() } diff --git a/server/monitor.go b/server/monitor.go index 20fab4ff54f..2a22520ce7c 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1229,6 +1229,7 @@ type JetStreamVarz struct { Config *JetStreamConfig `json:"config,omitempty"` Stats *JetStreamStats `json:"stats,omitempty"` Meta *MetaClusterInfo `json:"meta,omitempty"` + Limits *JSLimitOpts `json:"limits,omitempty"` } // ClusterOptsVarz contains monitoring cluster information @@ -1454,6 +1455,7 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) { js.mu.RUnlock() } v.Stats = js.usageStats() + v.Limits = &s.getOpts().JetStreamLimits if mg := js.getMetaGroup(); mg != nil { if ci := s.raftNodeToClusterInfo(mg); ci != nil { v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Peer: getHash(ci.Leader), Size: mg.ClusterSize()} @@ -1828,6 +1830,7 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { } sv.Stats = v.Stats sv.Meta = v.Meta + sv.Limits = v.Limits s.mu.RUnlock() } diff --git a/server/monitor_test.go b/server/monitor_test.go index 60faf3616ba..c94c4b8065e 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -25,6 +25,7 @@ import ( "net/http" "net/url" "os" + "path/filepath" "reflect" "runtime" "sort" @@ -57,6 +58,47 @@ func DefaultMonitorOptions() *Options { } } +func runMonitorJSServer(t *testing.T, clientPort int, monitorPort int, clusterPort int, routePort int) (*Server, *Options) { + resetPreviousHTTPConnections() + tmpDir := t.TempDir() + cf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:%d + http: 127.0.0.1:%d + system_account: SYS + accounts { + SYS { + users [{user: sys, password: pwd}] + } + ACC { + users [{user: usr, password: pwd}] + // In clustered mode, these reservations will not impact any one server. + jetstream: {max_store: 4Mb, max_memory: 5Mb} + } + BCC_TO_HAVE_ONE_EXTRA { + users [{user: usr2, password: pwd}] + jetstream: enabled + } + } + jetstream: { + max_mem_store: 10Mb + max_file_store: 10Mb + store_dir: '%s' + unique_tag: az + limits: { + max_ha_assets: 1000 + } + } + cluster { + name: cluster_name + listen: 127.0.0.1:%d + routes: [nats-route://127.0.0.1:%d] + } + server_name: server_%d + server_tags: [ "az:%d", "tag" ] `, clientPort, monitorPort, tmpDir, clusterPort, routePort, clientPort, clientPort))) + + return RunServerWithConfig(cf) +} + func runMonitorServer() *Server { resetPreviousHTTPConnections() opts := DefaultMonitorOptions() @@ -229,7 +271,7 @@ func TestVarzSubscriptionsResetProperly(t *testing.T) { } func TestHandleVarz(t *testing.T) { - s := runMonitorServer() + s, _ := runMonitorJSServer(t, -1, -1, 0, 0) defer s.Shutdown() url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port) @@ -245,7 +287,7 @@ func TestHandleVarz(t *testing.T) { time.Sleep(100 * time.Millisecond) - nc := createClientConnSubscribeAndPublish(t, s) + nc := createClientConnWithUserSubscribeAndPublish(t, s, "sys", "pwd") defer nc.Close() for mode := 0; mode < 2; mode++ { @@ -269,14 +311,33 @@ func TestHandleVarz(t *testing.T) { if v.OutBytes != 5 { t.Fatalf("Expected OutBytes of 5, got %v\n", v.OutBytes) } - if v.Subscriptions != 0 { - t.Fatalf("Expected Subscriptions of 0, got %v\n", v.Subscriptions) + if v.Subscriptions <= 10 { + t.Fatalf("Expected Subscriptions of at least 10, got %v\n", v.Subscriptions) } - if v.Name != "monitor_server" { - t.Fatal("Expected ServerName to be 'monitor_server'") + if v.Name != "server_-1" { + t.Fatalf("Expected ServerName to be 'monitor_server' got %q", v.Name) } if !v.Tags.Contains("tag") { - t.Fatal("Expected tags to be 'tag'") + t.Fatalf("Expected tags to be 'tag' got %v", v.Tags) + } + if v.JetStream.Config == nil { + t.Fatalf("JS Config not set") + } + sd := filepath.Join(s.opts.StoreDir, "jetstream") + if v.JetStream.Config.StoreDir != sd { + t.Fatalf("JS Config is invalid expected %q got %q", sd, v.JetStream.Config.StoreDir) + } + if v.JetStream.Stats == nil { + t.Fatalf("JS Stats not set") + } + if v.JetStream.Stats.Accounts != 2 { + t.Fatalf("Invalid stats expected 2 accounts got %d", v.JetStream.Stats.Accounts) + } + if v.JetStream.Limits == nil { + t.Fatalf("JS limits not set") + } + if v.JetStream.Limits.MaxHAAssets != 1000 { + t.Fatalf("Expected 1000 max_ha_assets got %q", v.JetStream.Limits.MaxHAAssets) } } @@ -4274,43 +4335,7 @@ func TestMonitorJsz(t *testing.T) { {7500, 7501, 7502, 5502}, {5500, 5501, 5502, 7502}, } { - tmpDir := t.TempDir() - cf := createConfFile(t, []byte(fmt.Sprintf(` - listen: 127.0.0.1:%d - http: 127.0.0.1:%d - system_account: SYS - accounts { - SYS { - users [{user: sys, password: pwd}] - } - ACC { - users [{user: usr, password: pwd}] - // In clustered mode, these reservations will not impact any one server. - jetstream: {max_store: 4Mb, max_memory: 5Mb} - } - BCC_TO_HAVE_ONE_EXTRA { - users [{user: usr2, password: pwd}] - jetstream: enabled - } - } - jetstream: { - max_mem_store: 10Mb - max_file_store: 10Mb - store_dir: '%s' - unique_tag: az - limits: { - max_ha_assets: 1000 - } - } - cluster { - name: cluster_name - listen: 127.0.0.1:%d - routes: [nats-route://127.0.0.1:%d] - } - server_name: server_%d - server_tags: [ "az:%d" ] `, test.port, test.mport, tmpDir, test.cport, test.routed, test.port, test.port))) - - s, _ := RunServerWithConfig(cf) + s, _ := runMonitorJSServer(t, test.port, test.mport, test.cport, test.routed) defer s.Shutdown() srvs = append(srvs, s) }