Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: instrument aggregate message bytes on topics #1127

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions nsqd/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type TopicStats struct {
Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"`
MessageCount uint64 `json:"message_count"`
MessageBytes uint64 `json:"message_bytes"`
Paused bool `json:"paused"`

E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
Expand All @@ -26,6 +27,7 @@ func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats {
Depth: t.Depth(),
BackendDepth: t.backend.Depth(),
MessageCount: atomic.LoadUint64(&t.messageCount),
MessageBytes: atomic.LoadUint64(&t.messageBytes),
Paused: t.IsPaused(),

E2eProcessingLatency: t.AggregateChannelE2eProcessingLatency().Result(),
Expand Down
4 changes: 4 additions & 0 deletions nsqd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (n *NSQD) statsdLoop() {
stat := fmt.Sprintf("topic.%s.message_count", topic.TopicName)
client.Incr(stat, int64(diff))

diff = topic.MessageBytes - lastTopic.MessageBytes
stat = fmt.Sprintf("topic.%s.message_bytes", topic.TopicName)
client.Incr(stat, int64(diff))

stat = fmt.Sprintf("topic.%s.depth", topic.TopicName)
client.Gauge(stat, topic.Depth)

Expand Down
9 changes: 9 additions & 0 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
type Topic struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
messageCount uint64
messageBytes uint64

sync.RWMutex

Expand Down Expand Up @@ -184,6 +185,7 @@ func (t *Topic) PutMessage(m *Message) error {
return err
}
atomic.AddUint64(&t.messageCount, 1)
atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
return nil
}

Expand All @@ -194,13 +196,20 @@ func (t *Topic) PutMessages(msgs []*Message) error {
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New("exiting")
}

messageTotalBytes := 0

for i, m := range msgs {
err := t.put(m)
if err != nil {
atomic.AddUint64(&t.messageCount, uint64(i))
atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
return err
}
messageTotalBytes += len(m.Body)
}

atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
atomic.AddUint64(&t.messageCount, uint64(len(msgs)))
return nil
}
Expand Down