Skip to content

Commit

Permalink
nsqd: introspection of producer connections
Browse files Browse the repository at this point in the history
  • Loading branch information
sparklxb authored and mreiferson committed Aug 2, 2018
1 parent 8c9a66a commit 996be6d
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 8 deletions.
25 changes: 25 additions & 0 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type clientV2 struct {
FinishCount uint64
RequeueCount uint64

pubCounts map[string]uint64

writeLock sync.RWMutex
metaLock sync.RWMutex

Expand Down Expand Up @@ -141,6 +143,8 @@ func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 {

// heartbeats are client configurable but default to 30s
HeartbeatInterval: ctx.nsqd.getOpts().ClientTimeout / 2,

pubCounts: make(map[string]uint64),
}
c.lenSlice = c.lenBuf[:]
return c
Expand Down Expand Up @@ -211,6 +215,13 @@ func (c *clientV2) Stats() ClientStats {
identity = c.AuthState.Identity
identityURL = c.AuthState.IdentityURL
}
pubCounts := make([]PubCount, 0, len(c.pubCounts))
for topic, count := range c.pubCounts {
pubCounts = append(pubCounts, PubCount{
Topic: topic,
Count: count,
})
}
c.metaLock.RUnlock()
stats := ClientStats{
Version: "V2",
Expand All @@ -232,6 +243,7 @@ func (c *clientV2) Stats() ClientStats {
Authed: c.HasAuthorizations(),
AuthIdentity: identity,
AuthIdentityURL: identityURL,
PubCounts: pubCounts,
}
if stats.TLS {
p := prettyConnectionState{c.tlsConn.ConnectionState()}
Expand All @@ -243,6 +255,13 @@ func (c *clientV2) Stats() ClientStats {
return stats
}

func (c *clientV2) IsProducer() bool {
c.metaLock.RLock()
retval := len(c.pubCounts) > 0
c.metaLock.RUnlock()
return retval
}

// struct to convert from integers to the human readable strings
type prettyConnectionState struct {
tls.ConnectionState
Expand Down Expand Up @@ -343,6 +362,12 @@ func (c *clientV2) SendingMessage() {
atomic.AddUint64(&c.MessageCount, 1)
}

func (c *clientV2) PublishedMessage(topic string, count uint64) {
c.metaLock.Lock()
c.pubCounts[topic] += count
c.metaLock.Unlock()
}

func (c *clientV2) TimedOutMessage() {
atomic.AddInt64(&c.InFlightCount, -1)
c.tryUpdateReadyState()
Expand Down
69 changes: 61 additions & 8 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro
channelName, _ := reqParams.Get("channel")
jsonFormat := formatString == "json"

producerStats := s.ctx.nsqd.GetProducerStats()

stats := s.ctx.nsqd.GetStats(topicName, channelName)
health := s.ctx.nsqd.GetHealth()
startTime := s.ctx.nsqd.GetStartTime()
Expand All @@ -503,23 +505,46 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro
break
}
}

filteredProducerStats := make([]ClientStats, 0)
for _, clientStat := range producerStats {
var found bool
var count uint64
for _, v := range clientStat.PubCounts {
if v.Topic == topicName {
count = v.Count
found = true
break
}
}
if !found {
continue
}
clientStat.PubCounts = []PubCount{PubCount{
Topic: topicName,
Count: count,
}}
filteredProducerStats = append(filteredProducerStats, clientStat)
}
producerStats = filteredProducerStats
}

ms := getMemStats()
if !jsonFormat {
return s.printStats(stats, ms, health, startTime, uptime), nil
return s.printStats(stats, producerStats, ms, health, startTime, uptime), nil
}

return struct {
Version string `json:"version"`
Health string `json:"health"`
StartTime int64 `json:"start_time"`
Topics []TopicStats `json:"topics"`
Memory memStats `json:"memory"`
}{version.Binary, health, startTime.Unix(), stats, ms}, nil
Version string `json:"version"`
Health string `json:"health"`
StartTime int64 `json:"start_time"`
Topics []TopicStats `json:"topics"`
Memory memStats `json:"memory"`
Producers []ClientStats `json:"producers"`
}{version.Binary, health, startTime.Unix(), stats, ms, producerStats}, nil
}

func (s *httpServer) printStats(stats []TopicStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte {
func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte {
var buf bytes.Buffer
w := &buf

Expand Down Expand Up @@ -599,6 +624,34 @@ func (s *httpServer) printStats(stats []TopicStats, ms memStats, health string,
}
}
}

if len(producerStats) == 0 {
fmt.Fprintf(w, "\nProducers: None\n")
} else {
fmt.Fprintf(w, "\nProducers:")
for _, client := range producerStats {
connectTime := time.Unix(client.ConnectTime, 0)
// truncate to the second
duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
var totalPubCount uint64
for _, v := range client.PubCounts {
totalPubCount += v.Count
}
fmt.Fprintf(w, "\n [%s %-21s] msgs: %-8d connected: %s\n",
client.Version,
client.ClientID,
totalPubCount,
duration,
)
for _, v := range client.PubCounts {
fmt.Fprintf(w, " [%-15s] msgs: %-8d\n",
v.Topic,
v.Count,
)
}
}
}

return buf.Bytes()
}

Expand Down
26 changes: 26 additions & 0 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ type errStore struct {
err error
}

type Client interface {
Stats() ClientStats
IsProducer() bool
}

type NSQD struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
clientIDSequence int64
Expand All @@ -54,6 +59,9 @@ type NSQD struct {

topicMap map[string]*Topic

clientLock sync.RWMutex
clients map[int64]Client

lookupPeers atomic.Value

tcpListener net.Listener
Expand Down Expand Up @@ -84,6 +92,7 @@ func New(opts *Options) *NSQD {
n := &NSQD{
startTime: time.Now(),
topicMap: make(map[string]*Topic),
clients: make(map[int64]Client),
exitChan: make(chan int),
notifyChan: make(chan interface{}),
optsNotificationChan: make(chan struct{}, 1),
Expand Down Expand Up @@ -215,6 +224,23 @@ func (n *NSQD) GetStartTime() time.Time {
return n.startTime
}

func (n *NSQD) AddClient(clientID int64, client Client) {
n.clientLock.Lock()
n.clients[clientID] = client
n.clientLock.Unlock()
}

func (n *NSQD) RemoveClient(clientID int64) {
n.clientLock.Lock()
_, ok := n.clients[clientID]
if !ok {
n.clientLock.Unlock()
return
}
delete(n.clients, clientID)
n.clientLock.Unlock()
}

func (n *NSQD) Main() {
var err error
ctx := &context{n}
Expand Down
8 changes: 8 additions & 0 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error {

clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.ctx)
p.ctx.nsqd.AddClient(client.ID, client)

// synchronize the startup of messagePump in order
// to guarantee that it gets a chance to initialize
Expand Down Expand Up @@ -117,6 +118,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error {
client.Channel.RemoveClient(client.ID)
}

p.ctx.nsqd.RemoveClient(client.ID)
return err
}

Expand Down Expand Up @@ -799,6 +801,8 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}

client.PublishedMessage(topicName, 1)

return okBytes, nil
}

Expand Down Expand Up @@ -850,6 +854,8 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error())
}

client.PublishedMessage(topicName, uint64(len(messages)))

return okBytes, nil
}

Expand Down Expand Up @@ -912,6 +918,8 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error())
}

client.PublishedMessage(topicName, 1)

return okBytes, nil
}

Expand Down
23 changes: 23 additions & 0 deletions nsqd/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func NewChannelStats(c *Channel, clients []ClientStats) ChannelStats {
}
}

type PubCount struct {
Topic string `json:"topic"`
Count uint64 `json:"count"`
}

type ClientStats struct {
ClientID string `json:"client_id"`
Hostname string `json:"hostname"`
Expand All @@ -91,6 +96,8 @@ type ClientStats struct {
AuthIdentity string `json:"auth_identity,omitempty"`
AuthIdentityURL string `json:"auth_identity_url,omitempty"`

PubCounts []PubCount `json:"pub_counts,omitempty"`

TLS bool `json:"tls"`
CipherSuite string `json:"tls_cipher_suite"`
TLSVersion string `json:"tls_version"`
Expand Down Expand Up @@ -168,6 +175,22 @@ func (n *NSQD) GetStats(topic string, channel string) []TopicStats {
return topics
}

func (n *NSQD) GetProducerStats() []ClientStats {
n.clientLock.RLock()
var producers []Client
for _, c := range n.clients {
if c.IsProducer() {
producers = append(producers, c)
}
}
n.clientLock.RUnlock()
producerStats := make([]ClientStats, 0, len(producers))
for _, p := range producers {
producerStats = append(producerStats, p.Stats())
}
return producerStats
}

type memStats struct {
HeapObjects uint64 `json:"heap_objects"`
HeapIdleBytes uint64 `json:"heap_idle_bytes"`
Expand Down

0 comments on commit 996be6d

Please sign in to comment.