Skip to content

Commit

Permalink
nsqd: instrument aggregate message bytes on topics
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Jan 20, 2019
1 parent ecd3a88 commit 06baea2
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 0 deletions.
2 changes: 2 additions & 0 deletions nsqd/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type TopicStats struct {
Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"`
MessageCount uint64 `json:"message_count"`
MessageBytes uint64 `json:"message_bytes"`
Paused bool `json:"paused"`

E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
Expand All @@ -26,6 +27,7 @@ func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats {
Depth: t.Depth(),
BackendDepth: t.backend.Depth(),
MessageCount: atomic.LoadUint64(&t.messageCount),
MessageBytes: atomic.LoadUint64(&t.messageBytes),
Paused: t.IsPaused(),

E2eProcessingLatency: t.AggregateChannelE2eProcessingLatency().Result(),
Expand Down
4 changes: 4 additions & 0 deletions nsqd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (n *NSQD) statsdLoop() {
stat := fmt.Sprintf("topic.%s.message_count", topic.TopicName)
client.Incr(stat, int64(diff))

diff = topic.MessageBytes - lastTopic.MessageBytes
stat = fmt.Sprintf("topic.%s.message_bytes", topic.TopicName)
client.Incr(stat, int64(diff))

stat = fmt.Sprintf("topic.%s.depth", topic.TopicName)
client.Gauge(stat, topic.Depth)

Expand Down
9 changes: 9 additions & 0 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
type Topic struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
messageCount uint64
messageBytes uint64

sync.RWMutex

Expand Down Expand Up @@ -184,6 +185,7 @@ func (t *Topic) PutMessage(m *Message) error {
return err
}
atomic.AddUint64(&t.messageCount, 1)
atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
return nil
}

Expand All @@ -194,13 +196,20 @@ func (t *Topic) PutMessages(msgs []*Message) error {
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New("exiting")
}

messageTotalBytes := 0

for i, m := range msgs {
err := t.put(m)
if err != nil {
atomic.AddUint64(&t.messageCount, uint64(i))
atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
return err
}
messageTotalBytes += len(m.Body)
}

atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
atomic.AddUint64(&t.messageCount, uint64(len(msgs)))
return nil
}
Expand Down

0 comments on commit 06baea2

Please sign in to comment.