Skip to content

Commit

Permalink
Merge pull request #1346 from jehiah/statsd_exclude_ephemeral_1346
Browse files Browse the repository at this point in the history
nsqd: configurable to skip ephemeral topics/channels in statsd output
  • Loading branch information
jehiah authored May 21, 2021
2 parents d0f7412 + a12d747 commit a493996
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
1 change: 1 addition & 0 deletions apps/nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd")
flagSet.String("statsd-prefix", opts.StatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)")
flagSet.Int("statsd-udp-packet-size", opts.StatsdUDPPacketSize, "the size in bytes of statsd UDP packets")
flagSet.Bool("statsd-exclude-ephemeral", opts.StatsdExcludeEphemeral, "Skip ephemeral topics and channels when sending stats to statsd")

// End to end percentile flags
e2eProcessingLatencyPercentiles := app.FloatArray{}
Expand Down
11 changes: 6 additions & 5 deletions nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ type Options struct {
MaxChannelConsumers int `flag:"max-channel-consumers"`

// statsd integration
StatsdAddress string `flag:"statsd-address"`
StatsdPrefix string `flag:"statsd-prefix"`
StatsdInterval time.Duration `flag:"statsd-interval"`
StatsdMemStats bool `flag:"statsd-mem-stats"`
StatsdUDPPacketSize int `flag:"statsd-udp-packet-size"`
StatsdAddress string `flag:"statsd-address"`
StatsdPrefix string `flag:"statsd-prefix"`
StatsdInterval time.Duration `flag:"statsd-interval"`
StatsdMemStats bool `flag:"statsd-mem-stats"`
StatsdUDPPacketSize int `flag:"statsd-udp-packet-size"`
StatsdExcludeEphemeral bool `flag:"statsd-exclude-ephemeral"`

// e2e message latency
E2EProcessingLatencyWindowTime time.Duration `flag:"e2e-processing-latency-window-time"`
Expand Down
10 changes: 10 additions & 0 deletions nsqd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math"
"net"
"strings"
"time"

"github.com/nsqio/nsq/internal/statsd"
Expand Down Expand Up @@ -36,6 +37,7 @@ func (n *NSQD) statsdLoop() {
case <-ticker.C:
addr := n.getOpts().StatsdAddress
prefix := n.getOpts().StatsdPrefix
excludeEphemeral := n.getOpts().StatsdExcludeEphemeral
conn, err := net.DialTimeout("udp", addr, time.Second)
if err != nil {
n.logf(LOG_ERROR, "failed to create UDP socket to statsd(%s)", addr)
Expand All @@ -49,6 +51,10 @@ func (n *NSQD) statsdLoop() {

stats := n.GetStats("", "", false)
for _, topic := range stats.Topics {
if excludeEphemeral && strings.HasSuffix(topic.TopicName, "#ephemeral") {
continue
}

// try to find the topic in the last collection
lastTopic := TopicStats{}
for _, checkTopic := range lastStats.Topics {
Expand Down Expand Up @@ -80,6 +86,10 @@ func (n *NSQD) statsdLoop() {
}

for _, channel := range topic.Channels {
if excludeEphemeral && strings.HasSuffix(channel.ChannelName, "#ephemeral") {
continue
}

// try to find the channel in the last collection
lastChannel := ChannelStats{}
for _, checkChannel := range lastTopic.Channels {
Expand Down

0 comments on commit a493996

Please sign in to comment.