Skip to content

Commit

Permalink
Merge pull request #3969 from influxdb/hook_up_diagnostics
Browse files Browse the repository at this point in the history
Complete diagnostics support
  • Loading branch information
otoolep committed Sep 4, 2015
2 parents 548b898 + 6ad35e2 commit 02e2ed8
Show file tree
Hide file tree
Showing 8 changed files with 338 additions and 88 deletions.
12 changes: 9 additions & 3 deletions monitor/go_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ func (g *goRuntime) Statistics() (map[string]interface{}, error) {
}, nil
}

// Diagnostics returns the diagnostics for the goRuntime type
func (g *goRuntime) Diagnostics() (map[string]interface{}, error) {
return nil, nil
func (g *goRuntime) Diagnostics() (*Diagnostic, error) {
	// The Go runtime facts are gathered into a map and handed to
	// DiagnosticFromMap, which turns them into a one-row table.
	// GOMAXPROCS is called with -1 so the setting is read, not changed.
	return DiagnosticFromMap(map[string]interface{}{
		"GOARCH":     runtime.GOARCH,
		"GOOS":       runtime.GOOS,
		"GOMAXPROCS": runtime.GOMAXPROCS(-1),
		"version":    runtime.Version(),
	}), nil
}
26 changes: 26 additions & 0 deletions monitor/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package monitor

import (
"os"
)

// network supplies network-related information to the monitor through its
// Statistics and Diagnostics methods. It holds no state.
type network struct{}

// Statistics reports nothing for the network type: both results are nil.
func (n *network) Statistics() (map[string]interface{}, error) {
	return nil, nil
}

// Diagnostics returns a Diagnostic with a single "hostname" column holding
// the value reported by os.Hostname. If the lookup fails, the error is
// returned and the Diagnostic is nil.
func (n *network) Diagnostics() (*Diagnostic, error) {
	hostname, err := os.Hostname()
	if err != nil {
		return nil, err
	}
	return DiagnosticFromMap(map[string]interface{}{"hostname": hostname}), nil
}
216 changes: 167 additions & 49 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,64 @@ import (

"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
)

const leaderWaitTimeout = 30 * time.Second

// Client is the interface modules must implement if they wish to register with monitor.
type Client interface {
// StatsClient is the interface modules must implement if they wish to register with monitor.
type StatsClient interface {
// Statistics returns a map of keys to values. Each Value must be either int64 or float64.
// Statistical information is written to an InfluxDB system if enabled.
Statistics() (map[string]interface{}, error)
Diagnostics() (map[string]interface{}, error)
}

// DiagsClient is the interface modules implement if they register diagnostics with monitor.
type DiagsClient interface {
// Diagnostics returns the module's diagnostic table, or an error if it
// could not be produced.
Diagnostics() (*Diagnostic, error)
}

// The DiagsClientFunc type is an adapter to allow the use of
// ordinary functions as Diagnostics clients.
type DiagsClientFunc func() (*Diagnostic, error)

// Diagnostics calls f() and returns its results unchanged.
func (f DiagsClientFunc) Diagnostics() (*Diagnostic, error) {
return f()
}

// Diagnostic is a table of diagnostic information. Columns holds the column
// names; Rows holds one slice of values per row, one value per column. This
// information is never written to an InfluxDB system and is display-only.
// For example, a table of connections could look like this:
//
//	source_ip    source_port    dest_ip      dest_port
//	182.1.0.2    2890           127.0.0.1    38901
//	174.33.1.2   2924           127.0.0.1    38902
type Diagnostic struct {
	Columns []string
	Rows    [][]interface{}
}

// NewDiagnostic returns a Diagnostic with the given column names and an
// empty, non-nil set of rows.
func NewDiagnostic(columns []string) *Diagnostic {
	d := new(Diagnostic)
	d.Columns = columns
	d.Rows = [][]interface{}{}
	return d
}

// AddRow appends r as the last row of the table. The row is stored as given;
// its length is not checked against the number of columns.
func (d *Diagnostic) AddRow(r []interface{}) {
	d.Rows = append(d.Rows, r)
}

// Monitor represents an instance of the monitor system.
type Monitor struct {
wg sync.WaitGroup
done chan struct{}
mu sync.Mutex
registrations []*clientWithMeta
wg sync.WaitGroup
done chan struct{}
mu sync.Mutex

statRegistrations []*clientWithMeta
diagRegistrations map[string]DiagsClient

storeEnabled bool
storeDatabase string
Expand All @@ -50,12 +92,13 @@ type Monitor struct {
// New returns a new instance of the monitor system.
func New(c Config) *Monitor {
return &Monitor{
done: make(chan struct{}),
registrations: make([]*clientWithMeta, 0),
storeEnabled: c.StoreEnabled,
storeDatabase: c.StoreDatabase,
storeInterval: time.Duration(c.StoreInterval),
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
done: make(chan struct{}),
statRegistrations: make([]*clientWithMeta, 0),
diagRegistrations: make(map[string]DiagsClient),
storeEnabled: c.StoreEnabled,
storeDatabase: c.StoreDatabase,
storeInterval: time.Duration(c.StoreInterval),
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
}
}

Expand All @@ -64,8 +107,12 @@ func New(c Config) *Monitor {
func (m *Monitor) Open() error {
m.Logger.Printf("Starting monitor system")

// Self-register Go runtime statm.
m.Register("runtime", nil, &goRuntime{})
// Self-register various stats and diagnostics.
gr := &goRuntime{}
m.RegisterStatsClient("runtime", nil, gr)
m.RegisterDiagnosticsClient("runtime", gr)
m.RegisterDiagnosticsClient("network", &network{})
m.RegisterDiagnosticsClient("system", &system{})

// If enabled, record stats in an InfluxDB system.
if m.storeEnabled {
Expand All @@ -92,16 +139,49 @@ func (m *Monitor) SetLogger(l *log.Logger) {
}

// RegisterStatsClient registers a stats client with the given name and tags.
func (m *Monitor) Register(name string, tags map[string]string, client Client) error {
func (m *Monitor) RegisterStatsClient(name string, tags map[string]string, client StatsClient) error {
m.mu.Lock()
defer m.mu.Unlock()

a := tags
if a == nil {
a = make(map[string]string)
}

// Get cluster-level metadata to supplement stats.
var clusterID string
var hostname string
var err error
if cID, err := m.MetaStore.ClusterID(); err != nil {
m.Logger.Printf("failed to determine cluster ID: %s", err)
} else {
clusterID = strconv.FormatUint(cID, 10)
}
nodeID := strconv.FormatUint(m.MetaStore.NodeID(), 10)
if hostname, err = os.Hostname(); err != nil {
m.Logger.Printf("failed to determine hostname: %s", err)
}
a["clusterID"] = clusterID
a["nodeID"] = nodeID
a["hostname"] = hostname

c := &clientWithMeta{
Client: client,
name: name,
tags: tags,
StatsClient: client,
name: name,
tags: a,
}
m.registrations = append(m.registrations, c)
m.Logger.Printf(`'%s:%v' registered for monitoring`, name, tags)

m.statRegistrations = append(m.statRegistrations, c)
m.Logger.Printf(`'%s:%v' registered for statistics monitoring`, name, tags)
return nil
}

// RegisterDiagnosticsClient registers a diagnostics client under the given name.
// Registering a second client under a name already in use replaces the earlier
// one. The registration is made while holding m.mu, and the returned error is
// always nil.
func (m *Monitor) RegisterDiagnosticsClient(name string, client DiagsClient) error {
m.mu.Lock()
defer m.mu.Unlock()
m.diagRegistrations[name] = client
m.Logger.Printf(`'%s' registered for diagnostics monitoring`, name)
return nil
}

Expand All @@ -110,9 +190,9 @@ func (m *Monitor) Statistics() ([]*statistic, error) {
m.mu.Lock()
defer m.mu.Unlock()

statistics := make([]*statistic, 0, len(m.registrations))
for _, r := range m.registrations {
stats, err := r.Client.Statistics()
statistics := make([]*statistic, 0, len(m.statRegistrations))
for _, r := range m.statRegistrations {
stats, err := r.StatsClient.Statistics()
if err != nil {
continue
}
Expand All @@ -127,14 +207,24 @@ func (m *Monitor) Statistics() ([]*statistic, error) {
return statistics, nil
}

// Diagnostics collects the diagnostics of every registered diagnostics
// client, keyed by the name the client was registered under. A client whose
// Diagnostics call fails is left out of the result. The returned error is
// always nil.
func (m *Monitor) Diagnostics() (map[string]*Diagnostic, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	results := make(map[string]*Diagnostic, len(m.diagRegistrations))
	for name, client := range m.diagRegistrations {
		if diag, err := client.Diagnostics(); err == nil {
			results[name] = diag
		}
	}
	return results, nil
}

// storeStatistics writes the statistics to an InfluxDB system.
func (m *Monitor) storeStatistics() {
// XXX add tags such as local hostname and cluster ID
//a.Tags["clusterID"] = strconv.FormatUint(m.clusterID, 10)
//a.Tags["nodeID"] = strconv.FormatUint(m.nodeID, 10)
//a.Tags["hostname"] = m.hostname
defer m.wg.Done()

m.Logger.Printf("storing statistics in database '%s', interval %s",
m.storeDatabase, m.storeInterval)

Expand All @@ -154,7 +244,26 @@ func (m *Monitor) storeStatistics() {
for {
select {
case <-tick.C:
// Write stats here.
stats, err := m.Statistics()
if err != nil {
m.Logger.Printf("failed to retrieve registered statistics: %s", err)
continue
}

points := make(tsdb.Points, 0, len(stats))
for _, s := range stats {
points = append(points, tsdb.NewPoint(s.Name, s.Tags, s.Values, time.Now()))
}

err = m.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: m.storeDatabase,
RetentionPolicy: "",
ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: points,
})
if err != nil {
m.Logger.Printf("failed to store statistics: %s", err)
}
case <-m.done:
m.Logger.Printf("terminating storage of statistics")
return
Expand All @@ -170,16 +279,11 @@ type statistic struct {
Values map[string]interface{}
}

// newStatistic returns a new statistic object. It ensures that tags are always non-nil.
// newStatistic returns a new statistic object.
func newStatistic(name string, tags map[string]string, values map[string]interface{}) *statistic {
a := tags
if a == nil {
a = make(map[string]string)
}

return &statistic{
Name: name,
Tags: a,
Tags: tags,
Values: values,
}
}
Expand All @@ -206,24 +310,24 @@ func (s *statistic) valueNames() []string {

// clientWithMeta wraps a registered client with its associated name and tags.
type clientWithMeta struct {
Client
StatsClient
name string
tags map[string]string
}

// MonitorClient wraps a *expvar.Map so that it implements the Client interface. It is for
// use by external packages that just record stats in an expvar.Map type.
type MonitorClient struct {
// StatsMonitorClient wraps a *expvar.Map so that it implements the StatsClient interface.
// It is for use by external packages that just record stats in an expvar.Map type.
type StatsMonitorClient struct {
ep *expvar.Map
}

// NewMonitorClient returns a new MonitorClient using the given expvar.Map.
func NewMonitorClient(ep *expvar.Map) *MonitorClient {
return &MonitorClient{ep: ep}
// NewStatsMonitorClient returns a new StatsMonitorClient using the given expvar.Map.
func NewStatsMonitorClient(ep *expvar.Map) *StatsMonitorClient {
return &StatsMonitorClient{ep: ep}
}

// Statistics implements the Client interface for a MonitorClient.
func (m MonitorClient) Statistics() (map[string]interface{}, error) {
// Statistics implements the StatsClient interface for a StatsMonitorClient.
func (m StatsMonitorClient) Statistics() (map[string]interface{}, error) {
values := make(map[string]interface{})
m.ep.Do(func(kv expvar.KeyValue) {
var f interface{}
Expand All @@ -248,7 +352,21 @@ func (m MonitorClient) Statistics() (map[string]interface{}, error) {
return values, nil
}

// Diagnostics implements the Client interface for a MonitorClient.
func (m MonitorClient) Diagnostics() (map[string]interface{}, error) {
return nil, nil
// DiagnosticFromMap builds a one-row Diagnostic from m. The map keys become
// the column names, sorted so the column order is deterministic, and the
// single row holds the matching values in that same order.
func DiagnosticFromMap(m map[string]interface{}) *Diagnostic {
	columns := make([]string, 0, len(m))
	for name := range m {
		columns = append(columns, name)
	}
	sort.Strings(columns)

	// Fill the row by walking the sorted names so values line up with columns.
	values := make([]interface{}, 0, len(columns))
	for _, name := range columns {
		values = append(values, m[name])
	}

	diag := NewDiagnostic(columns)
	diag.AddRow(values)
	return diag
}
11 changes: 7 additions & 4 deletions monitor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,24 @@ func Test_RegisterStats(t *testing.T) {
}

// Register a client without tags.
if err := monitor.Register("foo", nil, client); err != nil {
if err := monitor.RegisterStatsClient("foo", nil, client); err != nil {
t.Fatalf("failed to register client: %s", err.Error())
}
json := executeShowStatsJSON(t, executor)
if !strings.Contains(json, `{"name":"foo","columns":["bar","qux"],"values":[[1,2.4]]}]}`) {
if !strings.Contains(json, `"columns":["bar","qux"],"values":[[1,2.4]]`) || !strings.Contains(json, `"name":"foo"`) {
t.Fatalf("SHOW STATS response incorrect, got: %s\n", json)
}

// Register a client with tags.
if err := monitor.Register("baz", map[string]string{"proto": "tcp"}, client); err != nil {
if err := monitor.RegisterStatsClient("baz", map[string]string{"proto": "tcp"}, client); err != nil {
t.Fatalf("failed to register client: %s", err.Error())
}
json = executeShowStatsJSON(t, executor)
if !strings.Contains(json, `{"name":"baz","tags":{"proto":"tcp"},"columns":["bar","qux"],"values":[[1,2.4]]}]}`) {
if !strings.Contains(json, `"columns":["bar","qux"],"values":[[1,2.4]]`) ||
!strings.Contains(json, `"name":"baz"`) ||
!strings.Contains(json, `"proto":"tcp"`) {
t.Fatalf("SHOW STATS response incorrect, got: %s\n", json)

}
}

Expand Down
Loading

0 comments on commit 02e2ed8

Please sign in to comment.