Skip to content

Commit

Permalink
Improve account polling (#129) (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio authored Mar 15, 2023
1 parent b33957c commit da2b2f6
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 165 deletions.
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ require (
github.com/antonfisher/nested-logrus-formatter v1.3.1
github.com/fsnotify/fsnotify v1.6.0
github.com/nats-io/jsm.go v0.0.33
github.com/nats-io/nats-server/v2 v2.8.4
github.com/nats-io/nats.go v1.16.0
github.com/nats-io/nats-server/v2 v2.9.15-0.20230221163408-7ce23c46cecb
github.com/nats-io/nats.go v1.23.0
github.com/nats-io/nuid v1.0.1
github.com/prometheus/client_golang v1.13.0
github.com/prometheus/client_model v0.2.0
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.12.0
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
golang.org/x/crypto v0.5.0
)

require (
Expand All @@ -25,13 +25,13 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.15.5 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b // indirect
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/prometheus/common v0.37.0 // indirect
Expand All @@ -42,8 +42,8 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.3.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.3.8 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
golang.org/x/text v0.6.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
33 changes: 16 additions & 17 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.15.5 h1:qyCLMz2JCrKADihKOh9FxnW3houKeNsp2h5OEz0QSEA=
github.com/klauspost/compress v1.15.5/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
Expand All @@ -194,14 +194,14 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jsm.go v0.0.33 h1:mNxlZEnSiHo9BwAFpjZYuopVvtwVUdtoAana2ovyWOU=
github.com/nats-io/jsm.go v0.0.33/go.mod h1:1ySvWrDbPo/Rs1v0Ccoy7QjZKBGfVhvmolfJRBX+fCg=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4=
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b h1:exHeHbghpBp1JvdYq7muaKFvJgLD93UDcmoIbFu/9PA=
github.com/nats-io/jwt/v2 v2.3.1-0.20221227170542-bdf40fa3627b/go.mod h1:DYujvzCMZzUuqB3i1Pnpf1YtkuTwhdI84Aah9wRXkK0=
github.com/nats-io/nats-server/v2 v2.9.15-0.20230221163408-7ce23c46cecb h1:d7JNXl3VkPEedckKHK4M0j3OHF2wAk4LACxyRs76OaY=
github.com/nats-io/nats-server/v2 v2.9.15-0.20230221163408-7ce23c46cecb/go.mod h1:AT/C9XuOPGsozg2dfiS+9vK0Ge4jheffj8uL/kMGPtw=
github.com/nats-io/nats.go v1.23.0 h1:lR28r7IX44WjYgdiKz9GmUeW0uh/m33uD3yEjLZ2cOE=
github.com/nats-io/nats.go v1.23.0/go.mod h1:ki/Scsa23edbh8IRZbCuNXR9TDcbvfaSijKtaqQgw+Q=
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546 h1:7ZylVLLiDSFqxJTSibNsO2RjVSXj3QWnDc+zKara2HE=
github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546/go.mod h1:JOEZlxMfMnmaLwr+mpmP+RGIYSxLNBFsZykCGaI2PvA=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
Expand Down Expand Up @@ -286,11 +286,10 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -441,13 +440,13 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
226 changes: 86 additions & 140 deletions surveyor/collector_statz.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,178 +527,124 @@ func (sc *StatzCollector) poll() error {

func (sc *StatzCollector) pollAccountInfo() error {
nc := sc.nc
accs, err := sc.getAccounts(nc)
accs, err := sc.getAccStatz(nc)
if err != nil {
return err
}

accStats := make([]accountStats, 0, len(accs))
accStats := make(map[string]accountStats, len(accs))
for _, acc := range accs {
sts := accountStats{accountID: acc}
sts := accountStats{accountID: acc.Account}

accInfo, err := sc.getAccountInfo(nc, acc)
if err != nil {
sc.logger.Warnf("could not get info for account %q: %s", acc, err)
sts.leafCount = float64(acc.LeafNodes)
sts.subCount = float64(acc.NumSubs)
sts.connCount = float64(acc.Conns)
sts.bytesSent = float64(acc.Sent.Bytes)
sts.bytesRecv = float64(acc.Received.Bytes)
sts.msgsSent = float64(acc.Sent.Msgs)
sts.msgsRecv = float64(acc.Received.Msgs)

accStats[acc.Account] = sts
}
jsInfos := sc.getJSInfos(nc, accs)
for _, jsInfo := range jsInfos {
sts, ok := accStats[jsInfo.Id]
if !ok {
continue
}
sts.leafCount = float64(accInfo.LeafCnt)
sts.subCount = float64(accInfo.SubCnt)

if accInfo.JetStream {
sts.jetstreamEnabled = 1.0

jsInfo, err := sc.getJSInfo(nc, acc)
if err != nil {
sc.logger.Warnf("could not get JetStream info for account %q: %s", acc, err)
} else {
sts.jetstreamMemoryUsed = float64(jsInfo.Memory)
sts.jetstreamStorageUsed = float64(jsInfo.Store)
sts.jetstreamMemoryReserved = float64(jsInfo.ReservedMemory)
sts.jetstreamStorageReserved = float64(jsInfo.ReservedStore)

sts.jetstreamStreamCount = float64(len(jsInfo.Streams))
for _, stream := range jsInfo.Streams {
sts.jetstreamStreams = append(sts.jetstreamStreams, streamAccountStats{
streamName: stream.Name,
consumerCount: float64(len(stream.Consumer)),
replicaCount: float64(stream.Config.Replicas),
})
}
}
sts.jetstreamEnabled = 1.0
sts.jetstreamMemoryUsed = float64(jsInfo.Memory)
sts.jetstreamStorageUsed = float64(jsInfo.Store)
sts.jetstreamMemoryReserved = float64(jsInfo.ReservedMemory)
sts.jetstreamStorageReserved = float64(jsInfo.ReservedStore)

sts.jetstreamStreamCount = float64(len(jsInfo.Streams))
for _, stream := range jsInfo.Streams {
sts.jetstreamStreams = append(sts.jetstreamStreams, streamAccountStats{
streamName: stream.Name,
consumerCount: float64(len(stream.Consumer)),
replicaCount: float64(stream.Config.Replicas),
})
}

agg, err := sc.getConnzAggregate(nc, acc)
if err != nil {
sc.logger.Warnf("could not get connection statistics for account %q: %s", acc, err)
} else {
sts.connCount = agg.numConns
sts.bytesSent = agg.bytesSent
sts.bytesRecv = agg.bytesRecv
sts.msgsSent = agg.msgsSent
sts.msgsRecv = agg.msgsRecv
}

accStats = append(accStats, sts)
accStats[jsInfo.Id] = sts
}

sc.Lock()
sc.accStats = accStats
sc.accStats = make([]accountStats, 0, len(accStats))
for _, acc := range accStats {
sc.accStats = append(sc.accStats, acc)
}
sc.Unlock()

return nil
}

func (sc *StatzCollector) getAccounts(nc *nats.Conn) ([]string, error) {
const subj = "$SYS.REQ.SERVER.PING.ACCOUNTZ"
msg, err := nc.Request(subj, nil, sc.pollTimeout)
if err != nil {
return nil, err
}

var r server.ServerAPIResponse
var d server.Accountz
r.Data = &d
if err := json.Unmarshal(msg.Data, &r); err != nil {
return nil, err
}

sort.Strings(d.Accounts)
return d.Accounts, nil
}
func (sc *StatzCollector) getAccountInfo(nc *nats.Conn, account string) (server.AccountInfo, error) {
subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.INFO", account)
msg, err := nc.Request(subj, nil, sc.pollTimeout)
func (sc *StatzCollector) getJSInfos(nc *nats.Conn, accounts []*server.AccountStat) []*server.AccountDetail {
inbox := nc.NewRespInbox()
sub, err := nc.SubscribeSync(inbox)
if err != nil {
return server.AccountInfo{}, err
sc.logger.Warnf("Error creating nats subscription: %s", err)
return nil
}

var r server.ServerAPIResponse
var d server.AccountInfo
r.Data = &d
if err := json.Unmarshal(msg.Data, &r); err != nil {
return server.AccountInfo{}, err
}

return d, nil
}

func (sc *StatzCollector) getJSInfo(nc *nats.Conn, account string) (server.AccountDetail, error) {
subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.JSZ", account)
defer sub.Unsubscribe()
opts := []byte(`{"streams": true, "consumer": true, "config": true}`)
msg, err := nc.Request(subj, opts, sc.pollTimeout)
if err != nil {
return server.AccountDetail{}, err
reqDispatched := len(accounts)
for _, acc := range accounts {
subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.JSZ", acc.Account)
if err := nc.PublishRequest(subj, inbox, opts); err != nil {
reqDispatched--
sc.logger.Warnf("Unable to request JetStream info for account %s: %s", acc.Account, err.Error())
continue
}
}

var r server.ServerAPIResponse
var d server.AccountDetail
r.Data = &d
if err := json.Unmarshal(msg.Data, &r); err != nil {
return server.AccountDetail{}, err
res := make([]*server.AccountDetail, 0, len(accounts))
for i := 0; i < reqDispatched; i++ {
msg, err := sub.NextMsg(sc.pollTimeout)
if err != nil {
sc.logger.Warnf("Error fetching JetStream info: %s", err)
continue
}
var r server.ServerAPIResponse
var d server.AccountDetail
r.Data = &d
if err := json.Unmarshal(msg.Data, &r); err != nil {
sc.logger.Warnf("Error deserializing JetStream info: %s", err)
continue
}
if r.Error != nil {
if strings.Contains(r.Error.Description, "jetstream not enabled") {
// jetstream is not enabled on server
return nil
}
continue
}
res = append(res, &d)
}

return d, nil
return res
}

type connzAggregate struct {
bytesSent float64
bytesRecv float64
msgsSent float64
msgsRecv float64
numConns float64
}

func (sc *StatzCollector) getConnzAggregate(nc *nats.Conn, account string) (connzAggregate, error) {
// TODO: Replace with "$SYS.REQ.ACCOUNT.%s.CONNS" after NATS 2.8.4.
// CONNS returns bytes sent/recv at the account level without needing the
// following code.
subj := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.CONNZ", account)

rep := nc.NewRespInbox()

msg := nats.NewMsg(subj)
msg.Reply = rep
msg.Data = nil

s, err := nc.SubscribeSync(msg.Reply)
func (sc *StatzCollector) getAccStatz(nc *nats.Conn) ([]*server.AccountStat, error) {
req := &server.AccountStatzOptions{
IncludeUnused: true,
}
reqJSON, err := json.Marshal(req)
if err != nil {
return connzAggregate{}, err
return nil, err
}
defer s.Unsubscribe()

if err := nc.PublishMsg(msg); err != nil {
return connzAggregate{}, err
const subj = "$SYS.REQ.ACCOUNT.PING.STATZ"
msg, err := nc.Request(subj, reqJSON, sc.pollTimeout)
if err != nil {
return nil, err
}

var agg connzAggregate
var r server.ServerAPIResponse
var d server.Connz
var d server.AccountStatz
r.Data = &d

for i := 0; i < sc.numServers; i++ {
m, err := s.NextMsg(sc.pollTimeout)
if err != nil && err == nats.ErrTimeout {
break
}
if err != nil {
return connzAggregate{}, err
}

if err := json.Unmarshal(m.Data, &r); err != nil {
return connzAggregate{}, err
}

agg.numConns += float64(d.NumConns)

for _, c := range d.Conns {
agg.bytesSent += float64(c.InBytes)
agg.bytesRecv += float64(c.OutBytes)
agg.msgsSent += float64(c.InMsgs)
agg.msgsRecv += float64(c.OutMsgs)
}
if err := json.Unmarshal(msg.Data, &r); err != nil {
return nil, err
}

return agg, nil
return d.Accounts, nil
}

// Describe is the Prometheus interface to describe metrics for
Expand Down

0 comments on commit da2b2f6

Please sign in to comment.