From 1691b059a7f1bd0d3b1848dfc599e2ded164e3ea Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 27 Feb 2023 16:25:07 +0100 Subject: [PATCH] Improve account polling (#129) --- go.mod | 16 +-- go.sum | 32 ++--- surveyor/collector_statz.go | 226 ++++++++++++++---------------------- 3 files changed, 110 insertions(+), 164 deletions(-) diff --git a/go.mod b/go.mod index f9a8c7b..7eaad22 100644 --- a/go.mod +++ b/go.mod @@ -6,15 +6,15 @@ require ( github.com/antonfisher/nested-logrus-formatter v1.3.1 github.com/fsnotify/fsnotify v1.6.0 github.com/nats-io/jsm.go v0.0.33 - github.com/nats-io/nats-server/v2 v2.8.4 - github.com/nats-io/nats.go v1.16.0 + github.com/nats-io/nats-server/v2 v2.9.15-0.20230221163408-7ce23c46cecb + github.com/nats-io/nats.go v1.23.0 github.com/nats-io/nuid v1.0.1 github.com/prometheus/client_golang v1.13.0 github.com/prometheus/client_model v0.2.0 github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.5.0 github.com/spf13/viper v1.12.0 - golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 + golang.org/x/crypto v0.5.0 ) require ( @@ -25,13 +25,13 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect - github.com/klauspost/compress v1.15.5 // indirect + github.com/klauspost/compress v1.15.15 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect - github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect - github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b // indirect + github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.1 // indirect github.com/prometheus/common v0.37.0 // indirect @@ -42,8 +42,8 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.3.0 // indirect golang.org/x/sys v0.4.0 // indirect - golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect + golang.org/x/text v0.6.0 // indirect + golang.org/x/time v0.3.0 // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/ini.v1 v1.66.4 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 144d54f..11037b7 100644 --- a/go.sum +++ b/go.sum @@ -166,8 +166,8 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.15.5 h1:qyCLMz2JCrKADihKOh9FxnW3houKeNsp2h5OEz0QSEA= -github.com/klauspost/compress v1.15.5/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= +github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= @@ -194,14 +194,14 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nats-io/jsm.go v0.0.33 h1:mNxlZEnSiHo9BwAFpjZYuopVvtwVUdtoAana2ovyWOU= github.com/nats-io/jsm.go v0.0.33/go.mod h1:1ySvWrDbPo/Rs1v0Ccoy7QjZKBGfVhvmolfJRBX+fCg= -github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I= -github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4= -github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= -github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g= -github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= -github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b h1:exHeHbghpBp1JvdYq7muaKFvJgLD93UDcmoIbFu/9PA= +github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b/go.mod h1:DYujvzCMZzUuqB3i1Pnpf1YtkuTwhdI84Aah9wRXkK0= +github.com/nats-io/nats-server/v2 v2.9.15-0.20230221163408-7ce23c46cecb h1:d7JNXl3VkPEedckKHK4M0j3OHF2wAk4LACxyRs76OaY= +github.com/nats-io/nats-server/v2 v2.9.15-0.20230221163408-7ce23c46cecb/go.mod h1:AT/C9XuOPGsozg2dfiS+9vK0Ge4jheffj8uL/kMGPtw= +github.com/nats-io/nats.go v1.23.0 h1:lR28r7IX44WjYgdiKz9GmUeW0uh/m33uD3yEjLZ2cOE= +github.com/nats-io/nats.go v1.23.0/go.mod h1:ki/Scsa23edbh8IRZbCuNXR9TDcbvfaSijKtaqQgw+Q= +github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546 h1:7ZylVLLiDSFqxJTSibNsO2RjVSXj3QWnDc+zKara2HE= +github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546/go.mod h1:JOEZlxMfMnmaLwr+mpmP+RGIYSxLNBFsZykCGaI2PvA= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= @@ -286,11 +286,10 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA= -golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= +golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -440,13 +439,14 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/surveyor/collector_statz.go b/surveyor/collector_statz.go index 411b1b6..14f2f69 100644 --- a/surveyor/collector_statz.go +++ b/surveyor/collector_statz.go @@ -505,178 +505,124 @@ func (sc *StatzCollector) poll() error { func (sc *StatzCollector) pollAccountInfo() error { nc := sc.nc - accs, err := sc.getAccounts(nc) + accs, err := sc.getAccStatz(nc) if err != nil { return err } - accStats := make([]accountStats, 0, len(accs)) + accStats := make(map[string]accountStats, len(accs)) for _, acc := range accs { - sts := accountStats{accountID: acc} + sts := accountStats{accountID: acc.Account} - accInfo, err := sc.getAccountInfo(nc, acc) - if err != nil { - sc.logger.Warnf("could not get info for account %q: %s", acc, err) + sts.leafCount = float64(acc.LeafNodes) + sts.subCount = float64(acc.NumSubs) + sts.connCount = float64(acc.Conns) + sts.bytesSent = float64(acc.Sent.Bytes) + sts.bytesRecv = float64(acc.Received.Bytes) + sts.msgsSent = float64(acc.Sent.Msgs) + sts.msgsRecv = float64(acc.Received.Msgs) + + accStats[acc.Account] = sts + } + jsInfos := sc.getJSInfos(nc, accs) + for _, jsInfo := range jsInfos { + sts, ok := accStats[jsInfo.Id] + if !ok { continue } - sts.leafCount = float64(accInfo.LeafCnt) - sts.subCount = float64(accInfo.SubCnt) - - if accInfo.JetStream { - sts.jetstreamEnabled = 1.0 - - jsInfo, err := sc.getJSInfo(nc, acc) - if err != nil { - sc.logger.Warnf("could not get JetStream info for account %q: %s", acc, err) - } else { - sts.jetstreamMemoryUsed = float64(jsInfo.Memory) - sts.jetstreamStorageUsed = float64(jsInfo.Store) - sts.jetstreamMemoryReserved = float64(jsInfo.ReservedMemory) - sts.jetstreamStorageReserved = float64(jsInfo.ReservedStore) - - sts.jetstreamStreamCount = float64(len(jsInfo.Streams)) - for _, stream := range jsInfo.Streams { - sts.jetstreamStreams = append(sts.jetstreamStreams, streamAccountStats{ - streamName: stream.Name, - consumerCount: float64(len(stream.Consumer)), - replicaCount: float64(stream.Config.Replicas), - }) - } - } + sts.jetstreamEnabled = 1.0 + sts.jetstreamMemoryUsed = float64(jsInfo.Memory) + sts.jetstreamStorageUsed = float64(jsInfo.Store) + sts.jetstreamMemoryReserved = float64(jsInfo.ReservedMemory) + sts.jetstreamStorageReserved = float64(jsInfo.ReservedStore) + + sts.jetstreamStreamCount = float64(len(jsInfo.Streams)) + for _, stream := range jsInfo.Streams { + sts.jetstreamStreams = append(sts.jetstreamStreams, streamAccountStats{ + streamName: stream.Name, + consumerCount: float64(len(stream.Consumer)), + replicaCount: float64(stream.Config.Replicas), + }) } - - agg, err := sc.getConnzAggregate(nc, acc) - if err != nil { - sc.logger.Warnf("could not get connection statistics for account %q: %s", acc, err) - } else { - sts.connCount = agg.numConns - sts.bytesSent = agg.bytesSent - sts.bytesRecv = agg.bytesRecv - sts.msgsSent = agg.msgsSent - sts.msgsRecv = agg.msgsRecv - } - - accStats = append(accStats, sts) + accStats[jsInfo.Id] = sts } sc.Lock() - sc.accStats = accStats + sc.accStats = make([]accountStats, 0, len(accStats)) + for _, acc := range accStats { + sc.accStats = append(sc.accStats, acc) + } sc.Unlock() return nil } -func (sc *StatzCollector) getAccounts(nc *nats.Conn) ([]string, error) { - const subj = "$SYS.REQ.SERVER.PING.ACCOUNTZ" - msg, err := nc.Request(subj, nil, sc.pollTimeout) - if err != nil { - return nil, err - } - - var r server.ServerAPIResponse - var d server.Accountz - r.Data = &d - if err := json.Unmarshal(msg.Data, &r); err != nil { - return nil, err - } - - sort.Strings(d.Accounts) - return d.Accounts, nil -} -func (sc *StatzCollector) getAccountInfo(nc *nats.Conn, account string) (server.AccountInfo, error) { - subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.INFO", account) - msg, err := nc.Request(subj, nil, sc.pollTimeout) +func (sc *StatzCollector) getJSInfos(nc *nats.Conn, accounts []*server.AccountStat) []*server.AccountDetail { + inbox := nc.NewRespInbox() + sub, err := nc.SubscribeSync(inbox) if err != nil { - return server.AccountInfo{}, err + sc.logger.Warnf("Error creating nats subscription: %s", err) + return nil } - - var r server.ServerAPIResponse - var d server.AccountInfo - r.Data = &d - if err := json.Unmarshal(msg.Data, &r); err != nil { - return server.AccountInfo{}, err - } - - return d, nil -} - -func (sc *StatzCollector) getJSInfo(nc *nats.Conn, account string) (server.AccountDetail, error) { - subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.JSZ", account) + defer sub.Unsubscribe() opts := []byte(`{"streams": true, "consumer": true, "config": true}`) - msg, err := nc.Request(subj, opts, sc.pollTimeout) - if err != nil { - return server.AccountDetail{}, err + reqDispatched := len(accounts) + for _, acc := range accounts { + subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.JSZ", acc.Account) + if err := nc.PublishRequest(subj, inbox, opts); err != nil { + reqDispatched-- + sc.logger.Warnf("Unable to request JetStream info for account %s: %s", acc.Account, err.Error()) + continue + } } - var r server.ServerAPIResponse - var d server.AccountDetail - r.Data = &d - if err := json.Unmarshal(msg.Data, &r); err != nil { - return server.AccountDetail{}, err + res := make([]*server.AccountDetail, 0, len(accounts)) + for i := 0; i < reqDispatched; i++ { + msg, err := sub.NextMsg(sc.pollTimeout) + if err != nil { + sc.logger.Warnf("Error fetching JetStream info: %s", err) + continue + } + var r server.ServerAPIResponse + var d server.AccountDetail + r.Data = &d + if err := json.Unmarshal(msg.Data, &r); err != nil { + sc.logger.Warnf("Error deserializing JetStream info: %s", err) + continue + } + if r.Error != nil { + if strings.Contains(r.Error.Description, "jetstream not enabled") { + // jetstream is not enabled on server + return nil + } + continue + } + res = append(res, &d) } - - return d, nil + return res } -type connzAggregate struct { - bytesSent float64 - bytesRecv float64 - msgsSent float64 - msgsRecv float64 - numConns float64 -} - -func (sc *StatzCollector) getConnzAggregate(nc *nats.Conn, account string) (connzAggregate, error) { - // TODO: Replace with "$SYS.REQ.ACCOUNT.%s.CONNS" after NATS 2.8.4. - // CONNS returns bytes sent/recv at the account level without needing the - // following code. - subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.CONNZ", account) - - rep := nc.NewRespInbox() - - msg := nats.NewMsg(subj) - msg.Reply = rep - msg.Data = nil - - s, err := nc.SubscribeSync(msg.Reply) +func (sc *StatzCollector) getAccStatz(nc *nats.Conn) ([]*server.AccountStat, error) { + req := &server.AccountStatzOptions{ + IncludeUnused: true, + } + reqJSON, err := json.Marshal(req) if err != nil { - return connzAggregate{}, err + return nil, err } - defer s.Unsubscribe() - - if err := nc.PublishMsg(msg); err != nil { - return connzAggregate{}, err + const subj = "$SYS.REQ.ACCOUNT.PING.STATZ" + msg, err := nc.Request(subj, reqJSON, sc.pollTimeout) + if err != nil { + return nil, err } - - var agg connzAggregate var r server.ServerAPIResponse - var d server.Connz + var d server.AccountStatz r.Data = &d - - for i := 0; i < sc.numServers; i++ { - m, err := s.NextMsg(sc.pollTimeout) - if err != nil && err == nats.ErrTimeout { - break - } - if err != nil { - return connzAggregate{}, err - } - - if err := json.Unmarshal(m.Data, &r); err != nil { - return connzAggregate{}, err - } - - agg.numConns += float64(d.NumConns) - - for _, c := range d.Conns { - agg.bytesSent += float64(c.InBytes) - agg.bytesRecv += float64(c.OutBytes) - agg.msgsSent += float64(c.InMsgs) - agg.msgsRecv += float64(c.OutMsgs) - } + if err := json.Unmarshal(msg.Data, &r); err != nil { + return nil, err } - return agg, nil + return d.Accounts, nil } // Describe is the Prometheus interface to describe metrics for