From 18d15c3f6ae34cbbfa368dbf3b4a9b5e0288fbe7 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 31 Aug 2015 20:15:40 -0700 Subject: [PATCH 1/8] Add initial monitor README --- monitor/README.md | 49 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 monitor/README.md diff --git a/monitor/README.md b/monitor/README.md new file mode 100644 index 00000000000..58898b294b3 --- /dev/null +++ b/monitor/README.md @@ -0,0 +1,49 @@ +# System Monitoring +_System Monitoring_ means all statistical and diagnostic information made availabe to the user of InfluxDB system, about the system itself. Its purpose is to assist with troubleshooting and performance analysis. + +## Supported Commands + + * `SHOW STATS` + * `SHOW DIAGNOSTICS` + +If statistical information is also written to an InfluxDB system, the data will also be queryable by the InfluxQL query language. + +## Statistics vs. Diagnostics +A distinction between _statistics_ and _diagnostics_ is made for the purposes of monitoring. Generally a statistical quality is something that is being counted, and for which it makes sense to store for historical analysis. Diagnostic information is not necessarily numerical, and may not make sense to store. + +An example of statistical information would be the number of points received over UDP, or the number of queries executed. Examples of diagnostic information would be a list of current Graphite TCP connections, the version of InfluxDB, or the uptime of the process. + +## Design and Implementation + +A new module named `monitor` supports all statistics and diagnostic functionality. This includes: + + * Allowing other modules to register statistics and diagnostics information, allowing it to be accessed on demand by the `monitor` module. + * Serving the statistics and diagnostic information to the user, in response to commands such as `SHOW DIAGNOSTICS`. + * Expose standard Go runtime information such as garbage collection statistics. + * Make all collected expvar data via HTTP, for collection by 3rd-party tools. + * Writing the statistical information to an InfluxDB system, for historical analysis. This may be the same system generating the statistical information, but it does not have to be. Information is written used the Line Protocol. + +To register with `monitor`, a module must implement the following interface: + +``` +type Client interface { + Statistics() (map[string]interface{}, error) + Diagnostics() (map[string]interface{}, error) +} +``` + +The module then calls `Register(name string, tags map[string]string, client Client)`. `name` is the Measurement name that will be associated with the statistics. `tags` will be the tags, though an empty map is acceptable. `client` is the module which implements the `Client` interface. + +### expvar +Statistical information is gathered by each package using [expvar](https://golang.org/pkg/expvar). Each package registers a map using its package name. + +Due to the nature of `expvar`, statistical information is reset to its initial state when a server is restarted. + +## Configuration +The `monitor` module will allow the following configuration: + + * Whether to write statistical and diagnostic information to an InfluxDB system. This is enabled by default. + * The name of the database to where this information should be written. Defaults to `_internal`. The information is written to the default retention policy for the given database. + * The name of the retention policy, along with full configuration control of the retention policy. + * The address and port of the InfluxDB system. This will default to the system generating the data. + * The rate at which this information should be written. The maximum rate will be once a second. From 294b685e416e704827f29c56a8a27b3c9cbfdd58 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 31 Aug 2015 20:17:13 -0700 Subject: [PATCH 2/8] Add new monitor service --- monitor/config.go | 41 ++++++ monitor/config_test.go | 36 +++++ monitor/go_runtime.go | 37 +++++ monitor/service.go | 305 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 419 insertions(+) create mode 100644 monitor/config.go create mode 100644 monitor/config_test.go create mode 100644 monitor/go_runtime.go create mode 100644 monitor/service.go diff --git a/monitor/config.go b/monitor/config.go new file mode 100644 index 00000000000..324a62dea5e --- /dev/null +++ b/monitor/config.go @@ -0,0 +1,41 @@ +package monitor + +import ( + "time" + + "github.com/influxdb/influxdb/toml" +) + +const ( + // DefaultStoreEnabled is whether the system writes gathered information in + // an InfluxDB system for historical analysis. + DefaultStoreEnabled = true + + // DefaultStoreDatabase is the name of the database where gathered information is written + DefaultStoreDatabase = "_internal" + + // DefaultStoreInterval is the period between storing gathered information. + DefaultStoreInterval = time.Minute + + // DefaultStoreAddress is the destination system for gathered information. + DefaultStoreAddress = "127.0.0.1:8086" +) + +// Config represents the configuration for the monitor service. +type Config struct { + StoreEnabled bool `toml:"store-enabled"` + StoreDatabase string `toml:"store-database"` + StoreInterval toml.Duration `toml:"store-interval"` + StoreAddress string `toml:"store-address"` + ExpvarAddress string `toml:"expvar-address"` +} + +// NewConfig returns an instance of Config with defaults. +func NewConfig() Config { + return Config{ + StoreEnabled: false, + StoreDatabase: DefaultStoreDatabase, + StoreInterval: toml.Duration(DefaultStoreInterval), + StoreAddress: DefaultStoreAddress, + } +} diff --git a/monitor/config_test.go b/monitor/config_test.go new file mode 100644 index 00000000000..d3ec3fda14b --- /dev/null +++ b/monitor/config_test.go @@ -0,0 +1,36 @@ +package monitor_test + +import ( + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/monitor" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c monitor.Config + if _, err := toml.Decode(` +store-enabled=true +store-database="the_db" +store-interval="10m" +store-address="server1" +expvar-address="127.0.0.1:9950" +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. + if !c.StoreEnabled { + t.Fatalf("unexpected store-enabled: %s", c.StoreEnabled) + } else if c.StoreDatabase != "the_db" { + t.Fatalf("unexpected store-database: %s", c.StoreDatabase) + } else if time.Duration(c.StoreInterval) != 10*time.Minute { + t.Fatalf("unexpected store-interval: %s", c.StoreInterval) + } else if c.StoreAddress != "server1" { + t.Fatalf("unexpected store-address: %s", c.StoreAddress) + } else if c.ExpvarAddress != "127.0.0.1:9950" { + t.Fatalf("unexpected expvar-address: %s", c.ExpvarAddress) + } +} diff --git a/monitor/go_runtime.go b/monitor/go_runtime.go new file mode 100644 index 00000000000..ce0ef8bc292 --- /dev/null +++ b/monitor/go_runtime.go @@ -0,0 +1,37 @@ +package monitor + +import ( + "runtime" +) + +// goRuntime captures Go runtime statistics and implements the monitor client interface +type goRuntime struct{} + +// Statistics returns the statistics for the goRuntime type +func (g *goRuntime) Statistics() (map[string]interface{}, error) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + + return map[string]interface{}{ + "Alloc": int64(m.Alloc), + "TotalAlloc": int64(m.TotalAlloc), + "Sys": int64(m.Sys), + "Lookups": int64(m.Lookups), + "Mallocs": int64(m.Mallocs), + "Frees": int64(m.Frees), + "HeapAlloc": int64(m.HeapAlloc), + "HeapSys": int64(m.HeapSys), + "HeapIdle": int64(m.HeapIdle), + "HeapInUse": int64(m.HeapInuse), + "HeapReleased": int64(m.HeapReleased), + "HeapObjects": int64(m.HeapObjects), + "PauseTotalNs": int64(m.PauseTotalNs), + "NumGC": int64(m.NumGC), + "NumGoroutine": int64(runtime.NumGoroutine()), + }, nil +} + +// Diagnostics returns the statistics for the goRuntime type +func (g *goRuntime) Diagnostics() (map[string]interface{}, error) { + return nil, nil +} diff --git a/monitor/service.go b/monitor/service.go new file mode 100644 index 00000000000..2c78e4275f3 --- /dev/null +++ b/monitor/service.go @@ -0,0 +1,305 @@ +package monitor + +import ( + "expvar" + "fmt" + "log" + "net" + "net/http" + "net/url" + "os" + "sort" + "strconv" + "sync" + "time" + + "github.com/influxdb/influxdb/influxql" +) + +// Client is the interface modules must implement if they wish to register with monitor. +type Client interface { + Statistics() (map[string]interface{}, error) + Diagnostics() (map[string]interface{}, error) +} + +// Service represents an instance of the monitor service. +type Service struct { + wg sync.WaitGroup + done chan struct{} + mu sync.Mutex + registrations []*clientWithMeta + + hostname string + clusterID uint64 + nodeID uint64 + + storeEnabled bool + storeDatabase string + storeAddress string + storeInterval time.Duration + + expvarAddress string + + Logger *log.Logger +} + +// NewService returns a new instance of the monitor service. +func NewService(c Config) *Service { + return &Service{ + registrations: make([]*clientWithMeta, 0), + storeEnabled: c.StoreEnabled, + storeDatabase: c.StoreDatabase, + storeAddress: c.StoreAddress, + storeInterval: time.Duration(c.StoreInterval), + expvarAddress: c.ExpvarAddress, + Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags), + } +} + +// Open opens the monitoring service, using the given clusterID, node ID, and hostname +// for identification purposes. +func (s *Service) Open(clusterID, nodeID uint64, hostname string) error { + s.Logger.Printf("starting monitor service for cluster %d, host %s", clusterID, hostname) + s.clusterID = clusterID + s.nodeID = nodeID + s.hostname = hostname + + // Self-register Go runtime stats. + s.Register("runtime", nil, &goRuntime{}) + + // If enabled, record stats in a InfluxDB system. + if s.storeEnabled { + s.Logger.Printf("storing in %s, database '%s', interval %s", + s.storeAddress, s.storeDatabase, s.storeInterval) + + s.Logger.Printf("ensuring database %s exists on %s", s.storeDatabase, s.storeAddress) + if err := ensureDatabaseExists(s.storeAddress, s.storeDatabase); err != nil { + return err + } + + // Start periodic writes to system. + s.wg.Add(1) + go s.storeStatistics() + } + + // If enabled, expose all expvar data over HTTP. + if s.expvarAddress != "" { + listener, err := net.Listen("tcp", s.expvarAddress) + if err != nil { + return err + } + + go func() { + http.Serve(listener, nil) + }() + s.Logger.Printf("expvar information available on %s", s.expvarAddress) + } + return nil +} + +// Close closes the monitor service. +func (s *Service) Close() { + s.Logger.Println("shutting down monitor service") + close(s.done) + s.wg.Wait() + s.done = nil +} + +// SetLogger sets the internal logger to the logger passed in. +func (s *Service) SetLogger(l *log.Logger) { + s.Logger = l +} + +// Register registers a client with the given name and tags. +func (s *Service) Register(name string, tags map[string]string, client Client) error { + s.mu.Lock() + defer s.mu.Unlock() + c := &clientWithMeta{ + Client: client, + name: name, + tags: tags, + } + s.registrations = append(s.registrations, c) + s.Logger.Printf(`'%s:%v' registered for monitoring`, name, tags) + return nil +} + +// ExecuteStatement executes monitor-related query statements. +func (s *Service) ExecuteStatement(stmt influxql.Statement) *influxql.Result { + switch stmt := stmt.(type) { + case *influxql.ShowStatsStatement: + return s.executeShowStatistics(stmt) + default: + panic(fmt.Sprintf("unsupported statement type: %T", stmt)) + } +} + +// executeShowStatistics returns the statistics of the registered monitor client in +// the standard form expected by users of the InfluxDB system. +func (s *Service) executeShowStatistics(q *influxql.ShowStatsStatement) *influxql.Result { + stats, _ := s.statistics() + rows := make([]*influxql.Row, len(stats)) + + for n, stat := range stats { + row := &influxql.Row{} + values := make([]interface{}, 0, len(stat.Tags)+len(stat.Values)) + + row.Columns = append(row.Columns, "name") + values = append(values, stat.Name) + + for _, k := range stat.tagNames() { + row.Columns = append(row.Columns, k) + values = append(values, stat.Tags[k]) + } + for _, k := range stat.valueNames() { + row.Columns = append(row.Columns, k) + values = append(values, stat.Values[k]) + } + row.Values = [][]interface{}{values} + rows[n] = row + } + return &influxql.Result{Series: rows} +} + +// statistics returns the combined statistics for all registered clients. +func (s *Service) statistics() ([]*statistic, error) { + s.mu.Lock() + defer s.mu.Unlock() + + statistics := make([]*statistic, 0, len(s.registrations)) + for _, r := range s.registrations { + stats, err := r.Client.Statistics() + if err != nil { + continue + } + statistics[i] = newStatistic(r.name, r.tags, stats) + } + return statistics, nil +} + +// storeStatistics writes the statistics to an InfluxDB system. +func (s *Service) storeStatistics() { + // XXX add tags such as local hostname and cluster ID + //a.Tags["clusterID"] = strconv.FormatUint(s.clusterID, 10) + //a.Tags["nodeID"] = strconv.FormatUint(s.nodeID, 10) + //a.Tags["hostname"] = s.hostname + defer s.wg.Done() + + tick := time.NewTicker(s.storeInterval) + defer tick.Stop() + for { + select { + case <-tick.C: + // Write stats here. + case <-s.done: + s.Logger.Printf("terminating storage of statistics to %s", s.storeAddress) + return + } + + } + return +} + +// statistic represents the information returned by a single monitor client. +type statistic struct { + Name string + Tags map[string]string + Values map[string]interface{} +} + +// newStatistic returns a new statistic object. It ensures that tags are always non-nil. +func newStatistic(name string, tags map[string]string, values map[string]interface{}) *statistic { + a := tags + if a == nil { + a = make(map[string]string) + } + + return &statistic{ + Name: name, + Tags: a, + Values: values, + } +} + +// tagNames returns a sorted list of the tag names, if any. +func (s *statistic) tagNames() []string { + a := make([]string, 0, len(s.Tags)) + for k, _ := range s.Tags { + a = append(a, k) + } + sort.Strings(a) + return a +} + +// valueNames returns a sorted list of the value names, if any. +func (s *statistic) valueNames() []string { + a := make([]string, 0, len(s.Values)) + for k, _ := range s.Values { + a = append(a, k) + } + sort.Strings(a) + return a +} + +// clientWithMeta wraps a registered client with its associated name and tags. +type clientWithMeta struct { + Client + name string + tags map[string]string +} + +// MonitorClient wraps a *expvar.Map so that it implements the Client interface. It is for +// use by external packages that just record stats in an expvar.Map type. +type MonitorClient struct { + ep *expvar.Map +} + +// NewMonitorClient returns a new MonitorClient using the given expvar.Map. +func NewMonitorClient(ep *expvar.Map) *MonitorClient { + return &MonitorClient{ep: ep} +} + +// Statistics implements the Client interface for a MonitorClient. +func (m MonitorClient) Statistics() (map[string]interface{}, error) { + values := make(map[string]interface{}) + m.ep.Do(func(kv expvar.KeyValue) { + var f interface{} + var err error + switch v := kv.Value.(type) { + case *expvar.Float: + f, err = strconv.ParseFloat(v.String(), 64) + if err != nil { + return + } + case *expvar.Int: + f, err = strconv.ParseUint(v.String(), 10, 64) + if err != nil { + return + } + default: + return + } + values[kv.Key] = f + }) + + return values, nil +} + +// Diagnostics implements the Client interface for a MonitorClient. +func (m MonitorClient) Diagnostics() (map[string]interface{}, error) { + return nil, nil +} + +func ensureDatabaseExists(host, database string) error { + values := url.Values{} + values.Set("q", fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", database)) + resp, err := http.Get(host + "/query?" + values.Encode()) + if err != nil { + return fmt.Errorf("failed to create monitoring database on %s: %s", host, err.Error()) + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to create monitoring database on %s, received code: %d", + host, resp.StatusCode) + } + return nil +} From d87e668c78f9d9294b2bd5687ea15512318fb6dc Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 31 Aug 2015 20:20:35 -0700 Subject: [PATCH 3/8] Remove obsolete monitoring code --- cmd/influxd/run/config.go | 4 -- cmd/influxd/run/config_test.go | 2 - etc/config.sample.toml | 8 ---- services/monitor/config.go | 25 ---------- services/monitor/monitor.go | 83 ---------------------------------- 5 files changed, 122 deletions(-) delete mode 100644 services/monitor/config.go delete mode 100644 services/monitor/monitor.go diff --git a/cmd/influxd/run/config.go b/cmd/influxd/run/config.go index 77515ca604a..02eb7713f1d 100644 --- a/cmd/influxd/run/config.go +++ b/cmd/influxd/run/config.go @@ -19,7 +19,6 @@ import ( "github.com/influxdb/influxdb/services/graphite" "github.com/influxdb/influxdb/services/hh" "github.com/influxdb/influxdb/services/httpd" - "github.com/influxdb/influxdb/services/monitor" "github.com/influxdb/influxdb/services/opentsdb" "github.com/influxdb/influxdb/services/precreator" "github.com/influxdb/influxdb/services/retention" @@ -43,7 +42,6 @@ type Config struct { UDPs []udp.Config `toml:"udp"` // Snapshot SnapshotConfig `toml:"snapshot"` - Monitoring monitor.Config `toml:"monitoring"` ContinuousQuery continuous_querier.Config `toml:"continuous_queries"` HintedHandoff hh.Config `toml:"hinted-handoff"` @@ -66,7 +64,6 @@ func NewConfig() *Config { c.OpenTSDB = opentsdb.NewConfig() c.Graphites = append(c.Graphites, graphite.NewConfig()) - c.Monitoring = monitor.NewConfig() c.ContinuousQuery = continuous_querier.NewConfig() c.Retention = retention.NewConfig() c.HintedHandoff = hh.NewConfig() @@ -95,7 +92,6 @@ func NewDemoConfig() (*Config, error) { c.Data.WALDir = filepath.Join(homeDir, ".influxdb/wal") c.Admin.Enabled = true - c.Monitoring.Enabled = false return c, nil } diff --git a/cmd/influxd/run/config_test.go b/cmd/influxd/run/config_test.go index ffcaf253321..7ecbb4f658c 100644 --- a/cmd/influxd/run/config_test.go +++ b/cmd/influxd/run/config_test.go @@ -72,8 +72,6 @@ enabled = true t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDB.BindAddress) } else if c.UDPs[0].BindAddress != ":4444" { t.Fatalf("unexpected udp bind address: %s", c.UDPs[0].BindAddress) - } else if c.Monitoring.Enabled != true { - t.Fatalf("unexpected monitoring enabled: %v", c.Monitoring.Enabled) } else if c.ContinuousQuery.Enabled != true { t.Fatalf("unexpected continuous query enabled: %v", c.ContinuousQuery.Enabled) } diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 9614277aba5..e3dd6acbe22 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -207,14 +207,6 @@ reporting-disabled = false # batch-size = 1000 # will flush if this many points get buffered # batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit -### -### [monitoring] -### - -[monitoring] - enabled = true - write-interval = "24h" - ### ### [continuous_queries] ### diff --git a/services/monitor/config.go b/services/monitor/config.go deleted file mode 100644 index 1e3798700c6..00000000000 --- a/services/monitor/config.go +++ /dev/null @@ -1,25 +0,0 @@ -package monitor - -import ( - "time" - - "github.com/influxdb/influxdb/toml" -) - -const ( - // DefaultStatisticsWriteInterval is the interval of time between internal stats are written - DefaultStatisticsWriteInterval = 1 * time.Minute -) - -// Config represents a configuration for the monitor. -type Config struct { - Enabled bool `toml:"enabled"` - WriteInterval toml.Duration `toml:"write-interval"` -} - -func NewConfig() Config { - return Config{ - Enabled: false, - WriteInterval: toml.Duration(DefaultStatisticsWriteInterval), - } -} diff --git a/services/monitor/monitor.go b/services/monitor/monitor.go deleted file mode 100644 index 2568e6eb172..00000000000 --- a/services/monitor/monitor.go +++ /dev/null @@ -1,83 +0,0 @@ -package monitor - -// Monitor represents a TSDB monitoring service. -type Monitor struct { - Store interface{} -} - -func (m *Monitor) Open() error { return nil } -func (m *Monitor) Close() error { return nil } - -// StartSelfMonitoring starts a goroutine which monitors the InfluxDB server -// itself and stores the results in the specified database at a given interval. -/* -func (s *Server) StartSelfMonitoring(database, retention string, interval time.Duration) error { - if interval == 0 { - return fmt.Errorf("statistics check interval must be non-zero") - } - - go func() { - tick := time.NewTicker(interval) - for { - <-tick.C - - // Create the batch and tags - tags := map[string]string{"serverID": strconv.FormatUint(s.ID(), 10)} - if h, err := os.Hostname(); err == nil { - tags["host"] = h - } - batch := pointsFromStats(s.stats, tags) - - // Shard-level stats. - tags["shardID"] = strconv.FormatUint(s.id, 10) - s.mu.RLock() - for _, sh := range s.shards { - if !sh.HasDataNodeID(s.id) { - // No stats for non-local shards. - continue - } - batch = append(batch, pointsFromStats(sh.stats, tags)...) - } - s.mu.RUnlock() - - // Server diagnostics. - for _, row := range s.DiagnosticsAsRows() { - points, err := s.convertRowToPoints(row.Name, row) - if err != nil { - s.Logger.Printf("failed to write diagnostic row for %s: %s", row.Name, err.Error()) - continue - } - for _, p := range points { - p.AddTag("serverID", strconv.FormatUint(s.ID(), 10)) - } - batch = append(batch, points...) - } - - s.WriteSeries(database, retention, batch) - } - }() - return nil -} - -// Function for local use turns stats into a slice of points -func pointsFromStats(st *Stats, tags map[string]string) []tsdb.Point { - var points []tsdb.Point - now := time.Now() - st.Walk(func(k string, v int64) { - point := tsdb.NewPoint( - st.name+"_"+k, - make(map[string]string), - map[string]interface{}{"value": int(v)}, - now, - ) - // Specifically create a new map. - for k, v := range tags { - tags[k] = v - point.AddTag(k, v) - } - points = append(points, point) - }) - - return points -} -*/ From f05dc20b5892e44db4365a4f34db1bcc2c4e1eb7 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 31 Aug 2015 20:28:24 -0700 Subject: [PATCH 4/8] Hook new monitor service to server u --- cmd/influxd/run/config.go | 3 +++ cmd/influxd/run/server.go | 14 ++++++++++++++ etc/config.sample.toml | 11 +++++++++++ meta/statement_executor.go | 6 ------ monitor/service.go | 1 - tsdb/query_executor.go | 8 ++++++++ 6 files changed, 36 insertions(+), 7 deletions(-) diff --git a/cmd/influxd/run/config.go b/cmd/influxd/run/config.go index 02eb7713f1d..322b7bf5cbb 100644 --- a/cmd/influxd/run/config.go +++ b/cmd/influxd/run/config.go @@ -13,6 +13,7 @@ import ( "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/meta" + "github.com/influxdb/influxdb/monitor" "github.com/influxdb/influxdb/services/admin" "github.com/influxdb/influxdb/services/collectd" "github.com/influxdb/influxdb/services/continuous_querier" @@ -35,6 +36,7 @@ type Config struct { Precreator precreator.Config `toml:"shard-precreation"` Admin admin.Config `toml:"admin"` + Monitor monitor.Config `toml:"monitor"` HTTPD httpd.Config `toml:"http"` Graphites []graphite.Config `toml:"graphite"` Collectd collectd.Config `toml:"collectd"` @@ -59,6 +61,7 @@ func NewConfig() *Config { c.Precreator = precreator.NewConfig() c.Admin = admin.NewConfig() + c.Monitor = monitor.NewConfig() c.HTTPD = httpd.NewConfig() c.Collectd = collectd.NewConfig() c.OpenTSDB = opentsdb.NewConfig() diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index ab65817d72c..d86aa5ce452 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -14,6 +14,7 @@ import ( "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/meta" + "github.com/influxdb/influxdb/monitor" "github.com/influxdb/influxdb/services/admin" "github.com/influxdb/influxdb/services/collectd" "github.com/influxdb/influxdb/services/continuous_querier" @@ -57,6 +58,8 @@ type Server struct { ClusterService *cluster.Service SnapshotterService *snapshotter.Service + MonitorService *monitor.Service + // Server reporting reportingDisabled bool @@ -85,6 +88,16 @@ func NewServer(c *Config, version string) (*Server, error) { reportingDisabled: c.ReportingDisabled, } + // Start the monitor service. + clusterID, err := s.MetaStore.ClusterID() + if err != nil { + return nil, err + } + s.MonitorService = monitor.NewService(c.Monitor) + if err := s.MonitorService.Open(clusterID, s.MetaStore.NodeID(), s.Hostname); err != nil { + return nil, err + } + // Copy TSDB configuration. s.TSDBStore.EngineOptions.MaxWALSize = c.Data.MaxWALSize s.TSDBStore.EngineOptions.WALFlushInterval = time.Duration(c.Data.WALFlushInterval) @@ -100,6 +113,7 @@ func NewServer(c *Config, version string) (*Server, error) { s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore) s.QueryExecutor.MetaStore = s.MetaStore s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore} + s.QueryExecutor.MonitorStatementExecutor = s.MonitorService s.QueryExecutor.ShardMapper = s.ShardMapper // Set the shard writer diff --git a/etc/config.sample.toml b/etc/config.sample.toml index e3dd6acbe22..e5964c42de2 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -88,6 +88,17 @@ reporting-disabled = false enabled = true check-interval = "10m" +### +### Controls the system self-monitoring, statistics, diagnostics, and expvar data. +### + +[monitor] + store-enabled = true # Whether to record statistics in an InfluxDB system + store-database = "_internal" # The destination database for recorded statistics + store-interval = "1m" # The interval at which to record statistics + store-address = "http://127.0.0.1" # The protocol and host for the recorded data + # expvar-address = "127.0.0.1:9950" # Unset to expose expvar data over HTTP. + ### ### [admin] ### diff --git a/meta/statement_executor.go b/meta/statement_executor.go index 70e019afb98..291010ab316 100644 --- a/meta/statement_executor.go +++ b/meta/statement_executor.go @@ -80,8 +80,6 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql. return e.executeDropContinuousQueryStatement(stmt) case *influxql.ShowContinuousQueriesStatement: return e.executeShowContinuousQueriesStatement(stmt) - case *influxql.ShowStatsStatement: - return e.executeShowStatsStatement(stmt) default: panic(fmt.Sprintf("unsupported statement type: %T", stmt)) } @@ -283,7 +281,3 @@ func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql } return &influxql.Result{Series: rows} } - -func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) *influxql.Result { - return &influxql.Result{Err: fmt.Errorf("SHOW STATS is not implemented yet")} -} diff --git a/monitor/service.go b/monitor/service.go index 2c78e4275f3..6ae86756f94 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -197,7 +197,6 @@ func (s *Service) storeStatistics() { } } - return } // statistic represents the information returned by a single monitor client. diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index d4c9196ed21..07120e74172 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -36,6 +36,11 @@ type QueryExecutor struct { ExecuteStatement(stmt influxql.Statement) *influxql.Result } + // Execute statements relating to statistics and diagnostics. + MonitorStatementExecutor interface { + ExecuteStatement(stmt influxql.Statement) *influxql.Result + } + // Maps shards for queries. ShardMapper interface { CreateMapper(shard meta.ShardInfo, stmt influxql.Statement, chunkSize int) (Mapper, error) @@ -173,6 +178,9 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu case *influxql.DropDatabaseStatement: // TODO: handle this in a cluster res = q.executeDropDatabaseStatement(stmt) + case *influxql.ShowStatsStatement: + // Send monitor-related queries to the monitor service. + res = q.MonitorStatementExecutor.ExecuteStatement(stmt) default: // Delegate all other meta statements to a separate executor. They don't hit tsdb storage. res = q.MetaStatementExecutor.ExecuteStatement(stmt) From 9df17409d3bec83726ff6f678d096bf3ee5730db Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 31 Aug 2015 20:32:48 -0700 Subject: [PATCH 5/8] Use monitor service with Graphite --- cmd/influxd/run/server.go | 1 + services/graphite/service.go | 74 ++++++++++++++++++++++++++++++++++-- 2 files changed, 72 insertions(+), 3 deletions(-) diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index d86aa5ce452..782fd6abcb8 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -244,6 +244,7 @@ func (s *Server) appendGraphiteService(c graphite.Config) error { srv.PointsWriter = s.PointsWriter srv.MetaStore = s.MetaStore + srv.MonitorService = s.MonitorService s.Services = append(s.Services, srv) return nil } diff --git a/services/graphite/service.go b/services/graphite/service.go index 683c7581130..2cf67399985 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -2,6 +2,7 @@ package graphite import ( "bufio" + "expvar" "fmt" "log" "math" @@ -13,6 +14,7 @@ import ( "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/meta" + "github.com/influxdb/influxdb/monitor" "github.com/influxdb/influxdb/tsdb" ) @@ -21,6 +23,31 @@ const ( leaderWaitTimeout = 30 * time.Second ) +// expvar stats maintained by the Graphite package. +var monitorOnce sync.Once +var statMap = expvar.NewMap("graphite") +var statMapTCP = expvar.NewMap("tcp") +var statMapUDP = expvar.NewMap("udp") + +// Build the graphite expvar hierarchy. +func init() { + statMap.Set("tcp", statMapTCP) + statMap.Set("udp", statMapUDP) +} + +// statistics gathered by the graphite package. +const ( + statPointsReceived = "points_rx" + statBytesReceived = "bytes_rx" + statPointsParseFail = "points_parse_fail" + statPointsUnsupported = "points_unsupported_fail" + statBatchesTrasmitted = "batches_tx" + statPointsTransmitted = "points_tx" + statBatchesTransmitFail = "batches_tx_fail" + statConnectionsActive = "connections_active" + statConnectionsHandled = "connections_handled" +) + type Service struct { bindAddress string database string @@ -32,7 +59,8 @@ type Service struct { batcher *tsdb.PointBatcher parser *Parser - logger *log.Logger + logger *log.Logger + statMap *expvar.Map ln net.Listener addr net.Addr @@ -40,6 +68,9 @@ type Service struct { wg sync.WaitGroup done chan struct{} + MonitorService interface { + Register(name string, tags map[string]string, client monitor.Client) error + } PointsWriter interface { WritePoints(p *cluster.WritePointsRequest) error } @@ -87,6 +118,20 @@ func NewService(c Config) (*Service, error) { func (s *Service) Open() error { s.logger.Printf("Starting graphite service, batch size %d, batch timeout %s", s.batchSize, s.batchTimeout) + // One Graphite service hooks up monitoring for all Graphite functionality. + monitorOnce.Do(func() { + if s.MonitorService == nil { + s.logger.Println("no monitor service available, no monitoring will be performed") + return + } + + t := monitor.NewMonitorClient(statMapTCP) + s.MonitorService.Register("graphite", map[string]string{"proto": "tcp"}, t) + + u := monitor.NewMonitorClient(statMapUDP) + s.MonitorService.Register("graphite", map[string]string{"proto": "udp"}, u) + }) + if err := s.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil { s.logger.Printf("Failed to detect a cluster leader: %s", err.Error()) return err @@ -151,6 +196,9 @@ func (s *Service) openTCPServer() (net.Addr, error) { } s.ln = ln + // Point at the TCP stats. + s.statMap = statMapTCP + s.wg.Add(1) go func() { defer s.wg.Done() @@ -176,6 +224,9 @@ func (s *Service) openTCPServer() (net.Addr, error) { func (s *Service) handleTCPConnection(conn net.Conn) { defer conn.Close() defer s.wg.Done() + defer s.statMap.Add(statConnectionsActive, -1) + s.statMap.Add(statConnectionsActive, 1) + s.statMap.Add(statConnectionsHandled, 1) reader := bufio.NewReader(conn) @@ -189,6 +240,8 @@ func (s *Service) handleTCPConnection(conn net.Conn) { // Trim the buffer, even though there should be no padding line := strings.TrimSpace(string(buf)) + s.statMap.Add(statPointsReceived, 1) + s.statMap.Add(statBytesReceived, int64(len(buf))) s.handleLine(line) } } @@ -205,6 +258,9 @@ func (s *Service) openUDPServer() (net.Addr, error) { return nil, err } + // Point at the UDP stats. + s.statMap = statMapUDP + buf := make([]byte, udpBufferSize) s.wg.Add(1) go func() { @@ -215,9 +271,13 @@ func (s *Service) openUDPServer() (net.Addr, error) { conn.Close() return } - for _, line := range strings.Split(string(buf[:n]), "\n") { + + lines := strings.Split(string(buf[:n]), "\n") + for _, line := range lines { s.handleLine(line) } + s.statMap.Add(statPointsReceived, int64(len(lines))) + s.statMap.Add(statBytesReceived, int64(n)) } }() return conn.LocalAddr(), nil @@ -227,10 +287,12 @@ func (s *Service) handleLine(line string) { if line == "" { return } + // Parse it. point, err := s.parser.Parse(line) if err != nil { s.logger.Printf("unable to parse line: %s", err) + s.statMap.Add(statPointsParseFail, 1) return } @@ -239,6 +301,7 @@ func (s *Service) handleLine(line string) { // Drop NaN and +/-Inf data points since they are not supported values if math.IsNaN(f) || math.IsInf(f, 0) { s.logger.Printf("dropping unsupported value: '%v'", line) + s.statMap.Add(statPointsUnsupported, 1) return } } @@ -257,9 +320,14 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { RetentionPolicy: "", ConsistencyLevel: s.consistencyLevel, Points: batch, - }); err != nil { + }); err == nil { + s.statMap.Add(statBatchesTrasmitted, 1) + s.statMap.Add(statPointsTransmitted, int64(len(batch))) + } else { s.logger.Printf("failed to write point batch to database %q: %s", s.database, err) + s.statMap.Add(statBatchesTransmitFail, 1) } + case <-s.done: return } From 9d81cdd1a4d4ac150891d29afd1bf9bc96a981f6 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 31 Aug 2015 20:41:33 -0700 Subject: [PATCH 6/8] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c6d3c347fbc..27b7d2865c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ With this release InfluxDB is moving to Go 1.5. ### Features - [#3863](https://github.com/influxdb/influxdb/pull/3863): Move to Go 1.5 - [#3892](https://github.com/influxdb/influxdb/pull/3892): Support IF NOT EXISTS for CREATE DATABASE +- [#3916](https://github.com/influxdb/influxdb/pull/3916): New statistics and diagnostics support. Graphite first to be instrumented. ### Bugfixes - [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803. From 26147b2f5fe0d76dd0781200bdf317c207d15c84 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 31 Aug 2015 20:50:44 -0700 Subject: [PATCH 7/8] Don't include empty statistical data in results --- monitor/service.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/monitor/service.go b/monitor/service.go index 6ae86756f94..5c732824498 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -172,7 +172,13 @@ func (s *Service) statistics() ([]*statistic, error) { if err != nil { continue } - statistics[i] = newStatistic(r.name, r.tags, stats) + + // If a registered client has no field data, don't include it in the results + if len(stats) == 0 { + continue + } + + statistics = append(statistics, newStatistic(r.name, r.tags, stats)) } return statistics, nil } From 366c0115f91b3b04d4b35b90901f5e7b645625d0 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Tue, 1 Sep 2015 14:57:15 -0700 Subject: [PATCH 8/8] Serve expvar information from HTTP package --- etc/config.sample.toml | 1 - monitor/config.go | 1 - monitor/config_test.go | 5 +---- monitor/service.go | 16 ---------------- services/httpd/handler.go | 23 ++++++++++++++++++++--- 5 files changed, 21 insertions(+), 25 deletions(-) diff --git a/etc/config.sample.toml b/etc/config.sample.toml index e5964c42de2..5910533a550 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -97,7 +97,6 @@ reporting-disabled = false store-database = "_internal" # The destination database for recorded statistics store-interval = "1m" # The interval at which to record statistics store-address = "http://127.0.0.1" # The protocol and host for the recorded data - # expvar-address = "127.0.0.1:9950" # Unset to expose expvar data over HTTP. ### ### [admin] diff --git a/monitor/config.go b/monitor/config.go index 324a62dea5e..061a472d5ae 100644 --- a/monitor/config.go +++ b/monitor/config.go @@ -27,7 +27,6 @@ type Config struct { StoreDatabase string `toml:"store-database"` StoreInterval toml.Duration `toml:"store-interval"` StoreAddress string `toml:"store-address"` - ExpvarAddress string `toml:"expvar-address"` } // NewConfig returns an instance of Config with defaults. diff --git a/monitor/config_test.go b/monitor/config_test.go index d3ec3fda14b..0626a7cfcae 100644 --- a/monitor/config_test.go +++ b/monitor/config_test.go @@ -16,21 +16,18 @@ store-enabled=true store-database="the_db" store-interval="10m" store-address="server1" -expvar-address="127.0.0.1:9950" `, &c); err != nil { t.Fatal(err) } // Validate configuration. if !c.StoreEnabled { - t.Fatalf("unexpected store-enabled: %s", c.StoreEnabled) + t.Fatalf("unexpected store-enabled: %v", c.StoreEnabled) } else if c.StoreDatabase != "the_db" { t.Fatalf("unexpected store-database: %s", c.StoreDatabase) } else if time.Duration(c.StoreInterval) != 10*time.Minute { t.Fatalf("unexpected store-interval: %s", c.StoreInterval) } else if c.StoreAddress != "server1" { t.Fatalf("unexpected store-address: %s", c.StoreAddress) - } else if c.ExpvarAddress != "127.0.0.1:9950" { - t.Fatalf("unexpected expvar-address: %s", c.ExpvarAddress) } } diff --git a/monitor/service.go b/monitor/service.go index 5c732824498..26381e06980 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -4,7 +4,6 @@ import ( "expvar" "fmt" "log" - "net" "net/http" "net/url" "os" @@ -38,8 +37,6 @@ type Service struct { storeAddress string storeInterval time.Duration - expvarAddress string - Logger *log.Logger } @@ -51,7 +48,6 @@ func NewService(c Config) *Service { storeDatabase: c.StoreDatabase, storeAddress: c.StoreAddress, storeInterval: time.Duration(c.StoreInterval), - expvarAddress: c.ExpvarAddress, Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags), } } @@ -82,18 +78,6 @@ func (s *Service) Open(clusterID, nodeID uint64, hostname string) error { go s.storeStatistics() } - // If enabled, expose all expvar data over HTTP. - if s.expvarAddress != "" { - listener, err := net.Listen("tcp", s.expvarAddress) - if err != nil { - return err - } - - go func() { - http.Serve(listener, nil) - }() - s.Logger.Printf("expvar information available on %s", s.expvarAddress) - } return nil } diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 61684d6f7f9..b1011a72c7d 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "encoding/json" "errors" + "expvar" "fmt" "io" "io/ioutil" @@ -161,10 +162,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { default: pprof.Index(w, r) } - return + } else if strings.HasPrefix(r.URL.Path, "/debug/vars") { + serveExpvar(w, r) + } else { + h.mux.ServeHTTP(w, r) } - - h.mux.ServeHTTP(w, r) } func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { @@ -569,6 +571,21 @@ type Batch struct { Points []Point `json:"points"` } +// serveExpvar serves registered expvar information over HTTP. +func serveExpvar(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + fmt.Fprintf(w, "{\n") + first := true + expvar.Do(func(kv expvar.KeyValue) { + if !first { + fmt.Fprintf(w, ",\n") + } + first = false + fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value) + }) + fmt.Fprintf(w, "\n}\n") +} + // httpError writes an error to the client in a standard format. func httpError(w http.ResponseWriter, error string, pretty bool, code int) { w.Header().Add("content-type", "application/json")