Skip to content

Commit

Permalink
Merge pull request #7203 from influxdata/er-stats
Browse files Browse the repository at this point in the history
Ensure we don't mutate provided statistics tags
  • Loading branch information
e-dard authored Aug 24, 2016
2 parents 3ecc913 + 6cafdbc commit 8bb3fcb
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 83 deletions.
25 changes: 25 additions & 0 deletions models/statistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,28 @@ func NewStatistic(name string) Statistic {
Values: make(map[string]interface{}),
}
}

// StatisticTags is a map that can be merged with others without causing
// mutations to either map.
type StatisticTags map[string]string

// Merge creates a new map containing the merged contents of tags and t.
// If both tags and the receiver map contain the same key, the value in tags
// is used in the resulting map.
//
// Merge always returns a usable map.
func (t StatisticTags) Merge(tags map[string]string) map[string]string {
// Add everything in tags to the result.
out := make(map[string]string, len(tags))
for k, v := range tags {
out[k] = v
}

// Only add values from t that don't appear in tags.
for k, v := range t {
if _, ok := tags[k]; !ok {
out[k] = v
}
}
return out
}
55 changes: 55 additions & 0 deletions models/statistic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package models_test

import (
"reflect"
"testing"

"github.com/influxdata/influxdb/models"
)

func TestTags_Merge(t *testing.T) {
examples := []struct {
Base map[string]string
Arg map[string]string
Result map[string]string
}{
{
Base: nil,
Arg: nil,
Result: map[string]string{},
},
{
Base: nil,
Arg: map[string]string{"foo": "foo"},
Result: map[string]string{"foo": "foo"},
},
{
Base: map[string]string{"foo": "foo"},
Arg: nil,
Result: map[string]string{"foo": "foo"},
},
{
Base: map[string]string{"foo": "foo"},
Arg: map[string]string{"bar": "bar"},
Result: map[string]string{"foo": "foo", "bar": "bar"},
},
{
Base: map[string]string{"foo": "foo", "bar": "bar"},
Arg: map[string]string{"zoo": "zoo"},
Result: map[string]string{"foo": "foo", "bar": "bar", "zoo": "zoo"},
},
{
Base: map[string]string{"foo": "foo", "bar": "bar"},
Arg: map[string]string{"bar": "newbar"},
Result: map[string]string{"foo": "foo", "bar": "newbar"},
},
}

for i, example := range examples {
i++
result := models.StatisticTags(example.Base).Merge(example.Arg)
if got, exp := result, example.Result; !reflect.DeepEqual(got, exp) {
t.Errorf("[Example %d] got %#v, expected %#v", i, got, exp)
}
}
}
20 changes: 7 additions & 13 deletions services/collectd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ type Service struct {
addr net.Addr

// expvar-based stats.
stats *Statistics
statTags map[string]string
stats *Statistics
defaultTags models.StatisticTags
}

// NewService returns a new instance of the collectd service.
Expand All @@ -67,10 +67,10 @@ func NewService(c Config) *Service {
// Use defaults where necessary.
Config: c.WithDefaults(),

Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
err: make(chan error),
stats: &Statistics{},
statTags: map[string]string{"bind": c.BindAddress},
Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
err: make(chan error),
stats: &Statistics{},
defaultTags: models.StatisticTags{"bind": c.BindAddress},
}

return &s
Expand Down Expand Up @@ -224,15 +224,9 @@ type Statistics struct {

// Statistics returns statistics for periodic monitoring.
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
// Insert any missing deault tag values.
for k, v := range s.statTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
return []models.Statistic{{
Name: "collectd",
Tags: tags,
Tags: s.defaultTags.Merge(tags),
Values: map[string]interface{}{
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
Expand Down
16 changes: 5 additions & 11 deletions services/graphite/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ type Service struct {
batcher *tsdb.PointBatcher
parser *Parser

logger *log.Logger
stats *Statistics
statTags map[string]string
logger *log.Logger
stats *Statistics
defaultTags models.StatisticTags

tcpConnectionsMu sync.Mutex
tcpConnections map[string]*tcpConnection
Expand Down Expand Up @@ -106,7 +106,7 @@ func NewService(c Config) (*Service, error) {
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, fmt.Sprintf("[graphite] %s ", d.BindAddress), log.LstdFlags),
stats: &Statistics{},
statTags: map[string]string{"proto": d.Protocol, "bind": d.BindAddress},
defaultTags: models.StatisticTags{"proto": d.Protocol, "bind": d.BindAddress},
tcpConnections: make(map[string]*tcpConnection),
done: make(chan struct{}),
diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),
Expand Down Expand Up @@ -232,15 +232,9 @@ type Statistics struct {

// Statistics returns statistics for periodic monitoring.
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
// Insert any missing deault tag values.
for k, v := range s.statTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
return []models.Statistic{{
Name: "graphite",
Tags: tags,
Tags: s.defaultTags.Merge(tags),
Values: map[string]interface{}{
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
Expand Down
15 changes: 4 additions & 11 deletions services/opentsdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ type Service struct {
LogPointErrors bool
Logger *log.Logger

stats *Statistics
statTags map[string]string
stats *Statistics
defaultTags models.StatisticTags
}

// NewService returns a new instance of Service.
Expand All @@ -96,7 +96,7 @@ func NewService(c Config) (*Service, error) {
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
LogPointErrors: d.LogPointErrors,
stats: &Statistics{},
statTags: map[string]string{"bind": d.BindAddress},
defaultTags: models.StatisticTags{"bind": d.BindAddress},
}
return s, nil
}
Expand Down Expand Up @@ -200,16 +200,9 @@ type Statistics struct {

// Statistics returns statistics for periodic monitoring.
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
// Insert any missing deault tag values.
for k, v := range s.statTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}

return []models.Statistic{{
Name: "opentsdb",
Tags: tags,
Tags: s.defaultTags.Merge(tags),
Values: map[string]interface{}{
statHTTPConnectionsHandled: atomic.LoadInt64(&s.stats.HTTPConnectionsHandled),
statTelnetConnectionsActive: atomic.LoadInt64(&s.stats.ActiveTelnetConnections),
Expand Down
21 changes: 7 additions & 14 deletions services/subscriber/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st
bm: bm,
writers: writers,
stats: stats,
tags: map[string]string{
defaultTags: models.StatisticTags{
"database": se.db,
"retention_policy": se.rp,
"name": se.name,
Expand Down Expand Up @@ -383,11 +383,11 @@ type writerStats struct {

// balances writes across PointsWriters according to BalanceMode
type balancewriter struct {
bm BalanceMode
writers []PointsWriter
stats []writerStats
tags map[string]string
i int
bm BalanceMode
writers []PointsWriter
stats []writerStats
defaultTags models.StatisticTags
i int
}

func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
Expand Down Expand Up @@ -415,19 +415,12 @@ func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {

// Statistics returns statistics for periodic monitoring.
func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic {
// Insert any missing default tag values.
for k, v := range b.tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}

statistics := make([]models.Statistic, len(b.stats))
for i := range b.stats {
tags["destination"] = b.stats[i].dest
statistics[i] = models.Statistic{
Name: "subscriber",
Tags: tags,
Tags: b.defaultTags.Merge(tags),
Values: map[string]interface{}{
statPointsWritten: atomic.LoadInt64(&b.stats[i].pointsWritten),
statWriteFailures: atomic.LoadInt64(&b.stats[i].failures),
Expand Down
29 changes: 11 additions & 18 deletions services/udp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,22 @@ type Service struct {
CreateDatabase(name string) (*meta.DatabaseInfo, error)
}

Logger *log.Logger
stats *Statistics
statTags map[string]string
Logger *log.Logger
stats *Statistics
defaultTags models.StatisticTags
}

// NewService returns a new instance of Service.
func NewService(c Config) *Service {
d := *c.WithDefaults()
return &Service{
config: d,
done: make(chan struct{}),
parserChan: make(chan []byte, parserChanLen),
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
stats: &Statistics{},
statTags: map[string]string{"bind": d.BindAddress},
config: d,
done: make(chan struct{}),
parserChan: make(chan []byte, parserChanLen),
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
stats: &Statistics{},
defaultTags: models.StatisticTags{"bind": d.BindAddress},
}
}

Expand Down Expand Up @@ -132,16 +132,9 @@ type Statistics struct {

// Statistics returns statistics for periodic monitoring.
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
// Insert any missing deault tag values.
for k, v := range s.statTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}

return []models.Statistic{{
Name: "udp",
Tags: tags,
Tags: s.defaultTags.Merge(tags),
Values: map[string]interface{}{
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
Expand Down
9 changes: 4 additions & 5 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ type DatabaseIndex struct {

name string // name of the database represented by this index

stats *IndexStatistics
stats *IndexStatistics
defaultTags models.StatisticTags
}

// NewDatabaseIndex returns a new initialized DatabaseIndex.
Expand All @@ -44,6 +45,7 @@ func NewDatabaseIndex(name string) *DatabaseIndex {
series: make(map[string]*Series),
name: name,
stats: &IndexStatistics{},
defaultTags: models.StatisticTags{"database": name},
}
}

Expand All @@ -55,12 +57,9 @@ type IndexStatistics struct {

// Statistics returns statistics for periodic monitoring.
func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic {
if _, ok := tags["database"]; !ok {
tags["database"] = d.name
}
return []models.Statistic{{
Name: "database",
Tags: tags,
Tags: d.defaultTags.Merge(tags),
Values: map[string]interface{}{
statDatabaseSeries: atomic.LoadInt64(&d.stats.NumSeries),
statDatabaseMeasurements: atomic.LoadInt64(&d.stats.NumMeasurements),
Expand Down
15 changes: 4 additions & 11 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ type Shard struct {
enabled bool

// expvar-based stats.
stats *ShardStatistics
statTags map[string]string
stats *ShardStatistics
defaultTags models.StatisticTags

logger *log.Logger
// used by logger. Referenced so it can be passed down to new caches.
Expand All @@ -118,7 +118,7 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
closing: make(chan struct{}),

stats: &ShardStatistics{},
statTags: map[string]string{
defaultTags: models.StatisticTags{
"path": path,
"id": fmt.Sprintf("%d", id),
"database": db,
Expand Down Expand Up @@ -179,16 +179,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
return nil
}

// Insert any missing default tag values.
for k, v := range s.statTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}

statistics := []models.Statistic{{
Name: "shard",
Tags: tags,
Tags: s.defaultTags.Merge(tags),
Values: map[string]interface{}{
statWriteReq: atomic.LoadInt64(&s.stats.WriteReq),
statSeriesCreate: atomic.LoadInt64(&s.stats.SeriesCreated),
Expand Down

0 comments on commit 8bb3fcb

Please sign in to comment.