From 06baea270913d0f2371195866be70c70913dbad6 Mon Sep 17 00:00:00 2001 From: Andy Xie Date: Fri, 18 Jan 2019 10:58:05 +0800 Subject: [PATCH] nsqd: instrument aggregate message bytes on topics --- nsqd/stats.go | 2 ++ nsqd/statsd.go | 4 ++++ nsqd/topic.go | 9 +++++++++ 3 files changed, 15 insertions(+) diff --git a/nsqd/stats.go b/nsqd/stats.go index b77ec4112..351667814 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -14,6 +14,7 @@ type TopicStats struct { Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` MessageCount uint64 `json:"message_count"` + MessageBytes uint64 `json:"message_bytes"` Paused bool `json:"paused"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` @@ -26,6 +27,7 @@ func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats { Depth: t.Depth(), BackendDepth: t.backend.Depth(), MessageCount: atomic.LoadUint64(&t.messageCount), + MessageBytes: atomic.LoadUint64(&t.messageBytes), Paused: t.IsPaused(), E2eProcessingLatency: t.AggregateChannelE2eProcessingLatency().Result(), diff --git a/nsqd/statsd.go b/nsqd/statsd.go index abc502436..582bf773c 100644 --- a/nsqd/statsd.go +++ b/nsqd/statsd.go @@ -61,6 +61,10 @@ func (n *NSQD) statsdLoop() { stat := fmt.Sprintf("topic.%s.message_count", topic.TopicName) client.Incr(stat, int64(diff)) + diff = topic.MessageBytes - lastTopic.MessageBytes + stat = fmt.Sprintf("topic.%s.message_bytes", topic.TopicName) + client.Incr(stat, int64(diff)) + stat = fmt.Sprintf("topic.%s.depth", topic.TopicName) client.Gauge(stat, topic.Depth) diff --git a/nsqd/topic.go b/nsqd/topic.go index 70c8e5e34..e41be2b0c 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -17,6 +17,7 @@ import ( type Topic struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms messageCount uint64 + messageBytes uint64 sync.RWMutex @@ -184,6 +185,7 @@ func (t *Topic) PutMessage(m *Message) error { return err } atomic.AddUint64(&t.messageCount, 1) + atomic.AddUint64(&t.messageBytes, uint64(len(m.Body))) return nil } @@ -194,13 +196,20 @@ func (t *Topic) PutMessages(msgs []*Message) error { if atomic.LoadInt32(&t.exitFlag) == 1 { return errors.New("exiting") } + + messageTotalBytes := 0 + for i, m := range msgs { err := t.put(m) if err != nil { atomic.AddUint64(&t.messageCount, uint64(i)) + atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes)) return err } + messageTotalBytes += len(m.Body) } + + atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes)) atomic.AddUint64(&t.messageCount, uint64(len(msgs))) return nil }