From 7d82beb2be239b361adc5b9c4c2f7022add62465 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 12 Mar 2015 15:22:48 -0700 Subject: [PATCH 01/26] Add basic stats type Modeled on BoltDB and expvar stats. --- stats.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++ stats_test.go | 59 +++++++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 stats.go create mode 100644 stats_test.go diff --git a/stats.go b/stats.go new file mode 100644 index 00000000000..50cc561d641 --- /dev/null +++ b/stats.go @@ -0,0 +1,85 @@ +package influxdb + +import ( + "sync" +) + +type Int struct { + mu sync.RWMutex + i int64 +} + +func NewInt(v int64) *Int { + return &Int{i: v} +} + +// Add atomically adds the given delta to the Int. +func (i *Int) Add(delta int64) { + i.mu.Lock() + defer i.mu.Unlock() + i.i += delta + +} + +type Stats struct { + m map[string]*Int + mu sync.RWMutex +} + +func NewStats() *Stats { + return &Stats{ + m: make(map[string]*Int), + } +} + +// Add adds delta to the stat indiciated by key. +func (s *Stats) Add(key string, delta int64) { + s.mu.RLock() + i, ok := s.m[key] + s.mu.RUnlock() + if !ok { + // check again under the write lock + s.mu.Lock() + i, ok = s.m[key] + if !ok { + i = new(Int) + s.m[key] = i + } + s.mu.Unlock() + } + + i.Add(delta) +} + +func (s *Stats) Get(key string) int64 { + s.mu.RLock() + defer s.mu.RUnlock() + return s.m[key].i +} + +func (s *Stats) Set(key string, v int64) { + s.mu.Lock() + defer s.mu.Unlock() + s.m[key] = NewInt(v) +} + +// Walk calls f for each entry in the stats. The stats are locked +// during the walk but existing entries may be concurrently updated. +func (s *Stats) Walk(f func(string, int64)) { + s.mu.RLock() + defer s.mu.RUnlock() + + for k, v := range s.m { + f(k, v.i) + } +} + +// Diff returns the difference between two sets of stats. The result is undefined +// if the two Stats objects do not contain the same keys. +func (s *Stats) Diff(other *Stats) *Stats { + diff := NewStats() + s.Walk(func(k string, v int64) { + diff.Set(k, v-other.Get(k)) + }) + return diff +} diff --git a/stats_test.go b/stats_test.go new file mode 100644 index 00000000000..734e8cef0c1 --- /dev/null +++ b/stats_test.go @@ -0,0 +1,59 @@ +package influxdb_test + +import ( + "testing" + + "github.com/influxdb/influxdb" +) + +func TestStats_SetAndGet(t *testing.T) { + s := influxdb.NewStats() + + s.Set("a", 100) + if s.Get("a") != 100 { + t.Fatalf("stats set failed, expected 100, got %d", s.Get("a")) + } +} + +func TestStats_Add(t *testing.T) { + s := influxdb.NewStats() + + s.Add("a", 200) + if s.Get("a") != 200 { + t.Fatalf("stats set failed, expected 200, got %d", s.Get("a")) + } +} + +func TestStats_AddNegative(t *testing.T) { + s := influxdb.NewStats() + + s.Add("a", -200) + if s.Get("a") != -200 { + t.Fatalf("stats set failed, expected -200, got %d", s.Get("a")) + } +} + +func TestStats_SetAndAdd(t *testing.T) { + s := influxdb.NewStats() + + s.Set("a", 100) + s.Add("a", 200) + if s.Get("a") != 300 { + t.Fatalf("stats set failed, expected 300, got %d", s.Get("a")) + } +} + +func TestStats_Diff(t *testing.T) { + foo := influxdb.NewStats() + bar := influxdb.NewStats() + + foo.Set("a", 100) + foo.Set("b", 600) + bar.Set("a", 450) + bar.Set("b", 525) + + qux := bar.Diff(foo) + if qux.Get("a") != 350 || qux.Get("b") != -75 { + t.Fatalf("stats diff returned unexpedted result: %s", qux) + } +} From 8825580500077ea4a7087ddfa6b97511de749ab0 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 12 Mar 2015 16:02:25 -0700 Subject: [PATCH 02/26] Track server stats --- server.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/server.go b/server.go index 2db0d0c145e..0024c27dce2 100644 --- a/server.go +++ b/server.go @@ -72,6 +72,7 @@ type Server struct { shards map[uint64]*Shard // shards by shard id + metrics *Stats Logger *log.Logger WriteTrace bool // Detailed logging of write path @@ -103,8 +104,9 @@ func NewServer() *Server { databases: make(map[string]*database), users: make(map[string]*User), - shards: make(map[uint64]*Shard), - Logger: log.New(os.Stderr, "[server] ", log.LstdFlags), + shards: make(map[uint64]*Shard), + metrics: NewStats(), + Logger: log.New(os.Stderr, "[server] ", log.LstdFlags), } // Server will always return with authentication enabled. // This ensures that disabling authentication must be an explicit decision. @@ -458,6 +460,8 @@ func (s *Server) Client() MessagingClient { // This function waits until the message has been processed by the server. // Returns the broker log index of the message or an error. func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, error) { + s.metrics.Add("broadcastMessageTx", 1) + // Encode the command. data, err := json.Marshal(c) if err != nil { @@ -943,6 +947,7 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err nodeIndex++ } } + s.metrics.Add("shardsCreated", int64(len(g.Shards))) // Retention policy has a new shard group, so update the policy. rp.shardGroups = append(rp.shardGroups, g) @@ -1023,6 +1028,7 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) { // Remove from metastore. rp.removeShardGroupByID(c.ID) err = s.meta.mustUpdate(m.Index, func(tx *metatx) error { + s.metrics.Add("shardsDeleted", int64(len(g.Shards))) return tx.saveDatabase(db) }) return @@ -1508,7 +1514,15 @@ type Point struct { // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. -func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) { +func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (idx uint64, err error) { + s.metrics.Add("batchWriteRx", 1) + s.metrics.Add("pointWriteRx", int64(len(points))) + defer func() { + if err != nil { + s.metrics.Add("batchWriteRxError", 1) + } + }() + if s.WriteTrace { log.Printf("received write for database '%s', retention policy '%s', with %d points", database, retentionPolicy, len(points)) @@ -1626,6 +1640,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( if err != nil { return maxIndex, err } + s.metrics.Add("writeSeriesMessageTx", 1) if index > maxIndex { maxIndex = index } @@ -2816,6 +2831,7 @@ func (s *Server) processor(conn MessagingConn, done chan struct{}) { // All messages must be processed under lock. func() { + s.metrics.Add("broadcastMessageRx", 1) s.mu.Lock() defer s.mu.Unlock() @@ -3279,6 +3295,7 @@ func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool { // runContinuousQuery will execute a continuous query // TODO: make this fan out to the cluster instead of running all the queries on this single data node func (s *Server) runContinuousQuery(cq *ContinuousQuery) { + s.metrics.Add("continuousQueryExecuted", 1) cq.mu.Lock() defer cq.mu.Unlock() From 228e238436fbb66cca9afd27001942f46691750d Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 12 Mar 2015 16:02:33 -0700 Subject: [PATCH 03/26] Remove unnecessary err variable --- server.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server.go b/server.go index 0024c27dce2..df9c96f4253 100644 --- a/server.go +++ b/server.go @@ -1627,7 +1627,6 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( } // Write data for each shard to the Broker. - var err error var maxIndex uint64 for i, d := range shardData { assert(len(d) > 0, "raw series data required: topic=%d", i) @@ -1649,7 +1648,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( } } - return maxIndex, err + return maxIndex, nil } // createMeasurementsIfNotExists walks the "points" and ensures that all new Series are created, and all From e08066495a0ffffdec5ee394cd664dc37e5a1fb5 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 12 Mar 2015 16:07:41 -0700 Subject: [PATCH 04/26] Add "SHOW STATS" command --- influxql/ast.go | 23 +++++++++++++++++++++++ influxql/parser.go | 17 +++++++++++++++++ influxql/parser_test.go | 21 +++++++++++++++++++++ influxql/token.go | 2 ++ 4 files changed, 63 insertions(+) diff --git a/influxql/ast.go b/influxql/ast.go index 5405e2a188a..b0f82ed2f86 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -78,6 +78,7 @@ func (*ShowFieldKeysStatement) node() {} func (*ShowRetentionPoliciesStatement) node() {} func (*ShowMeasurementsStatement) node() {} func (*ShowSeriesStatement) node() {} +func (*ShowStatsStatement) node() {} func (*ShowTagKeysStatement) node() {} func (*ShowTagValuesStatement) node() {} func (*ShowUsersStatement) node() {} @@ -169,6 +170,7 @@ func (*ShowFieldKeysStatement) stmt() {} func (*ShowMeasurementsStatement) stmt() {} func (*ShowRetentionPoliciesStatement) stmt() {} func (*ShowSeriesStatement) stmt() {} +func (*ShowStatsStatement) stmt() {} func (*ShowTagKeysStatement) stmt() {} func (*ShowTagValuesStatement) stmt() {} func (*ShowUsersStatement) stmt() {} @@ -1365,6 +1367,27 @@ func (s *ShowRetentionPoliciesStatement) RequiredPrivileges() ExecutionPrivilege return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}} } +// ShowRetentionPoliciesStatement represents a command for displaying stats for a given server. +type ShowStatsStatement struct { + // Hostname or IP of the server for stats. + Host string +} + +// String returns a string representation of a ShowStatsStatement. +func (s *ShowStatsStatement) String() string { + var buf bytes.Buffer + _, _ = buf.WriteString("SHOW STATS ") + if s.Host != "" { + _, _ = buf.WriteString(s.Host) + } + return buf.String() +} + +// RequiredPrivileges returns the privilege(s) required to execute a ShowStatsStatement +func (s *ShowStatsStatement) RequiredPrivileges() ExecutionPrivileges { + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} +} + // ShowTagKeysStatement represents a command for listing tag keys. type ShowTagKeysStatement struct { // Data source that fields are extracted from. diff --git a/influxql/parser.go b/influxql/parser.go index 6aacaa407a6..6c36d8a4859 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -119,6 +119,8 @@ func (p *Parser) parseShowStatement() (Statement, error) { return nil, newParseError(tokstr(tok, lit), []string{"POLICIES"}, pos) case SERIES: return p.parseShowSeriesStatement() + case STATS: + return p.parseShowStatsStatement() case TAG: tok, pos, lit := p.scanIgnoreWhitespace() if tok == KEYS { @@ -1172,6 +1174,21 @@ func (p *Parser) parseRetentionPolicy() (name string, dfault bool, err error) { return } +// parseShowStatsStatement parses a string and returns a ShowStatsStatement. +// This function assumes the "SHOW STATS" tokens have already been consumed. +func (p *Parser) parseShowStatsStatement() (*ShowStatsStatement, error) { + stmt := &ShowStatsStatement{} + var err error + + if tok, _, _ := p.scanIgnoreWhitespace(); tok == ON { + stmt.Host, err = p.parseString() + } else { + p.unscan() + } + + return stmt, err +} + // parseDropContinuousQueriesStatement parses a string and returns a DropContinuousQueryStatement. // This function assumes the "DROP CONTINUOUS" tokens have already been consumed. func (p *Parser) parseDropContinuousQueryStatement() (*DropContinuousQueryStatement, error) { diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 00a2606069b..5a4141487ce 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -769,6 +769,26 @@ func TestParser_ParseStatement(t *testing.T) { stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, 4, false), }, + // SHOW STATS + { + s: `SHOW STATS`, + stmt: &influxql.ShowStatsStatement{ + Host: "", + }, + }, + { + s: `SHOW STATS ON 'servera'`, + stmt: &influxql.ShowStatsStatement{ + Host: "servera", + }, + }, + { + s: `SHOW STATS ON '192.167.1.44'`, + stmt: &influxql.ShowStatsStatement{ + Host: "192.167.1.44", + }, + }, + // Errors {s: ``, err: `found EOF, expected SELECT at line 1, char 1`}, {s: `SELECT`, err: `found EOF, expected identifier, string, number, bool at line 1, char 8`}, @@ -798,6 +818,7 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`}, {s: `SHOW RETENTION POLICIES`, err: `found EOF, expected identifier at line 1, char 25`}, {s: `SHOW FOO`, err: `found FOO, expected CONTINUOUS, DATABASES, FIELD, MEASUREMENTS, RETENTION, SERIES, SERVERS, TAG, USERS at line 1, char 6`}, + {s: `SHOW STATS ON`, err: `found EOF, expected string at line 1, char 15`}, {s: `DROP CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 17`}, {s: `DROP CONTINUOUS QUERY`, err: `found EOF, expected identifier at line 1, char 23`}, {s: `CREATE CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 19`}, diff --git a/influxql/token.go b/influxql/token.go index a60457d651e..6827c8c6ebc 100644 --- a/influxql/token.go +++ b/influxql/token.go @@ -106,6 +106,7 @@ const ( SERVERS SHOW SLIMIT + STATS SOFFSET TAG TO @@ -208,6 +209,7 @@ var tokens = [...]string{ SHOW: "SHOW", SLIMIT: "SLIMIT", SOFFSET: "SOFFSET", + STATS: "STATS", TAG: "TAG", TO: "TO", USER: "USER", From f730257ee4087348bf2741a5019487c06625ae78 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 12 Mar 2015 16:23:33 -0700 Subject: [PATCH 05/26] Implement self-monitoring --- cmd/influxd/config.go | 14 ++++++++++++++ cmd/influxd/config_test.go | 7 +++++++ cmd/influxd/run.go | 6 ++++++ etc/config.sample.toml | 9 +++++++++ server.go | 23 +++++++++++++++++++++++ 5 files changed, 59 insertions(+) diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 639969c9b2e..646733c6504 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -110,6 +110,14 @@ type Config struct { RaftTracing bool `toml:"raft-tracing"` } `toml:"logging"` + Statistics struct { + Enabled bool `toml:"enabled"` + Database string `toml:"database"` + RetentionPolicy string `toml:"retention-policy"` + Duration Duration `toml:"duration"` + Interval Duration `toml:"check-interval"` + } + ContinuousQuery struct { // when continuous queries are run we'll automatically recompute previous intervals // in case lagged data came in. Set to zero if you never have lagged data. We do @@ -166,6 +174,12 @@ func NewConfig() *Config { c.ContinuousQuery.Disable = false c.ReportingDisabled = false + c.Statistics.Enabled = false + c.Statistics.Database = "_internal" + c.Statistics.RetentionPolicy = "default" + c.Statistics.Duration = Duration(7 * 24 * time.Hour) + c.Statistics.Interval = Duration(1 * time.Minute) + // Detect hostname (or set to localhost). if c.Hostname, _ = os.Hostname(); c.Hostname == "" { c.Hostname = "localhost" diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index 849956df8ae..f8c8a85833b 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -177,6 +177,13 @@ file = "influxdb.log" write-tracing = true raft-tracing = true +[statistics] +enabled = true +database = "_internal" +retention = "default" +duration = "7d" +check-interval = "1m" + # Configure the admin server [admin] enabled = true diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index bb01e416a31..d78db8190ed 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -180,6 +180,12 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B log.Fatalf("failed to start %s Graphite server: %s", c.Protocol, err.Error()) } } + + // Start up self-monitoring if enabled. + if config.Statistics.Enabled { + // XXX create database and retention period + s.StartSelfMonitoring(interval) + } } // unless disabled, start the loop to report anonymous usage stats every 24h diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 70cdb67780b..2ea87a2a72b 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -88,3 +88,12 @@ dir = "/tmp/influxdb/development/state" file = "/var/log/influxdb/influxd.log" # Leave blank to redirect logs to stderr. write-tracing = false # If true, enables detailed logging of the write system. raft-tracing = false # If true, enables detailed logging of Raft consensus. + +# InfluxDB can store statistics about itself. This is useful for monitoring purposes. +# This feature is disabled by default, but if enabled, these statistics can be queried +# as any other data. +[statistics] +enabled = false +database = "_internal" +retention-policy = "default" +duration = "7d" diff --git a/server.go b/server.go index df9c96f4253..e817b452458 100644 --- a/server.go +++ b/server.go @@ -323,6 +323,29 @@ func (s *Server) load() error { }) } +func (s *Server) StartSelfMonitoring(interval time.Duration) error { + if interval == 0 { + return fmt.Errorf("statistics check interval must be non-zero") + } + + // Grab the initial stats. + prev := NewStats() + + for { + time.Sleep(interval) + + // Grab the current stats and diff them. + stats := s.metrics + diff := stats.Diff(prev) + var _ = diff + + // XXX write diff to itself. Tag with server hostname and ID. + + // Save stats for the next loop. + prev = stats + } +} + // StartRetentionPolicyEnforcement launches retention policy enforcement. func (s *Server) StartRetentionPolicyEnforcement(checkInterval time.Duration) error { if checkInterval == 0 { From 240b7a7f7e5337682ea6725db1f9c066593f6874 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 12 Mar 2015 22:16:30 -0700 Subject: [PATCH 06/26] Add convenient Inc() method to Stats --- stats.go | 5 +++++ stats_test.go | 15 +++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/stats.go b/stats.go index 50cc561d641..df2c9e4fd55 100644 --- a/stats.go +++ b/stats.go @@ -51,6 +51,11 @@ func (s *Stats) Add(key string, delta int64) { i.Add(delta) } +// Inc simply increments the given key by 1. +func (s *Stats) Inc(key string) { + s.Add(key, 1) +} + func (s *Stats) Get(key string) int64 { s.mu.RLock() defer s.mu.RUnlock() diff --git a/stats_test.go b/stats_test.go index 734e8cef0c1..af6c908cbf7 100644 --- a/stats_test.go +++ b/stats_test.go @@ -24,6 +24,21 @@ func TestStats_Add(t *testing.T) { } } +func TestStats_Inc(t *testing.T) { + s := influxdb.NewStats() + + s.Set("a", 100) + s.Inc("a") + if s.Get("a") != 101 { + t.Fatalf("stats Inc failed, expected 101, got %d", s.Get("a")) + } + + s.Inc("b") + if s.Get("b") != 1 { + t.Fatalf("stats Inc failed, expected 1, got %d", s.Get("b")) + } +} + func TestStats_AddNegative(t *testing.T) { s := influxdb.NewStats() From 6867d806dea45b0a048914b3208a039436598428 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 12 Mar 2015 22:18:54 -0700 Subject: [PATCH 07/26] Use Inc() shorthand for Stats --- server.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server.go b/server.go index e817b452458..84eda90b97c 100644 --- a/server.go +++ b/server.go @@ -483,7 +483,7 @@ func (s *Server) Client() MessagingClient { // This function waits until the message has been processed by the server. // Returns the broker log index of the message or an error. func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, error) { - s.metrics.Add("broadcastMessageTx", 1) + s.metrics.Inc("broadcastMessageTx") // Encode the command. data, err := json.Marshal(c) @@ -1538,11 +1538,11 @@ type Point struct { // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (idx uint64, err error) { - s.metrics.Add("batchWriteRx", 1) + s.metrics.Inc("batchWriteRx") s.metrics.Add("pointWriteRx", int64(len(points))) defer func() { if err != nil { - s.metrics.Add("batchWriteRxError", 1) + s.metrics.Inc("batchWriteRxError") } }() @@ -1662,7 +1662,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( if err != nil { return maxIndex, err } - s.metrics.Add("writeSeriesMessageTx", 1) + s.metrics.Inc("writeSeriesMessageTx") if index > maxIndex { maxIndex = index } @@ -2853,7 +2853,7 @@ func (s *Server) processor(conn MessagingConn, done chan struct{}) { // All messages must be processed under lock. func() { - s.metrics.Add("broadcastMessageRx", 1) + s.metrics.Inc("broadcastMessageRx") s.mu.Lock() defer s.mu.Unlock() @@ -3317,7 +3317,7 @@ func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool { // runContinuousQuery will execute a continuous query // TODO: make this fan out to the cluster instead of running all the queries on this single data node func (s *Server) runContinuousQuery(cq *ContinuousQuery) { - s.metrics.Add("continuousQueryExecuted", 1) + s.metrics.Inc("continuousQueryExecuted") cq.mu.Lock() defer cq.mu.Unlock() From bf28720c6ea660d12ca0e838b1e9c1fe73536115 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 14:14:23 -0700 Subject: [PATCH 08/26] Add "name" to stats object This "name" becomes the Measurement name. In addition the implementation for "SHOW STATS" has been re-instated. --- server.go | 17 +++++++++++++++-- stats.go | 16 +++++++++++----- stats_test.go | 15 +++++++++------ 3 files changed, 35 insertions(+), 13 deletions(-) diff --git a/server.go b/server.go index 84eda90b97c..a93735eba85 100644 --- a/server.go +++ b/server.go @@ -105,7 +105,7 @@ func NewServer() *Server { users: make(map[string]*User), shards: make(map[uint64]*Shard), - metrics: NewStats(), + metrics: NewStats("server"), Logger: log.New(os.Stderr, "[server] ", log.LstdFlags), } // Server will always return with authentication enabled. @@ -329,7 +329,7 @@ func (s *Server) StartSelfMonitoring(interval time.Duration) error { } // Grab the initial stats. - prev := NewStats() + prev := NewStats("") for { time.Sleep(interval) @@ -1974,6 +1974,8 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re res = s.executeShowTagValuesStatement(stmt, database, user) case *influxql.ShowFieldKeysStatement: res = s.executeShowFieldKeysStatement(stmt, database, user) + case *influxql.ShowStatsStatement: + res = s.executeShowStatsStatement(stmt, user) case *influxql.GrantStatement: res = s.executeGrantStatement(stmt, user) case *influxql.RevokeStatement: @@ -2479,6 +2481,17 @@ func (s *Server) executeShowContinuousQueriesStatement(stmt *influxql.ShowContin return &Result{Series: rows} } +func (s *Server) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, user *User) *Result { + row := &influxql.Row{Columns: []string{}} + row.Name = s.metrics.Name() + s.metrics.Walk(func(k string, v int64) { + row.Columns = append(row.Columns, k) + row.Values = append(row.Values, []interface{}{v}) + }) + + return &Result{Series: []*influxql.Row{row}} +} + // filterMeasurementsByExpr filters a list of measurements by a tags expression. func filterMeasurementsByExpr(measurements Measurements, expr influxql.Expr) (Measurements, error) { // Create a list to hold result measurements. diff --git a/stats.go b/stats.go index df2c9e4fd55..a057b5160c4 100644 --- a/stats.go +++ b/stats.go @@ -22,13 +22,15 @@ func (i *Int) Add(delta int64) { } type Stats struct { - m map[string]*Int - mu sync.RWMutex + name string + m map[string]*Int + mu sync.RWMutex } -func NewStats() *Stats { +func NewStats(name string) *Stats { return &Stats{ - m: make(map[string]*Int), + name: name, + m: make(map[string]*Int), } } @@ -68,6 +70,10 @@ func (s *Stats) Set(key string, v int64) { s.m[key] = NewInt(v) } +func (s *Stats) Name() string { + return s.name +} + // Walk calls f for each entry in the stats. The stats are locked // during the walk but existing entries may be concurrently updated. func (s *Stats) Walk(f func(string, int64)) { @@ -82,7 +88,7 @@ func (s *Stats) Walk(f func(string, int64)) { // Diff returns the difference between two sets of stats. The result is undefined // if the two Stats objects do not contain the same keys. func (s *Stats) Diff(other *Stats) *Stats { - diff := NewStats() + diff := NewStats(s.name) s.Walk(func(k string, v int64) { diff.Set(k, v-other.Get(k)) }) diff --git a/stats_test.go b/stats_test.go index af6c908cbf7..d216725b4a7 100644 --- a/stats_test.go +++ b/stats_test.go @@ -7,7 +7,7 @@ import ( ) func TestStats_SetAndGet(t *testing.T) { - s := influxdb.NewStats() + s := influxdb.NewStats("foo") s.Set("a", 100) if s.Get("a") != 100 { @@ -16,7 +16,7 @@ func TestStats_SetAndGet(t *testing.T) { } func TestStats_Add(t *testing.T) { - s := influxdb.NewStats() + s := influxdb.NewStats("foo") s.Add("a", 200) if s.Get("a") != 200 { @@ -40,7 +40,7 @@ func TestStats_Inc(t *testing.T) { } func TestStats_AddNegative(t *testing.T) { - s := influxdb.NewStats() + s := influxdb.NewStats("foo") s.Add("a", -200) if s.Get("a") != -200 { @@ -49,7 +49,7 @@ func TestStats_AddNegative(t *testing.T) { } func TestStats_SetAndAdd(t *testing.T) { - s := influxdb.NewStats() + s := influxdb.NewStats("foo") s.Set("a", 100) s.Add("a", 200) @@ -59,8 +59,8 @@ func TestStats_SetAndAdd(t *testing.T) { } func TestStats_Diff(t *testing.T) { - foo := influxdb.NewStats() - bar := influxdb.NewStats() + foo := influxdb.NewStats("server") + bar := influxdb.NewStats("server") foo.Set("a", 100) foo.Set("b", 600) @@ -68,6 +68,9 @@ func TestStats_Diff(t *testing.T) { bar.Set("b", 525) qux := bar.Diff(foo) + if qux.Name() != "server" { + t.Fatalf("stats diff has unexpected name: %s", qux.Name()) + } if qux.Get("a") != 350 || qux.Get("b") != -75 { t.Fatalf("stats diff returned unexpedted result: %s", qux) } From 95f7a006a30f69a6350f47fd00e44835792f54ca Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 14:31:21 -0700 Subject: [PATCH 09/26] Fix test config for _internal database --- cmd/influxd/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index f8c8a85833b..979c30c1285 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -181,7 +181,7 @@ raft-tracing = true enabled = true database = "_internal" retention = "default" -duration = "7d" +duration = "168h" check-interval = "1m" # Configure the admin server From f6ae0f9470cb22d0af0cc226e458d45b28416e02 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 14:34:25 -0700 Subject: [PATCH 10/26] Fix up statistics 'check-interval' in sample config --- etc/config.sample.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 2ea87a2a72b..e4c93ed0e09 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -96,4 +96,5 @@ raft-tracing = false # If true, enables detailed logging of Raft consensus. enabled = false database = "_internal" retention-policy = "default" -duration = "7d" +duration = "168h" +check-interval = "1m" From a8b928ae6b70079a89089a0e71e2d97be27a6302 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 14:36:19 -0700 Subject: [PATCH 11/26] Add explanatory comments re statistics config --- etc/config.sample.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/etc/config.sample.toml b/etc/config.sample.toml index e4c93ed0e09..b6152219c7f 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -94,7 +94,7 @@ raft-tracing = false # If true, enables detailed logging of Raft consensus. # as any other data. [statistics] enabled = false -database = "_internal" -retention-policy = "default" -duration = "168h" -check-interval = "1m" +database = "_internal" # The database to which the data is written. +retention-policy = "default" # The retention policy within the database. +duration = "168h" # How long the data is kept around for. +check-interval = "1m" # Period between writing the data. From d95c9454d2b824aaa8bc41a7f1fd5d2937e7be46 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 14:39:47 -0700 Subject: [PATCH 12/26] Better names for statistics config options --- cmd/influxd/config.go | 8 ++++---- cmd/influxd/config_test.go | 6 +++--- etc/config.sample.toml | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 646733c6504..714dade6e00 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -114,8 +114,8 @@ type Config struct { Enabled bool `toml:"enabled"` Database string `toml:"database"` RetentionPolicy string `toml:"retention-policy"` - Duration Duration `toml:"duration"` - Interval Duration `toml:"check-interval"` + RetentionPeriod Duration `toml:"retention-period"` + WriteInterval Duration `toml:"write-interval"` } ContinuousQuery struct { @@ -177,8 +177,8 @@ func NewConfig() *Config { c.Statistics.Enabled = false c.Statistics.Database = "_internal" c.Statistics.RetentionPolicy = "default" - c.Statistics.Duration = Duration(7 * 24 * time.Hour) - c.Statistics.Interval = Duration(1 * time.Minute) + c.Statistics.RetentionPeriod = Duration(7 * 24 * time.Hour) + c.Statistics.WriteInterval = Duration(1 * time.Minute) // Detect hostname (or set to localhost). if c.Hostname, _ = os.Hostname(); c.Hostname == "" { diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index 979c30c1285..e635e39754e 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -180,9 +180,9 @@ raft-tracing = true [statistics] enabled = true database = "_internal" -retention = "default" -duration = "168h" -check-interval = "1m" +retention-policy = "default" +retention-period = "168h" +write-interval = "1m" # Configure the admin server [admin] diff --git a/etc/config.sample.toml b/etc/config.sample.toml index b6152219c7f..dfbd4e13125 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -96,5 +96,5 @@ raft-tracing = false # If true, enables detailed logging of Raft consensus. enabled = false database = "_internal" # The database to which the data is written. retention-policy = "default" # The retention policy within the database. -duration = "168h" # How long the data is kept around for. -check-interval = "1m" # Period between writing the data. +retention-period = "168h" # How long the data is kept around for. +write-interval = "1m" # Period between writing the data. From a0e57a801ab23671b3a9d204d80299b7bf4dff48 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 15:01:53 -0700 Subject: [PATCH 13/26] Allow stats to be snapshot --- stats.go | 11 +++++++++++ stats_test.go | 13 ++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/stats.go b/stats.go index a057b5160c4..ed0077a045a 100644 --- a/stats.go +++ b/stats.go @@ -94,3 +94,14 @@ func (s *Stats) Diff(other *Stats) *Stats { }) return diff } + +// Snapshot returns a copy of the stats object. Addition and removal of stats keys +// is blocked during the created of the snapshot, but existing entries may be +// concurrently updated. +func (s *Stats) Snapshot() *Stats { + snap := NewStats(s.name) + s.Walk(func(k string, v int64) { + snap.Set(k, s.m[k].i) + }) + return snap +} diff --git a/stats_test.go b/stats_test.go index d216725b4a7..8c5335a4240 100644 --- a/stats_test.go +++ b/stats_test.go @@ -72,6 +72,17 @@ func TestStats_Diff(t *testing.T) { t.Fatalf("stats diff has unexpected name: %s", qux.Name()) } if qux.Get("a") != 350 || qux.Get("b") != -75 { - t.Fatalf("stats diff returned unexpedted result: %s", qux) + t.Fatalf("stats diff returned unexpected result: %s", qux) + } +} + +func TestStats_Snapshot(t *testing.T) { + foo := influxdb.NewStats("server") + foo.Set("a", 100) + foo.Set("b", 600) + + bar := foo.Snapshot() + if bar.Name() != "server" || bar.Get("a") != 100 || bar.Get("b") != 600 { + t.Fatalf("stats snapshot returned unexpected result: %s", bar) } } From 524015151734fd4aa5d4acc458f5dd2a0ad260b7 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 15:26:04 -0700 Subject: [PATCH 14/26] Actually start self-monitoring if requested --- cmd/influxd/run.go | 11 +++++++++-- server.go | 17 +++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index d78db8190ed..5a693bf499d 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -183,8 +183,15 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B // Start up self-monitoring if enabled. if config.Statistics.Enabled { - // XXX create database and retention period - s.StartSelfMonitoring(interval) + database := config.Statistics.Database + policy := config.Statistics.RetentionPolicy + interval := config.Statistics.WriteInterval + + if err := s.CreateDatabaseIfNotExists(database); err != nil { + log.Fatalf("failed to create database % for internal statistics: %s", + config.Statistics.Database, err.Error()) + } + s.StartSelfMonitoring(database, policy, time.Duration(interval)) } } diff --git a/server.go b/server.go index a93735eba85..8912d39b014 100644 --- a/server.go +++ b/server.go @@ -323,23 +323,32 @@ func (s *Server) load() error { }) } -func (s *Server) StartSelfMonitoring(interval time.Duration) error { +func (s *Server) StartSelfMonitoring(database, retention string, interval time.Duration) error { if interval == 0 { return fmt.Errorf("statistics check interval must be non-zero") } // Grab the initial stats. - prev := NewStats("") + prev := s.metrics.Snapshot() for { time.Sleep(interval) // Grab the current stats and diff them. - stats := s.metrics + stats := s.metrics.Snapshot() diff := stats.Diff(prev) - var _ = diff + + point := Point{ + Name: diff.Name(), + Tags: map[string]string{"id": strconv.FormatUint(s.id, 10)}, + Fields: make(map[string]interface{}), + } + diff.Walk(func(k string, v int64) { + point.Fields[k] = v + }) // XXX write diff to itself. Tag with server hostname and ID. + s.WriteSeries(database, retention, []Point{point}) // Save stats for the next loop. prev = stats From 511be11fc116fe0952b171c00d8d8912e3d31069 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 15:43:19 -0700 Subject: [PATCH 15/26] Final tweaks to stats unit tests --- stats_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stats_test.go b/stats_test.go index 8c5335a4240..78243cf8692 100644 --- a/stats_test.go +++ b/stats_test.go @@ -25,7 +25,7 @@ func TestStats_Add(t *testing.T) { } func TestStats_Inc(t *testing.T) { - s := influxdb.NewStats() + s := influxdb.NewStats("foo") s.Set("a", 100) s.Inc("a") From ea7b7be534382146f0f06ce30f9c24045f049af2 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 15:51:14 -0700 Subject: [PATCH 16/26] Fix 'go vet' errors --- cmd/influxd/run.go | 2 +- stats_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 5a693bf499d..cc6e92a73f2 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -188,7 +188,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B interval := config.Statistics.WriteInterval if err := s.CreateDatabaseIfNotExists(database); err != nil { - log.Fatalf("failed to create database % for internal statistics: %s", + log.Fatalf("failed to create database %s for internal statistics: %s", config.Statistics.Database, err.Error()) } s.StartSelfMonitoring(database, policy, time.Duration(interval)) diff --git a/stats_test.go b/stats_test.go index 78243cf8692..57fe3380a84 100644 --- a/stats_test.go +++ b/stats_test.go @@ -72,7 +72,7 @@ func TestStats_Diff(t *testing.T) { t.Fatalf("stats diff has unexpected name: %s", qux.Name()) } if qux.Get("a") != 350 || qux.Get("b") != -75 { - t.Fatalf("stats diff returned unexpected result: %s", qux) + t.Fatalf("stats diff returned unexpected result: %v", qux) } } From ae3b3d5252dde809df3b3c65f2447d8a3e340937 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 15:58:55 -0700 Subject: [PATCH 17/26] Rename "metrics" to "stats" --- cmd/influxd/run.go | 1 + server.go | 37 ++++++++++++++++++------------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index cc6e92a73f2..3c5d01f389d 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -192,6 +192,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B config.Statistics.Database, err.Error()) } s.StartSelfMonitoring(database, policy, time.Duration(interval)) + log.Printf("started self-monitoring at interval of %s", interval) } } diff --git a/server.go b/server.go index 8912d39b014..4e163643330 100644 --- a/server.go +++ b/server.go @@ -72,7 +72,7 @@ type Server struct { shards map[uint64]*Shard // shards by shard id - metrics *Stats + stats *Stats Logger *log.Logger WriteTrace bool // Detailed logging of write path @@ -104,9 +104,9 @@ func NewServer() *Server { databases: make(map[string]*database), users: make(map[string]*User), - shards: make(map[uint64]*Shard), - metrics: NewStats("server"), - Logger: log.New(os.Stderr, "[server] ", log.LstdFlags), + shards: make(map[uint64]*Shard), + stats: NewStats("server"), + Logger: log.New(os.Stderr, "[server] ", log.LstdFlags), } // Server will always return with authentication enabled. // This ensures that disabling authentication must be an explicit decision. @@ -329,15 +329,16 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D } // Grab the initial stats. - prev := s.metrics.Snapshot() + prev := s.stats.Snapshot() for { time.Sleep(interval) // Grab the current stats and diff them. - stats := s.metrics.Snapshot() + stats := s.stats.Snapshot() diff := stats.Diff(prev) + // Create the data point and write it. point := Point{ Name: diff.Name(), Tags: map[string]string{"id": strconv.FormatUint(s.id, 10)}, @@ -346,8 +347,6 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D diff.Walk(func(k string, v int64) { point.Fields[k] = v }) - - // XXX write diff to itself. Tag with server hostname and ID. s.WriteSeries(database, retention, []Point{point}) // Save stats for the next loop. @@ -492,7 +491,7 @@ func (s *Server) Client() MessagingClient { // This function waits until the message has been processed by the server. // Returns the broker log index of the message or an error. func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, error) { - s.metrics.Inc("broadcastMessageTx") + s.stats.Inc("broadcastMessageTx") // Encode the command. data, err := json.Marshal(c) @@ -979,7 +978,7 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err nodeIndex++ } } - s.metrics.Add("shardsCreated", int64(len(g.Shards))) + s.stats.Add("shardsCreated", int64(len(g.Shards))) // Retention policy has a new shard group, so update the policy. rp.shardGroups = append(rp.shardGroups, g) @@ -1060,7 +1059,7 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) { // Remove from metastore. rp.removeShardGroupByID(c.ID) err = s.meta.mustUpdate(m.Index, func(tx *metatx) error { - s.metrics.Add("shardsDeleted", int64(len(g.Shards))) + s.stats.Add("shardsDeleted", int64(len(g.Shards))) return tx.saveDatabase(db) }) return @@ -1547,11 +1546,11 @@ type Point struct { // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (idx uint64, err error) { - s.metrics.Inc("batchWriteRx") - s.metrics.Add("pointWriteRx", int64(len(points))) + s.stats.Inc("batchWriteRx") + s.stats.Add("pointWriteRx", int64(len(points))) defer func() { if err != nil { - s.metrics.Inc("batchWriteRxError") + s.stats.Inc("batchWriteRxError") } }() @@ -1671,7 +1670,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( if err != nil { return maxIndex, err } - s.metrics.Inc("writeSeriesMessageTx") + s.stats.Inc("writeSeriesMessageTx") if index > maxIndex { maxIndex = index } @@ -2492,8 +2491,8 @@ func (s *Server) executeShowContinuousQueriesStatement(stmt *influxql.ShowContin func (s *Server) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, user *User) *Result { row := &influxql.Row{Columns: []string{}} - row.Name = s.metrics.Name() - s.metrics.Walk(func(k string, v int64) { + row.Name = s.stats.Name() + s.stats.Walk(func(k string, v int64) { row.Columns = append(row.Columns, k) row.Values = append(row.Values, []interface{}{v}) }) @@ -2875,7 +2874,7 @@ func (s *Server) processor(conn MessagingConn, done chan struct{}) { // All messages must be processed under lock. func() { - s.metrics.Inc("broadcastMessageRx") + s.stats.Inc("broadcastMessageRx") s.mu.Lock() defer s.mu.Unlock() @@ -3339,7 +3338,7 @@ func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool { // runContinuousQuery will execute a continuous query // TODO: make this fan out to the cluster instead of running all the queries on this single data node func (s *Server) runContinuousQuery(cq *ContinuousQuery) { - s.metrics.Inc("continuousQueryExecuted") + s.stats.Inc("continuousQueryExecuted") cq.mu.Lock() defer cq.mu.Unlock() From f4b3e3da9f3192ad946d9c12ae26b15e453d818a Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 16:10:40 -0700 Subject: [PATCH 18/26] Fix final 'go vet' errors --- cmd/influxd/run.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 3c5d01f389d..6fad6cbb6ca 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -185,13 +185,13 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B if config.Statistics.Enabled { database := config.Statistics.Database policy := config.Statistics.RetentionPolicy - interval := config.Statistics.WriteInterval + interval := time.Duration(config.Statistics.WriteInterval) if err := s.CreateDatabaseIfNotExists(database); err != nil { log.Fatalf("failed to create database %s for internal statistics: %s", config.Statistics.Database, err.Error()) } - s.StartSelfMonitoring(database, policy, time.Duration(interval)) + s.StartSelfMonitoring(database, policy, interval) log.Printf("started self-monitoring at interval of %s", interval) } } From a99cd373bb55f660c02bed62d006c7a62e6e6403 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 16:11:50 -0700 Subject: [PATCH 19/26] Update CHANGELOG --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d92da425337..f70b76c6289 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ - [#1955](https://github.com/influxdb/influxdb/pull/1955): Prohibit creation of databases with no name. Thanks @dullgiulio - [#1952](https://github.com/influxdb/influxdb/pull/1952): Handle delete statement with an error. Thanks again to @dullgiulio +### Features +- [#1936](https://github.com/influxdb/influxdb/pull/1936): Implement "SHOW STATS" and self-monitoring + ## v0.9.0-rc11 [2015-03-13] ### Bugfixes From 5aa49ef823fcbb343110bd6327edbccd84abd146 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 16:17:22 -0700 Subject: [PATCH 20/26] Add GoDoc comments to Stats type --- stats.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/stats.go b/stats.go index ed0077a045a..984f6af92ab 100644 --- a/stats.go +++ b/stats.go @@ -4,11 +4,13 @@ import ( "sync" ) +// Int representes a 64-bit signed integer which can be updated atomically. type Int struct { mu sync.RWMutex i int64 } +// NewInt returns a new Int func NewInt(v int64) *Int { return &Int{i: v} } @@ -21,12 +23,14 @@ func (i *Int) Add(delta int64) { } +// Stats represents a collection of metrics, as key-value pairs. type Stats struct { name string m map[string]*Int mu sync.RWMutex } +// NewStats returns a Stats object with the given name. func NewStats(name string) *Stats { return &Stats{ name: name, @@ -58,18 +62,21 @@ func (s *Stats) Inc(key string) { s.Add(key, 1) } +// Get returns a value for a given key. func (s *Stats) Get(key string) int64 { s.mu.RLock() defer s.mu.RUnlock() return s.m[key].i } +// Set sets a value for the given key. func (s *Stats) Set(key string, v int64) { s.mu.Lock() defer s.mu.Unlock() s.m[key] = NewInt(v) } +// Name returns the name of the Stats object. func (s *Stats) Name() string { return s.name } From d627634991674de7eb0b1bc96954a7d20170a97d Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 16:42:06 -0700 Subject: [PATCH 21/26] Ensure internal retention policy exists --- cmd/influxd/config.go | 2 -- cmd/influxd/config_test.go | 1 - cmd/influxd/run.go | 12 +++++++++--- etc/config.sample.toml | 1 - server.go | 19 +++++++++++++++++++ 5 files changed, 28 insertions(+), 7 deletions(-) diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 714dade6e00..a711a600873 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -114,7 +114,6 @@ type Config struct { Enabled bool `toml:"enabled"` Database string `toml:"database"` RetentionPolicy string `toml:"retention-policy"` - RetentionPeriod Duration `toml:"retention-period"` WriteInterval Duration `toml:"write-interval"` } @@ -177,7 +176,6 @@ func NewConfig() *Config { c.Statistics.Enabled = false c.Statistics.Database = "_internal" c.Statistics.RetentionPolicy = "default" - c.Statistics.RetentionPeriod = Duration(7 * 24 * time.Hour) c.Statistics.WriteInterval = Duration(1 * time.Minute) // Detect hostname (or set to localhost). diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index e635e39754e..66267a4837f 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -181,7 +181,6 @@ raft-tracing = true enabled = true database = "_internal" retention-policy = "default" -retention-period = "168h" write-interval = "1m" # Configure the admin server diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 6fad6cbb6ca..e6e6e57b8e1 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -185,12 +185,18 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B if config.Statistics.Enabled { database := config.Statistics.Database policy := config.Statistics.RetentionPolicy - interval := time.Duration(config.Statistics.WriteInterval) + // Ensure database exists. if err := s.CreateDatabaseIfNotExists(database); err != nil { - log.Fatalf("failed to create database %s for internal statistics: %s", - config.Statistics.Database, err.Error()) + log.Fatalf("failed to create database %s for internal statistics: %s", database, err.Error()) } + + // Ensure retention policy exists. + rp := influxdb.NewRetentionPolicy(policy) + if err := s.CreateRetentionPolicyIfNotExists(database, rp); err != nil { + log.Fatalf("failed to create retention policy for internal statistics: %s", err.Error()) + } + s.StartSelfMonitoring(database, policy, interval) log.Printf("started self-monitoring at interval of %s", interval) } diff --git a/etc/config.sample.toml b/etc/config.sample.toml index dfbd4e13125..573c35e1701 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -96,5 +96,4 @@ raft-tracing = false # If true, enables detailed logging of Raft consensus. enabled = false database = "_internal" # The database to which the data is written. retention-policy = "default" # The retention policy within the database. -retention-period = "168h" # How long the data is kept around for. write-interval = "1m" # Period between writing the data. diff --git a/server.go b/server.go index 4e163643330..ecc4fb353b8 100644 --- a/server.go +++ b/server.go @@ -1311,6 +1311,13 @@ func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error) return a, nil } +// RetentionPolicyExists returns true if a retention policy exists for a given database. +func (s *Server) RetentionPolicyExists(database, retention string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.DatabaseExists(database) && s.databases[database].policies[retention] != nil +} + // CreateRetentionPolicy creates a retention policy for a database. func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error { // Enforce duration of at least retentionPolicyMinDuration @@ -1329,6 +1336,18 @@ func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) err return err } +// CreateRetentionPolicy creates a retention policy for a database. +func (s *Server) CreateRetentionPolicyIfNotExists(database string, rp *RetentionPolicy) error { + // Ensure retention policy exists. + if !s.RetentionPolicyExists(database, rp.Name) { + // Small chance retention policy could be created after it didn't exist when checked. + if err := s.CreateRetentionPolicy(database, rp); err != nil && err != ErrRetentionPolicyExists { + return err + } + } + return nil +} + func calculateShardGroupDuration(d time.Duration) time.Duration { const ( day = time.Hour * 24 From 43161bb5da57ba89e501f5d11a24db48d2678736 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 16:43:47 -0700 Subject: [PATCH 22/26] Small chance that a database exists --- server.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index ecc4fb353b8..3af638e1148 100644 --- a/server.go +++ b/server.go @@ -799,7 +799,12 @@ func (s *Server) CreateDatabaseIfNotExists(name string) error { if s.DatabaseExists(name) { return nil } - return s.CreateDatabase(name) + + // Small chance database could have been created even though the check above said it didn't. + if err := s.CreateDatabase(name); err != nil && err != ErrDatabaseExists { + return err + } + return nil } func (s *Server) applyCreateDatabase(m *messaging.Message) (err error) { From 032cfaa9808d8d13f70597c181d8789a830335fc Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 16:57:09 -0700 Subject: [PATCH 23/26] Launch self-monitoring in a go-routine --- server.go | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/server.go b/server.go index 3af638e1148..9f87c9d43b1 100644 --- a/server.go +++ b/server.go @@ -331,27 +331,31 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D // Grab the initial stats. prev := s.stats.Snapshot() - for { - time.Sleep(interval) + go func() { + for { + time.Sleep(interval) - // Grab the current stats and diff them. - stats := s.stats.Snapshot() - diff := stats.Diff(prev) + // Grab the current stats and diff them. + stats := s.stats.Snapshot() + diff := stats.Diff(prev) + + // Create the data point and write it. + point := Point{ + Name: diff.Name(), + Tags: map[string]string{"id": strconv.FormatUint(s.id, 10)}, + Fields: make(map[string]interface{}), + } + diff.Walk(func(k string, v int64) { + point.Fields[k] = v + }) + s.WriteSeries(database, retention, []Point{point}) - // Create the data point and write it. - point := Point{ - Name: diff.Name(), - Tags: map[string]string{"id": strconv.FormatUint(s.id, 10)}, - Fields: make(map[string]interface{}), + // Save stats for the next loop. + prev = stats } - diff.Walk(func(k string, v int64) { - point.Fields[k] = v - }) - s.WriteSeries(database, retention, []Point{point}) + }() - // Save stats for the next loop. - prev = stats - } + return nil } // StartRetentionPolicyEnforcement launches retention policy enforcement. From 50d24700418da3952e779d13757691a14d138c70 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 17:23:55 -0700 Subject: [PATCH 24/26] Write totals, not diff, of internal stats --- cmd/influxd/run.go | 1 + server.go | 16 +++------------- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index e6e6e57b8e1..e333e8933a8 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -185,6 +185,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B if config.Statistics.Enabled { database := config.Statistics.Database policy := config.Statistics.RetentionPolicy + interval := time.Duration(config.Statistics.WriteInterval) // Ensure database exists. if err := s.CreateDatabaseIfNotExists(database); err != nil { diff --git a/server.go b/server.go index 9f87c9d43b1..cfa17d5f1a1 100644 --- a/server.go +++ b/server.go @@ -328,30 +328,20 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D return fmt.Errorf("statistics check interval must be non-zero") } - // Grab the initial stats. - prev := s.stats.Snapshot() - go func() { for { time.Sleep(interval) - // Grab the current stats and diff them. - stats := s.stats.Snapshot() - diff := stats.Diff(prev) - // Create the data point and write it. point := Point{ - Name: diff.Name(), + Name: s.stats.Name(), Tags: map[string]string{"id": strconv.FormatUint(s.id, 10)}, Fields: make(map[string]interface{}), } - diff.Walk(func(k string, v int64) { - point.Fields[k] = v + s.stats.Walk(func(k string, v int64) { + point.Fields[k] = int(v) }) s.WriteSeries(database, retention, []Point{point}) - - // Save stats for the next loop. - prev = stats } }() From eec2cf6bf6c2d79d0358deba933afe289af97c5d Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 17:30:18 -0700 Subject: [PATCH 25/26] Use a better ID for the server ID in stats --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index cfa17d5f1a1..339cf33e765 100644 --- a/server.go +++ b/server.go @@ -335,7 +335,7 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D // Create the data point and write it. point := Point{ Name: s.stats.Name(), - Tags: map[string]string{"id": strconv.FormatUint(s.id, 10)}, + Tags: map[string]string{"raftID": strconv.FormatUint(s.id, 10)}, Fields: make(map[string]interface{}), } s.stats.Walk(func(k string, v int64) { From 69530b7e8f0afc39dc981653abe4240981c3c388 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 13 Mar 2015 18:28:48 -0700 Subject: [PATCH 26/26] Use a time.Ticker object for precise self-monitoring --- server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index 339cf33e765..985d15772e5 100644 --- a/server.go +++ b/server.go @@ -329,8 +329,9 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D } go func() { + tick := time.NewTicker(interval) for { - time.Sleep(interval) + <-tick.C // Create the data point and write it. point := Point{