diff --git a/CHANGELOG.md b/CHANGELOG.md index f9c245330cb..41fdb7ea74f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ With this release InfluxDB is moving to Go 1.5. ### Features - [#3863](https://github.com/influxdb/influxdb/pull/3863): Move to Go 1.5 - [#3892](https://github.com/influxdb/influxdb/pull/3892): Support IF NOT EXISTS for CREATE DATABASE +- [#3916](https://github.com/influxdb/influxdb/pull/3916): New statistics and diagnostics support. Graphite first to be instrumented. ### Bugfixes - [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803. diff --git a/cmd/influxd/run/config.go b/cmd/influxd/run/config.go index 77515ca604a..322b7bf5cbb 100644 --- a/cmd/influxd/run/config.go +++ b/cmd/influxd/run/config.go @@ -13,13 +13,13 @@ import ( "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/meta" + "github.com/influxdb/influxdb/monitor" "github.com/influxdb/influxdb/services/admin" "github.com/influxdb/influxdb/services/collectd" "github.com/influxdb/influxdb/services/continuous_querier" "github.com/influxdb/influxdb/services/graphite" "github.com/influxdb/influxdb/services/hh" "github.com/influxdb/influxdb/services/httpd" - "github.com/influxdb/influxdb/services/monitor" "github.com/influxdb/influxdb/services/opentsdb" "github.com/influxdb/influxdb/services/precreator" "github.com/influxdb/influxdb/services/retention" @@ -36,6 +36,7 @@ type Config struct { Precreator precreator.Config `toml:"shard-precreation"` Admin admin.Config `toml:"admin"` + Monitor monitor.Config `toml:"monitor"` HTTPD httpd.Config `toml:"http"` Graphites []graphite.Config `toml:"graphite"` Collectd collectd.Config `toml:"collectd"` @@ -43,7 +44,6 @@ type Config struct { UDPs []udp.Config `toml:"udp"` // Snapshot SnapshotConfig `toml:"snapshot"` - Monitoring monitor.Config `toml:"monitoring"` ContinuousQuery continuous_querier.Config `toml:"continuous_queries"` HintedHandoff hh.Config `toml:"hinted-handoff"` @@ -61,12 +61,12 @@ func NewConfig() *Config { c.Precreator = precreator.NewConfig() c.Admin = admin.NewConfig() + c.Monitor = monitor.NewConfig() c.HTTPD = httpd.NewConfig() c.Collectd = collectd.NewConfig() c.OpenTSDB = opentsdb.NewConfig() c.Graphites = append(c.Graphites, graphite.NewConfig()) - c.Monitoring = monitor.NewConfig() c.ContinuousQuery = continuous_querier.NewConfig() c.Retention = retention.NewConfig() c.HintedHandoff = hh.NewConfig() @@ -95,7 +95,6 @@ func NewDemoConfig() (*Config, error) { c.Data.WALDir = filepath.Join(homeDir, ".influxdb/wal") c.Admin.Enabled = true - c.Monitoring.Enabled = false return c, nil } diff --git a/cmd/influxd/run/config_test.go b/cmd/influxd/run/config_test.go index ffcaf253321..7ecbb4f658c 100644 --- a/cmd/influxd/run/config_test.go +++ b/cmd/influxd/run/config_test.go @@ -72,8 +72,6 @@ enabled = true t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDB.BindAddress) } else if c.UDPs[0].BindAddress != ":4444" { t.Fatalf("unexpected udp bind address: %s", c.UDPs[0].BindAddress) - } else if c.Monitoring.Enabled != true { - t.Fatalf("unexpected monitoring enabled: %v", c.Monitoring.Enabled) } else if c.ContinuousQuery.Enabled != true { t.Fatalf("unexpected continuous query enabled: %v", c.ContinuousQuery.Enabled) } diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index ab65817d72c..782fd6abcb8 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -14,6 +14,7 @@ import ( "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/meta" + "github.com/influxdb/influxdb/monitor" "github.com/influxdb/influxdb/services/admin" "github.com/influxdb/influxdb/services/collectd" "github.com/influxdb/influxdb/services/continuous_querier" @@ -57,6 +58,8 @@ type Server struct { ClusterService *cluster.Service SnapshotterService *snapshotter.Service + MonitorService *monitor.Service + // Server reporting reportingDisabled bool @@ -85,6 +88,16 @@ func NewServer(c *Config, version string) (*Server, error) { reportingDisabled: c.ReportingDisabled, } + // Start the monitor service. + clusterID, err := s.MetaStore.ClusterID() + if err != nil { + return nil, err + } + s.MonitorService = monitor.NewService(c.Monitor) + if err := s.MonitorService.Open(clusterID, s.MetaStore.NodeID(), s.Hostname); err != nil { + return nil, err + } + // Copy TSDB configuration. s.TSDBStore.EngineOptions.MaxWALSize = c.Data.MaxWALSize s.TSDBStore.EngineOptions.WALFlushInterval = time.Duration(c.Data.WALFlushInterval) @@ -100,6 +113,7 @@ func NewServer(c *Config, version string) (*Server, error) { s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore) s.QueryExecutor.MetaStore = s.MetaStore s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore} + s.QueryExecutor.MonitorStatementExecutor = s.MonitorService s.QueryExecutor.ShardMapper = s.ShardMapper // Set the shard writer @@ -230,6 +244,7 @@ func (s *Server) appendGraphiteService(c graphite.Config) error { srv.PointsWriter = s.PointsWriter srv.MetaStore = s.MetaStore + srv.MonitorService = s.MonitorService s.Services = append(s.Services, srv) return nil } diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 9614277aba5..5910533a550 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -88,6 +88,16 @@ reporting-disabled = false enabled = true check-interval = "10m" +### +### Controls the system self-monitoring, statistics, diagnostics, and expvar data. +### + +[monitor] + store-enabled = true # Whether to record statistics in an InfluxDB system + store-database = "_internal" # The destination database for recorded statistics + store-interval = "1m" # The interval at which to record statistics + store-address = "http://127.0.0.1" # The protocol and host for the recorded data + ### ### [admin] ### @@ -207,14 +217,6 @@ reporting-disabled = false # batch-size = 1000 # will flush if this many points get buffered # batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit -### -### [monitoring] -### - -[monitoring] - enabled = true - write-interval = "24h" - ### ### [continuous_queries] ### diff --git a/meta/statement_executor.go b/meta/statement_executor.go index 70e019afb98..291010ab316 100644 --- a/meta/statement_executor.go +++ b/meta/statement_executor.go @@ -80,8 +80,6 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql. return e.executeDropContinuousQueryStatement(stmt) case *influxql.ShowContinuousQueriesStatement: return e.executeShowContinuousQueriesStatement(stmt) - case *influxql.ShowStatsStatement: - return e.executeShowStatsStatement(stmt) default: panic(fmt.Sprintf("unsupported statement type: %T", stmt)) } @@ -283,7 +281,3 @@ func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql } return &influxql.Result{Series: rows} } - -func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) *influxql.Result { - return &influxql.Result{Err: fmt.Errorf("SHOW STATS is not implemented yet")} -} diff --git a/monitor/README.md b/monitor/README.md new file mode 100644 index 00000000000..58898b294b3 --- /dev/null +++ b/monitor/README.md @@ -0,0 +1,49 @@ +# System Monitoring +_System Monitoring_ means all statistical and diagnostic information made availabe to the user of InfluxDB system, about the system itself. Its purpose is to assist with troubleshooting and performance analysis. + +## Supported Commands + + * `SHOW STATS` + * `SHOW DIAGNOSTICS` + +If statistical information is also written to an InfluxDB system, the data will also be queryable by the InfluxQL query language. + +## Statistics vs. Diagnostics +A distinction between _statistics_ and _diagnostics_ is made for the purposes of monitoring. Generally a statistical quality is something that is being counted, and for which it makes sense to store for historical analysis. Diagnostic information is not necessarily numerical, and may not make sense to store. + +An example of statistical information would be the number of points received over UDP, or the number of queries executed. Examples of diagnostic information would be a list of current Graphite TCP connections, the version of InfluxDB, or the uptime of the process. + +## Design and Implementation + +A new module named `monitor` supports all statistics and diagnostic functionality. This includes: + + * Allowing other modules to register statistics and diagnostics information, allowing it to be accessed on demand by the `monitor` module. + * Serving the statistics and diagnostic information to the user, in response to commands such as `SHOW DIAGNOSTICS`. + * Expose standard Go runtime information such as garbage collection statistics. + * Make all collected expvar data via HTTP, for collection by 3rd-party tools. + * Writing the statistical information to an InfluxDB system, for historical analysis. This may be the same system generating the statistical information, but it does not have to be. Information is written used the Line Protocol. + +To register with `monitor`, a module must implement the following interface: + +``` +type Client interface { + Statistics() (map[string]interface{}, error) + Diagnostics() (map[string]interface{}, error) +} +``` + +The module then calls `Register(name string, tags map[string]string, client Client)`. `name` is the Measurement name that will be associated with the statistics. `tags` will be the tags, though an empty map is acceptable. `client` is the module which implements the `Client` interface. + +### expvar +Statistical information is gathered by each package using [expvar](https://golang.org/pkg/expvar). Each package registers a map using its package name. + +Due to the nature of `expvar`, statistical information is reset to its initial state when a server is restarted. + +## Configuration +The `monitor` module will allow the following configuration: + + * Whether to write statistical and diagnostic information to an InfluxDB system. This is enabled by default. + * The name of the database to where this information should be written. Defaults to `_internal`. The information is written to the default retention policy for the given database. + * The name of the retention policy, along with full configuration control of the retention policy. + * The address and port of the InfluxDB system. This will default to the system generating the data. + * The rate at which this information should be written. The maximum rate will be once a second. diff --git a/monitor/config.go b/monitor/config.go new file mode 100644 index 00000000000..061a472d5ae --- /dev/null +++ b/monitor/config.go @@ -0,0 +1,40 @@ +package monitor + +import ( + "time" + + "github.com/influxdb/influxdb/toml" +) + +const ( + // DefaultStoreEnabled is whether the system writes gathered information in + // an InfluxDB system for historical analysis. + DefaultStoreEnabled = true + + // DefaultStoreDatabase is the name of the database where gathered information is written + DefaultStoreDatabase = "_internal" + + // DefaultStoreInterval is the period between storing gathered information. + DefaultStoreInterval = time.Minute + + // DefaultStoreAddress is the destination system for gathered information. + DefaultStoreAddress = "127.0.0.1:8086" +) + +// Config represents the configuration for the monitor service. +type Config struct { + StoreEnabled bool `toml:"store-enabled"` + StoreDatabase string `toml:"store-database"` + StoreInterval toml.Duration `toml:"store-interval"` + StoreAddress string `toml:"store-address"` +} + +// NewConfig returns an instance of Config with defaults. +func NewConfig() Config { + return Config{ + StoreEnabled: false, + StoreDatabase: DefaultStoreDatabase, + StoreInterval: toml.Duration(DefaultStoreInterval), + StoreAddress: DefaultStoreAddress, + } +} diff --git a/monitor/config_test.go b/monitor/config_test.go new file mode 100644 index 00000000000..0626a7cfcae --- /dev/null +++ b/monitor/config_test.go @@ -0,0 +1,33 @@ +package monitor_test + +import ( + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/monitor" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c monitor.Config + if _, err := toml.Decode(` +store-enabled=true +store-database="the_db" +store-interval="10m" +store-address="server1" +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. + if !c.StoreEnabled { + t.Fatalf("unexpected store-enabled: %v", c.StoreEnabled) + } else if c.StoreDatabase != "the_db" { + t.Fatalf("unexpected store-database: %s", c.StoreDatabase) + } else if time.Duration(c.StoreInterval) != 10*time.Minute { + t.Fatalf("unexpected store-interval: %s", c.StoreInterval) + } else if c.StoreAddress != "server1" { + t.Fatalf("unexpected store-address: %s", c.StoreAddress) + } +} diff --git a/monitor/go_runtime.go b/monitor/go_runtime.go new file mode 100644 index 00000000000..ce0ef8bc292 --- /dev/null +++ b/monitor/go_runtime.go @@ -0,0 +1,37 @@ +package monitor + +import ( + "runtime" +) + +// goRuntime captures Go runtime statistics and implements the monitor client interface +type goRuntime struct{} + +// Statistics returns the statistics for the goRuntime type +func (g *goRuntime) Statistics() (map[string]interface{}, error) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + + return map[string]interface{}{ + "Alloc": int64(m.Alloc), + "TotalAlloc": int64(m.TotalAlloc), + "Sys": int64(m.Sys), + "Lookups": int64(m.Lookups), + "Mallocs": int64(m.Mallocs), + "Frees": int64(m.Frees), + "HeapAlloc": int64(m.HeapAlloc), + "HeapSys": int64(m.HeapSys), + "HeapIdle": int64(m.HeapIdle), + "HeapInUse": int64(m.HeapInuse), + "HeapReleased": int64(m.HeapReleased), + "HeapObjects": int64(m.HeapObjects), + "PauseTotalNs": int64(m.PauseTotalNs), + "NumGC": int64(m.NumGC), + "NumGoroutine": int64(runtime.NumGoroutine()), + }, nil +} + +// Diagnostics returns the statistics for the goRuntime type +func (g *goRuntime) Diagnostics() (map[string]interface{}, error) { + return nil, nil +} diff --git a/monitor/service.go b/monitor/service.go new file mode 100644 index 00000000000..26381e06980 --- /dev/null +++ b/monitor/service.go @@ -0,0 +1,294 @@ +package monitor + +import ( + "expvar" + "fmt" + "log" + "net/http" + "net/url" + "os" + "sort" + "strconv" + "sync" + "time" + + "github.com/influxdb/influxdb/influxql" +) + +// Client is the interface modules must implement if they wish to register with monitor. +type Client interface { + Statistics() (map[string]interface{}, error) + Diagnostics() (map[string]interface{}, error) +} + +// Service represents an instance of the monitor service. +type Service struct { + wg sync.WaitGroup + done chan struct{} + mu sync.Mutex + registrations []*clientWithMeta + + hostname string + clusterID uint64 + nodeID uint64 + + storeEnabled bool + storeDatabase string + storeAddress string + storeInterval time.Duration + + Logger *log.Logger +} + +// NewService returns a new instance of the monitor service. +func NewService(c Config) *Service { + return &Service{ + registrations: make([]*clientWithMeta, 0), + storeEnabled: c.StoreEnabled, + storeDatabase: c.StoreDatabase, + storeAddress: c.StoreAddress, + storeInterval: time.Duration(c.StoreInterval), + Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags), + } +} + +// Open opens the monitoring service, using the given clusterID, node ID, and hostname +// for identification purposes. +func (s *Service) Open(clusterID, nodeID uint64, hostname string) error { + s.Logger.Printf("starting monitor service for cluster %d, host %s", clusterID, hostname) + s.clusterID = clusterID + s.nodeID = nodeID + s.hostname = hostname + + // Self-register Go runtime stats. + s.Register("runtime", nil, &goRuntime{}) + + // If enabled, record stats in a InfluxDB system. + if s.storeEnabled { + s.Logger.Printf("storing in %s, database '%s', interval %s", + s.storeAddress, s.storeDatabase, s.storeInterval) + + s.Logger.Printf("ensuring database %s exists on %s", s.storeDatabase, s.storeAddress) + if err := ensureDatabaseExists(s.storeAddress, s.storeDatabase); err != nil { + return err + } + + // Start periodic writes to system. + s.wg.Add(1) + go s.storeStatistics() + } + + return nil +} + +// Close closes the monitor service. +func (s *Service) Close() { + s.Logger.Println("shutting down monitor service") + close(s.done) + s.wg.Wait() + s.done = nil +} + +// SetLogger sets the internal logger to the logger passed in. +func (s *Service) SetLogger(l *log.Logger) { + s.Logger = l +} + +// Register registers a client with the given name and tags. +func (s *Service) Register(name string, tags map[string]string, client Client) error { + s.mu.Lock() + defer s.mu.Unlock() + c := &clientWithMeta{ + Client: client, + name: name, + tags: tags, + } + s.registrations = append(s.registrations, c) + s.Logger.Printf(`'%s:%v' registered for monitoring`, name, tags) + return nil +} + +// ExecuteStatement executes monitor-related query statements. +func (s *Service) ExecuteStatement(stmt influxql.Statement) *influxql.Result { + switch stmt := stmt.(type) { + case *influxql.ShowStatsStatement: + return s.executeShowStatistics(stmt) + default: + panic(fmt.Sprintf("unsupported statement type: %T", stmt)) + } +} + +// executeShowStatistics returns the statistics of the registered monitor client in +// the standard form expected by users of the InfluxDB system. +func (s *Service) executeShowStatistics(q *influxql.ShowStatsStatement) *influxql.Result { + stats, _ := s.statistics() + rows := make([]*influxql.Row, len(stats)) + + for n, stat := range stats { + row := &influxql.Row{} + values := make([]interface{}, 0, len(stat.Tags)+len(stat.Values)) + + row.Columns = append(row.Columns, "name") + values = append(values, stat.Name) + + for _, k := range stat.tagNames() { + row.Columns = append(row.Columns, k) + values = append(values, stat.Tags[k]) + } + for _, k := range stat.valueNames() { + row.Columns = append(row.Columns, k) + values = append(values, stat.Values[k]) + } + row.Values = [][]interface{}{values} + rows[n] = row + } + return &influxql.Result{Series: rows} +} + +// statistics returns the combined statistics for all registered clients. +func (s *Service) statistics() ([]*statistic, error) { + s.mu.Lock() + defer s.mu.Unlock() + + statistics := make([]*statistic, 0, len(s.registrations)) + for _, r := range s.registrations { + stats, err := r.Client.Statistics() + if err != nil { + continue + } + + // If a registered client has no field data, don't include it in the results + if len(stats) == 0 { + continue + } + + statistics = append(statistics, newStatistic(r.name, r.tags, stats)) + } + return statistics, nil +} + +// storeStatistics writes the statistics to an InfluxDB system. +func (s *Service) storeStatistics() { + // XXX add tags such as local hostname and cluster ID + //a.Tags["clusterID"] = strconv.FormatUint(s.clusterID, 10) + //a.Tags["nodeID"] = strconv.FormatUint(s.nodeID, 10) + //a.Tags["hostname"] = s.hostname + defer s.wg.Done() + + tick := time.NewTicker(s.storeInterval) + defer tick.Stop() + for { + select { + case <-tick.C: + // Write stats here. + case <-s.done: + s.Logger.Printf("terminating storage of statistics to %s", s.storeAddress) + return + } + + } +} + +// statistic represents the information returned by a single monitor client. +type statistic struct { + Name string + Tags map[string]string + Values map[string]interface{} +} + +// newStatistic returns a new statistic object. It ensures that tags are always non-nil. +func newStatistic(name string, tags map[string]string, values map[string]interface{}) *statistic { + a := tags + if a == nil { + a = make(map[string]string) + } + + return &statistic{ + Name: name, + Tags: a, + Values: values, + } +} + +// tagNames returns a sorted list of the tag names, if any. +func (s *statistic) tagNames() []string { + a := make([]string, 0, len(s.Tags)) + for k, _ := range s.Tags { + a = append(a, k) + } + sort.Strings(a) + return a +} + +// valueNames returns a sorted list of the value names, if any. +func (s *statistic) valueNames() []string { + a := make([]string, 0, len(s.Values)) + for k, _ := range s.Values { + a = append(a, k) + } + sort.Strings(a) + return a +} + +// clientWithMeta wraps a registered client with its associated name and tags. +type clientWithMeta struct { + Client + name string + tags map[string]string +} + +// MonitorClient wraps a *expvar.Map so that it implements the Client interface. It is for +// use by external packages that just record stats in an expvar.Map type. +type MonitorClient struct { + ep *expvar.Map +} + +// NewMonitorClient returns a new MonitorClient using the given expvar.Map. +func NewMonitorClient(ep *expvar.Map) *MonitorClient { + return &MonitorClient{ep: ep} +} + +// Statistics implements the Client interface for a MonitorClient. +func (m MonitorClient) Statistics() (map[string]interface{}, error) { + values := make(map[string]interface{}) + m.ep.Do(func(kv expvar.KeyValue) { + var f interface{} + var err error + switch v := kv.Value.(type) { + case *expvar.Float: + f, err = strconv.ParseFloat(v.String(), 64) + if err != nil { + return + } + case *expvar.Int: + f, err = strconv.ParseUint(v.String(), 10, 64) + if err != nil { + return + } + default: + return + } + values[kv.Key] = f + }) + + return values, nil +} + +// Diagnostics implements the Client interface for a MonitorClient. +func (m MonitorClient) Diagnostics() (map[string]interface{}, error) { + return nil, nil +} + +func ensureDatabaseExists(host, database string) error { + values := url.Values{} + values.Set("q", fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", database)) + resp, err := http.Get(host + "/query?" + values.Encode()) + if err != nil { + return fmt.Errorf("failed to create monitoring database on %s: %s", host, err.Error()) + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to create monitoring database on %s, received code: %d", + host, resp.StatusCode) + } + return nil +} diff --git a/services/graphite/service.go b/services/graphite/service.go index 683c7581130..2cf67399985 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -2,6 +2,7 @@ package graphite import ( "bufio" + "expvar" "fmt" "log" "math" @@ -13,6 +14,7 @@ import ( "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/meta" + "github.com/influxdb/influxdb/monitor" "github.com/influxdb/influxdb/tsdb" ) @@ -21,6 +23,31 @@ const ( leaderWaitTimeout = 30 * time.Second ) +// expvar stats maintained by the Graphite package. +var monitorOnce sync.Once +var statMap = expvar.NewMap("graphite") +var statMapTCP = expvar.NewMap("tcp") +var statMapUDP = expvar.NewMap("udp") + +// Build the graphite expvar hierarchy. +func init() { + statMap.Set("tcp", statMapTCP) + statMap.Set("udp", statMapUDP) +} + +// statistics gathered by the graphite package. +const ( + statPointsReceived = "points_rx" + statBytesReceived = "bytes_rx" + statPointsParseFail = "points_parse_fail" + statPointsUnsupported = "points_unsupported_fail" + statBatchesTrasmitted = "batches_tx" + statPointsTransmitted = "points_tx" + statBatchesTransmitFail = "batches_tx_fail" + statConnectionsActive = "connections_active" + statConnectionsHandled = "connections_handled" +) + type Service struct { bindAddress string database string @@ -32,7 +59,8 @@ type Service struct { batcher *tsdb.PointBatcher parser *Parser - logger *log.Logger + logger *log.Logger + statMap *expvar.Map ln net.Listener addr net.Addr @@ -40,6 +68,9 @@ type Service struct { wg sync.WaitGroup done chan struct{} + MonitorService interface { + Register(name string, tags map[string]string, client monitor.Client) error + } PointsWriter interface { WritePoints(p *cluster.WritePointsRequest) error } @@ -87,6 +118,20 @@ func NewService(c Config) (*Service, error) { func (s *Service) Open() error { s.logger.Printf("Starting graphite service, batch size %d, batch timeout %s", s.batchSize, s.batchTimeout) + // One Graphite service hooks up monitoring for all Graphite functionality. + monitorOnce.Do(func() { + if s.MonitorService == nil { + s.logger.Println("no monitor service available, no monitoring will be performed") + return + } + + t := monitor.NewMonitorClient(statMapTCP) + s.MonitorService.Register("graphite", map[string]string{"proto": "tcp"}, t) + + u := monitor.NewMonitorClient(statMapUDP) + s.MonitorService.Register("graphite", map[string]string{"proto": "udp"}, u) + }) + if err := s.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil { s.logger.Printf("Failed to detect a cluster leader: %s", err.Error()) return err @@ -151,6 +196,9 @@ func (s *Service) openTCPServer() (net.Addr, error) { } s.ln = ln + // Point at the TCP stats. + s.statMap = statMapTCP + s.wg.Add(1) go func() { defer s.wg.Done() @@ -176,6 +224,9 @@ func (s *Service) openTCPServer() (net.Addr, error) { func (s *Service) handleTCPConnection(conn net.Conn) { defer conn.Close() defer s.wg.Done() + defer s.statMap.Add(statConnectionsActive, -1) + s.statMap.Add(statConnectionsActive, 1) + s.statMap.Add(statConnectionsHandled, 1) reader := bufio.NewReader(conn) @@ -189,6 +240,8 @@ func (s *Service) handleTCPConnection(conn net.Conn) { // Trim the buffer, even though there should be no padding line := strings.TrimSpace(string(buf)) + s.statMap.Add(statPointsReceived, 1) + s.statMap.Add(statBytesReceived, int64(len(buf))) s.handleLine(line) } } @@ -205,6 +258,9 @@ func (s *Service) openUDPServer() (net.Addr, error) { return nil, err } + // Point at the UDP stats. + s.statMap = statMapUDP + buf := make([]byte, udpBufferSize) s.wg.Add(1) go func() { @@ -215,9 +271,13 @@ func (s *Service) openUDPServer() (net.Addr, error) { conn.Close() return } - for _, line := range strings.Split(string(buf[:n]), "\n") { + + lines := strings.Split(string(buf[:n]), "\n") + for _, line := range lines { s.handleLine(line) } + s.statMap.Add(statPointsReceived, int64(len(lines))) + s.statMap.Add(statBytesReceived, int64(n)) } }() return conn.LocalAddr(), nil @@ -227,10 +287,12 @@ func (s *Service) handleLine(line string) { if line == "" { return } + // Parse it. point, err := s.parser.Parse(line) if err != nil { s.logger.Printf("unable to parse line: %s", err) + s.statMap.Add(statPointsParseFail, 1) return } @@ -239,6 +301,7 @@ func (s *Service) handleLine(line string) { // Drop NaN and +/-Inf data points since they are not supported values if math.IsNaN(f) || math.IsInf(f, 0) { s.logger.Printf("dropping unsupported value: '%v'", line) + s.statMap.Add(statPointsUnsupported, 1) return } } @@ -257,9 +320,14 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { RetentionPolicy: "", ConsistencyLevel: s.consistencyLevel, Points: batch, - }); err != nil { + }); err == nil { + s.statMap.Add(statBatchesTrasmitted, 1) + s.statMap.Add(statPointsTransmitted, int64(len(batch))) + } else { s.logger.Printf("failed to write point batch to database %q: %s", s.database, err) + s.statMap.Add(statBatchesTransmitFail, 1) } + case <-s.done: return } diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 61684d6f7f9..b1011a72c7d 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "encoding/json" "errors" + "expvar" "fmt" "io" "io/ioutil" @@ -161,10 +162,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { default: pprof.Index(w, r) } - return + } else if strings.HasPrefix(r.URL.Path, "/debug/vars") { + serveExpvar(w, r) + } else { + h.mux.ServeHTTP(w, r) } - - h.mux.ServeHTTP(w, r) } func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { @@ -569,6 +571,21 @@ type Batch struct { Points []Point `json:"points"` } +// serveExpvar serves registered expvar information over HTTP. +func serveExpvar(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + fmt.Fprintf(w, "{\n") + first := true + expvar.Do(func(kv expvar.KeyValue) { + if !first { + fmt.Fprintf(w, ",\n") + } + first = false + fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value) + }) + fmt.Fprintf(w, "\n}\n") +} + // httpError writes an error to the client in a standard format. func httpError(w http.ResponseWriter, error string, pretty bool, code int) { w.Header().Add("content-type", "application/json") diff --git a/services/monitor/config.go b/services/monitor/config.go deleted file mode 100644 index 1e3798700c6..00000000000 --- a/services/monitor/config.go +++ /dev/null @@ -1,25 +0,0 @@ -package monitor - -import ( - "time" - - "github.com/influxdb/influxdb/toml" -) - -const ( - // DefaultStatisticsWriteInterval is the interval of time between internal stats are written - DefaultStatisticsWriteInterval = 1 * time.Minute -) - -// Config represents a configuration for the monitor. -type Config struct { - Enabled bool `toml:"enabled"` - WriteInterval toml.Duration `toml:"write-interval"` -} - -func NewConfig() Config { - return Config{ - Enabled: false, - WriteInterval: toml.Duration(DefaultStatisticsWriteInterval), - } -} diff --git a/services/monitor/monitor.go b/services/monitor/monitor.go deleted file mode 100644 index 2568e6eb172..00000000000 --- a/services/monitor/monitor.go +++ /dev/null @@ -1,83 +0,0 @@ -package monitor - -// Monitor represents a TSDB monitoring service. -type Monitor struct { - Store interface{} -} - -func (m *Monitor) Open() error { return nil } -func (m *Monitor) Close() error { return nil } - -// StartSelfMonitoring starts a goroutine which monitors the InfluxDB server -// itself and stores the results in the specified database at a given interval. -/* -func (s *Server) StartSelfMonitoring(database, retention string, interval time.Duration) error { - if interval == 0 { - return fmt.Errorf("statistics check interval must be non-zero") - } - - go func() { - tick := time.NewTicker(interval) - for { - <-tick.C - - // Create the batch and tags - tags := map[string]string{"serverID": strconv.FormatUint(s.ID(), 10)} - if h, err := os.Hostname(); err == nil { - tags["host"] = h - } - batch := pointsFromStats(s.stats, tags) - - // Shard-level stats. - tags["shardID"] = strconv.FormatUint(s.id, 10) - s.mu.RLock() - for _, sh := range s.shards { - if !sh.HasDataNodeID(s.id) { - // No stats for non-local shards. - continue - } - batch = append(batch, pointsFromStats(sh.stats, tags)...) - } - s.mu.RUnlock() - - // Server diagnostics. - for _, row := range s.DiagnosticsAsRows() { - points, err := s.convertRowToPoints(row.Name, row) - if err != nil { - s.Logger.Printf("failed to write diagnostic row for %s: %s", row.Name, err.Error()) - continue - } - for _, p := range points { - p.AddTag("serverID", strconv.FormatUint(s.ID(), 10)) - } - batch = append(batch, points...) - } - - s.WriteSeries(database, retention, batch) - } - }() - return nil -} - -// Function for local use turns stats into a slice of points -func pointsFromStats(st *Stats, tags map[string]string) []tsdb.Point { - var points []tsdb.Point - now := time.Now() - st.Walk(func(k string, v int64) { - point := tsdb.NewPoint( - st.name+"_"+k, - make(map[string]string), - map[string]interface{}{"value": int(v)}, - now, - ) - // Specifically create a new map. - for k, v := range tags { - tags[k] = v - point.AddTag(k, v) - } - points = append(points, point) - }) - - return points -} -*/ diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index d4c9196ed21..07120e74172 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -36,6 +36,11 @@ type QueryExecutor struct { ExecuteStatement(stmt influxql.Statement) *influxql.Result } + // Execute statements relating to statistics and diagnostics. + MonitorStatementExecutor interface { + ExecuteStatement(stmt influxql.Statement) *influxql.Result + } + // Maps shards for queries. ShardMapper interface { CreateMapper(shard meta.ShardInfo, stmt influxql.Statement, chunkSize int) (Mapper, error) @@ -173,6 +178,9 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu case *influxql.DropDatabaseStatement: // TODO: handle this in a cluster res = q.executeDropDatabaseStatement(stmt) + case *influxql.ShowStatsStatement: + // Send monitor-related queries to the monitor service. + res = q.MonitorStatementExecutor.ExecuteStatement(stmt) default: // Delegate all other meta statements to a separate executor. They don't hit tsdb storage. res = q.MetaStatementExecutor.ExecuteStatement(stmt)