diff --git a/CHANGELOG.md b/CHANGELOG.md index d92da425337..f70b76c6289 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ - [#1955](https://github.com/influxdb/influxdb/pull/1955): Prohibit creation of databases with no name. Thanks @dullgiulio - [#1952](https://github.com/influxdb/influxdb/pull/1952): Handle delete statement with an error. Thanks again to @dullgiulio +### Features +- [#1936](https://github.com/influxdb/influxdb/pull/1936): Implement "SHOW STATS" and self-monitoring + ## v0.9.0-rc11 [2015-03-13] ### Bugfixes diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 639969c9b2e..a711a600873 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -110,6 +110,13 @@ type Config struct { RaftTracing bool `toml:"raft-tracing"` } `toml:"logging"` + Statistics struct { + Enabled bool `toml:"enabled"` + Database string `toml:"database"` + RetentionPolicy string `toml:"retention-policy"` + WriteInterval Duration `toml:"write-interval"` + } + ContinuousQuery struct { // when continuous queries are run we'll automatically recompute previous intervals // in case lagged data came in. Set to zero if you never have lagged data. We do @@ -166,6 +173,11 @@ func NewConfig() *Config { c.ContinuousQuery.Disable = false c.ReportingDisabled = false + c.Statistics.Enabled = false + c.Statistics.Database = "_internal" + c.Statistics.RetentionPolicy = "default" + c.Statistics.WriteInterval = Duration(1 * time.Minute) + // Detect hostname (or set to localhost). if c.Hostname, _ = os.Hostname(); c.Hostname == "" { c.Hostname = "localhost" diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index 849956df8ae..66267a4837f 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -177,6 +177,12 @@ file = "influxdb.log" write-tracing = true raft-tracing = true +[statistics] +enabled = true +database = "_internal" +retention-policy = "default" +write-interval = "1m" + # Configure the admin server [admin] enabled = true diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index bb01e416a31..e333e8933a8 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -180,6 +180,27 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B log.Fatalf("failed to start %s Graphite server: %s", c.Protocol, err.Error()) } } + + // Start up self-monitoring if enabled. + if config.Statistics.Enabled { + database := config.Statistics.Database + policy := config.Statistics.RetentionPolicy + interval := time.Duration(config.Statistics.WriteInterval) + + // Ensure database exists. + if err := s.CreateDatabaseIfNotExists(database); err != nil { + log.Fatalf("failed to create database %s for internal statistics: %s", database, err.Error()) + } + + // Ensure retention policy exists. + rp := influxdb.NewRetentionPolicy(policy) + if err := s.CreateRetentionPolicyIfNotExists(database, rp); err != nil { + log.Fatalf("failed to create retention policy for internal statistics: %s", err.Error()) + } + + s.StartSelfMonitoring(database, policy, interval) + log.Printf("started self-monitoring at interval of %s", interval) + } } // unless disabled, start the loop to report anonymous usage stats every 24h diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 70cdb67780b..573c35e1701 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -88,3 +88,12 @@ dir = "/tmp/influxdb/development/state" file = "/var/log/influxdb/influxd.log" # Leave blank to redirect logs to stderr. write-tracing = false # If true, enables detailed logging of the write system. raft-tracing = false # If true, enables detailed logging of Raft consensus. + +# InfluxDB can store statistics about itself. This is useful for monitoring purposes. +# This feature is disabled by default, but if enabled, these statistics can be queried +# as any other data. +[statistics] +enabled = false +database = "_internal" # The database to which the data is written. +retention-policy = "default" # The retention policy within the database. +write-interval = "1m" # Period between writing the data. diff --git a/influxql/ast.go b/influxql/ast.go index 5405e2a188a..b0f82ed2f86 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -78,6 +78,7 @@ func (*ShowFieldKeysStatement) node() {} func (*ShowRetentionPoliciesStatement) node() {} func (*ShowMeasurementsStatement) node() {} func (*ShowSeriesStatement) node() {} +func (*ShowStatsStatement) node() {} func (*ShowTagKeysStatement) node() {} func (*ShowTagValuesStatement) node() {} func (*ShowUsersStatement) node() {} @@ -169,6 +170,7 @@ func (*ShowFieldKeysStatement) stmt() {} func (*ShowMeasurementsStatement) stmt() {} func (*ShowRetentionPoliciesStatement) stmt() {} func (*ShowSeriesStatement) stmt() {} +func (*ShowStatsStatement) stmt() {} func (*ShowTagKeysStatement) stmt() {} func (*ShowTagValuesStatement) stmt() {} func (*ShowUsersStatement) stmt() {} @@ -1365,6 +1367,27 @@ func (s *ShowRetentionPoliciesStatement) RequiredPrivileges() ExecutionPrivilege return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}} } +// ShowRetentionPoliciesStatement represents a command for displaying stats for a given server. +type ShowStatsStatement struct { + // Hostname or IP of the server for stats. + Host string +} + +// String returns a string representation of a ShowStatsStatement. +func (s *ShowStatsStatement) String() string { + var buf bytes.Buffer + _, _ = buf.WriteString("SHOW STATS ") + if s.Host != "" { + _, _ = buf.WriteString(s.Host) + } + return buf.String() +} + +// RequiredPrivileges returns the privilege(s) required to execute a ShowStatsStatement +func (s *ShowStatsStatement) RequiredPrivileges() ExecutionPrivileges { + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} +} + // ShowTagKeysStatement represents a command for listing tag keys. type ShowTagKeysStatement struct { // Data source that fields are extracted from. diff --git a/influxql/parser.go b/influxql/parser.go index 6aacaa407a6..6c36d8a4859 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -119,6 +119,8 @@ func (p *Parser) parseShowStatement() (Statement, error) { return nil, newParseError(tokstr(tok, lit), []string{"POLICIES"}, pos) case SERIES: return p.parseShowSeriesStatement() + case STATS: + return p.parseShowStatsStatement() case TAG: tok, pos, lit := p.scanIgnoreWhitespace() if tok == KEYS { @@ -1172,6 +1174,21 @@ func (p *Parser) parseRetentionPolicy() (name string, dfault bool, err error) { return } +// parseShowStatsStatement parses a string and returns a ShowStatsStatement. +// This function assumes the "SHOW STATS" tokens have already been consumed. +func (p *Parser) parseShowStatsStatement() (*ShowStatsStatement, error) { + stmt := &ShowStatsStatement{} + var err error + + if tok, _, _ := p.scanIgnoreWhitespace(); tok == ON { + stmt.Host, err = p.parseString() + } else { + p.unscan() + } + + return stmt, err +} + // parseDropContinuousQueriesStatement parses a string and returns a DropContinuousQueryStatement. // This function assumes the "DROP CONTINUOUS" tokens have already been consumed. func (p *Parser) parseDropContinuousQueryStatement() (*DropContinuousQueryStatement, error) { diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 00a2606069b..5a4141487ce 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -769,6 +769,26 @@ func TestParser_ParseStatement(t *testing.T) { stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, 4, false), }, + // SHOW STATS + { + s: `SHOW STATS`, + stmt: &influxql.ShowStatsStatement{ + Host: "", + }, + }, + { + s: `SHOW STATS ON 'servera'`, + stmt: &influxql.ShowStatsStatement{ + Host: "servera", + }, + }, + { + s: `SHOW STATS ON '192.167.1.44'`, + stmt: &influxql.ShowStatsStatement{ + Host: "192.167.1.44", + }, + }, + // Errors {s: ``, err: `found EOF, expected SELECT at line 1, char 1`}, {s: `SELECT`, err: `found EOF, expected identifier, string, number, bool at line 1, char 8`}, @@ -798,6 +818,7 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`}, {s: `SHOW RETENTION POLICIES`, err: `found EOF, expected identifier at line 1, char 25`}, {s: `SHOW FOO`, err: `found FOO, expected CONTINUOUS, DATABASES, FIELD, MEASUREMENTS, RETENTION, SERIES, SERVERS, TAG, USERS at line 1, char 6`}, + {s: `SHOW STATS ON`, err: `found EOF, expected string at line 1, char 15`}, {s: `DROP CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 17`}, {s: `DROP CONTINUOUS QUERY`, err: `found EOF, expected identifier at line 1, char 23`}, {s: `CREATE CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 19`}, diff --git a/influxql/token.go b/influxql/token.go index a60457d651e..6827c8c6ebc 100644 --- a/influxql/token.go +++ b/influxql/token.go @@ -106,6 +106,7 @@ const ( SERVERS SHOW SLIMIT + STATS SOFFSET TAG TO @@ -208,6 +209,7 @@ var tokens = [...]string{ SHOW: "SHOW", SLIMIT: "SLIMIT", SOFFSET: "SOFFSET", + STATS: "STATS", TAG: "TAG", TO: "TO", USER: "USER", diff --git a/server.go b/server.go index 2db0d0c145e..985d15772e5 100644 --- a/server.go +++ b/server.go @@ -72,6 +72,7 @@ type Server struct { shards map[uint64]*Shard // shards by shard id + stats *Stats Logger *log.Logger WriteTrace bool // Detailed logging of write path @@ -104,6 +105,7 @@ func NewServer() *Server { users: make(map[string]*User), shards: make(map[uint64]*Shard), + stats: NewStats("server"), Logger: log.New(os.Stderr, "[server] ", log.LstdFlags), } // Server will always return with authentication enabled. @@ -321,6 +323,32 @@ func (s *Server) load() error { }) } +func (s *Server) StartSelfMonitoring(database, retention string, interval time.Duration) error { + if interval == 0 { + return fmt.Errorf("statistics check interval must be non-zero") + } + + go func() { + tick := time.NewTicker(interval) + for { + <-tick.C + + // Create the data point and write it. + point := Point{ + Name: s.stats.Name(), + Tags: map[string]string{"raftID": strconv.FormatUint(s.id, 10)}, + Fields: make(map[string]interface{}), + } + s.stats.Walk(func(k string, v int64) { + point.Fields[k] = int(v) + }) + s.WriteSeries(database, retention, []Point{point}) + } + }() + + return nil +} + // StartRetentionPolicyEnforcement launches retention policy enforcement. func (s *Server) StartRetentionPolicyEnforcement(checkInterval time.Duration) error { if checkInterval == 0 { @@ -458,6 +486,8 @@ func (s *Server) Client() MessagingClient { // This function waits until the message has been processed by the server. // Returns the broker log index of the message or an error. func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, error) { + s.stats.Inc("broadcastMessageTx") + // Encode the command. data, err := json.Marshal(c) if err != nil { @@ -764,7 +794,12 @@ func (s *Server) CreateDatabaseIfNotExists(name string) error { if s.DatabaseExists(name) { return nil } - return s.CreateDatabase(name) + + // Small chance database could have been created even though the check above said it didn't. + if err := s.CreateDatabase(name); err != nil && err != ErrDatabaseExists { + return err + } + return nil } func (s *Server) applyCreateDatabase(m *messaging.Message) (err error) { @@ -943,6 +978,7 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err nodeIndex++ } } + s.stats.Add("shardsCreated", int64(len(g.Shards))) // Retention policy has a new shard group, so update the policy. rp.shardGroups = append(rp.shardGroups, g) @@ -1023,6 +1059,7 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) { // Remove from metastore. rp.removeShardGroupByID(c.ID) err = s.meta.mustUpdate(m.Index, func(tx *metatx) error { + s.stats.Add("shardsDeleted", int64(len(g.Shards))) return tx.saveDatabase(db) }) return @@ -1274,6 +1311,13 @@ func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error) return a, nil } +// RetentionPolicyExists returns true if a retention policy exists for a given database. +func (s *Server) RetentionPolicyExists(database, retention string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.DatabaseExists(database) && s.databases[database].policies[retention] != nil +} + // CreateRetentionPolicy creates a retention policy for a database. func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error { // Enforce duration of at least retentionPolicyMinDuration @@ -1292,6 +1336,18 @@ func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) err return err } +// CreateRetentionPolicy creates a retention policy for a database. +func (s *Server) CreateRetentionPolicyIfNotExists(database string, rp *RetentionPolicy) error { + // Ensure retention policy exists. + if !s.RetentionPolicyExists(database, rp.Name) { + // Small chance retention policy could be created after it didn't exist when checked. + if err := s.CreateRetentionPolicy(database, rp); err != nil && err != ErrRetentionPolicyExists { + return err + } + } + return nil +} + func calculateShardGroupDuration(d time.Duration) time.Duration { const ( day = time.Hour * 24 @@ -1508,7 +1564,15 @@ type Point struct { // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. -func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) { +func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (idx uint64, err error) { + s.stats.Inc("batchWriteRx") + s.stats.Add("pointWriteRx", int64(len(points))) + defer func() { + if err != nil { + s.stats.Inc("batchWriteRxError") + } + }() + if s.WriteTrace { log.Printf("received write for database '%s', retention policy '%s', with %d points", database, retentionPolicy, len(points)) @@ -1613,7 +1677,6 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( } // Write data for each shard to the Broker. - var err error var maxIndex uint64 for i, d := range shardData { assert(len(d) > 0, "raw series data required: topic=%d", i) @@ -1626,6 +1689,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( if err != nil { return maxIndex, err } + s.stats.Inc("writeSeriesMessageTx") if index > maxIndex { maxIndex = index } @@ -1634,7 +1698,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( } } - return maxIndex, err + return maxIndex, nil } // createMeasurementsIfNotExists walks the "points" and ensures that all new Series are created, and all @@ -1937,6 +2001,8 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re res = s.executeShowTagValuesStatement(stmt, database, user) case *influxql.ShowFieldKeysStatement: res = s.executeShowFieldKeysStatement(stmt, database, user) + case *influxql.ShowStatsStatement: + res = s.executeShowStatsStatement(stmt, user) case *influxql.GrantStatement: res = s.executeGrantStatement(stmt, user) case *influxql.RevokeStatement: @@ -2442,6 +2508,17 @@ func (s *Server) executeShowContinuousQueriesStatement(stmt *influxql.ShowContin return &Result{Series: rows} } +func (s *Server) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, user *User) *Result { + row := &influxql.Row{Columns: []string{}} + row.Name = s.stats.Name() + s.stats.Walk(func(k string, v int64) { + row.Columns = append(row.Columns, k) + row.Values = append(row.Values, []interface{}{v}) + }) + + return &Result{Series: []*influxql.Row{row}} +} + // filterMeasurementsByExpr filters a list of measurements by a tags expression. func filterMeasurementsByExpr(measurements Measurements, expr influxql.Expr) (Measurements, error) { // Create a list to hold result measurements. @@ -2816,6 +2893,7 @@ func (s *Server) processor(conn MessagingConn, done chan struct{}) { // All messages must be processed under lock. func() { + s.stats.Inc("broadcastMessageRx") s.mu.Lock() defer s.mu.Unlock() @@ -3279,6 +3357,7 @@ func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool { // runContinuousQuery will execute a continuous query // TODO: make this fan out to the cluster instead of running all the queries on this single data node func (s *Server) runContinuousQuery(cq *ContinuousQuery) { + s.stats.Inc("continuousQueryExecuted") cq.mu.Lock() defer cq.mu.Unlock() diff --git a/stats.go b/stats.go new file mode 100644 index 00000000000..984f6af92ab --- /dev/null +++ b/stats.go @@ -0,0 +1,114 @@ +package influxdb + +import ( + "sync" +) + +// Int representes a 64-bit signed integer which can be updated atomically. +type Int struct { + mu sync.RWMutex + i int64 +} + +// NewInt returns a new Int +func NewInt(v int64) *Int { + return &Int{i: v} +} + +// Add atomically adds the given delta to the Int. +func (i *Int) Add(delta int64) { + i.mu.Lock() + defer i.mu.Unlock() + i.i += delta + +} + +// Stats represents a collection of metrics, as key-value pairs. +type Stats struct { + name string + m map[string]*Int + mu sync.RWMutex +} + +// NewStats returns a Stats object with the given name. +func NewStats(name string) *Stats { + return &Stats{ + name: name, + m: make(map[string]*Int), + } +} + +// Add adds delta to the stat indiciated by key. +func (s *Stats) Add(key string, delta int64) { + s.mu.RLock() + i, ok := s.m[key] + s.mu.RUnlock() + if !ok { + // check again under the write lock + s.mu.Lock() + i, ok = s.m[key] + if !ok { + i = new(Int) + s.m[key] = i + } + s.mu.Unlock() + } + + i.Add(delta) +} + +// Inc simply increments the given key by 1. +func (s *Stats) Inc(key string) { + s.Add(key, 1) +} + +// Get returns a value for a given key. +func (s *Stats) Get(key string) int64 { + s.mu.RLock() + defer s.mu.RUnlock() + return s.m[key].i +} + +// Set sets a value for the given key. +func (s *Stats) Set(key string, v int64) { + s.mu.Lock() + defer s.mu.Unlock() + s.m[key] = NewInt(v) +} + +// Name returns the name of the Stats object. +func (s *Stats) Name() string { + return s.name +} + +// Walk calls f for each entry in the stats. The stats are locked +// during the walk but existing entries may be concurrently updated. +func (s *Stats) Walk(f func(string, int64)) { + s.mu.RLock() + defer s.mu.RUnlock() + + for k, v := range s.m { + f(k, v.i) + } +} + +// Diff returns the difference between two sets of stats. The result is undefined +// if the two Stats objects do not contain the same keys. +func (s *Stats) Diff(other *Stats) *Stats { + diff := NewStats(s.name) + s.Walk(func(k string, v int64) { + diff.Set(k, v-other.Get(k)) + }) + return diff +} + +// Snapshot returns a copy of the stats object. Addition and removal of stats keys +// is blocked during the created of the snapshot, but existing entries may be +// concurrently updated. +func (s *Stats) Snapshot() *Stats { + snap := NewStats(s.name) + s.Walk(func(k string, v int64) { + snap.Set(k, s.m[k].i) + }) + return snap +} diff --git a/stats_test.go b/stats_test.go new file mode 100644 index 00000000000..57fe3380a84 --- /dev/null +++ b/stats_test.go @@ -0,0 +1,88 @@ +package influxdb_test + +import ( + "testing" + + "github.com/influxdb/influxdb" +) + +func TestStats_SetAndGet(t *testing.T) { + s := influxdb.NewStats("foo") + + s.Set("a", 100) + if s.Get("a") != 100 { + t.Fatalf("stats set failed, expected 100, got %d", s.Get("a")) + } +} + +func TestStats_Add(t *testing.T) { + s := influxdb.NewStats("foo") + + s.Add("a", 200) + if s.Get("a") != 200 { + t.Fatalf("stats set failed, expected 200, got %d", s.Get("a")) + } +} + +func TestStats_Inc(t *testing.T) { + s := influxdb.NewStats("foo") + + s.Set("a", 100) + s.Inc("a") + if s.Get("a") != 101 { + t.Fatalf("stats Inc failed, expected 101, got %d", s.Get("a")) + } + + s.Inc("b") + if s.Get("b") != 1 { + t.Fatalf("stats Inc failed, expected 1, got %d", s.Get("b")) + } +} + +func TestStats_AddNegative(t *testing.T) { + s := influxdb.NewStats("foo") + + s.Add("a", -200) + if s.Get("a") != -200 { + t.Fatalf("stats set failed, expected -200, got %d", s.Get("a")) + } +} + +func TestStats_SetAndAdd(t *testing.T) { + s := influxdb.NewStats("foo") + + s.Set("a", 100) + s.Add("a", 200) + if s.Get("a") != 300 { + t.Fatalf("stats set failed, expected 300, got %d", s.Get("a")) + } +} + +func TestStats_Diff(t *testing.T) { + foo := influxdb.NewStats("server") + bar := influxdb.NewStats("server") + + foo.Set("a", 100) + foo.Set("b", 600) + bar.Set("a", 450) + bar.Set("b", 525) + + qux := bar.Diff(foo) + if qux.Name() != "server" { + t.Fatalf("stats diff has unexpected name: %s", qux.Name()) + } + if qux.Get("a") != 350 || qux.Get("b") != -75 { + t.Fatalf("stats diff returned unexpected result: %v", qux) + } +} + +func TestStats_Snapshot(t *testing.T) { + foo := influxdb.NewStats("server") + foo.Set("a", 100) + foo.Set("b", 600) + + bar := foo.Snapshot() + if bar.Name() != "server" || bar.Get("a") != 100 || bar.Get("b") != 600 { + t.Fatalf("stats snapshot returned unexpected result: %s", bar) + } +}