From 0cbd85310017b312432625db14f7c5501b7b64f8 Mon Sep 17 00:00:00 2001 From: Josh Palay Date: Wed, 23 Sep 2015 11:21:42 -0700 Subject: [PATCH 1/3] Adds command intervals to exec plugin --- plugins/exec/README.md | 5 + plugins/exec/exec.go | 79 +++++++++++----- plugins/exec/exec_test.go | 191 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 241 insertions(+), 34 deletions(-) diff --git a/plugins/exec/README.md b/plugins/exec/README.md index abe99da32e65f..456ab2b774cbf 100644 --- a/plugins/exec/README.md +++ b/plugins/exec/README.md @@ -10,6 +10,11 @@ setup the exec plugin with: [[exec.commands]] command = "/usr/bin/mycollector --output=json" name = "mycollector" +interval = 10 ``` The name is used as a prefix for the measurements. + +The interval is used to determine how often a particular command should be run. Each +time the exec plugin runs, it will only run a particular command if it has been at least +`interval` seconds since the exec plugin last ran the command. diff --git a/plugins/exec/exec.go b/plugins/exec/exec.go index 6c340db6f2d31..c9d115312d774 100644 --- a/plugins/exec/exec.go +++ b/plugins/exec/exec.go @@ -6,8 +6,10 @@ import ( "fmt" "github.com/gonuts/go-shellquote" "github.com/influxdb/telegraf/plugins" + "math" "os/exec" "sync" + "time" ) const sampleConfig = ` @@ -18,31 +20,45 @@ const sampleConfig = ` # name of the command (used as a prefix for measurements) name = "mycollector" -` -type Command struct { - Command string - Name string -} + # Only run this command if it has been at least this many + # seconds since it last ran + interval = 10 +` type Exec struct { Commands []*Command runner Runner + clock Clock } -type Runner interface { - Run(string, ...string) ([]byte, error) +type Command struct { + Command string + Name string + Interval int + lastRunAt time.Time } -type CommandRunner struct { +type Runner interface { + Run(*Command) ([]byte, error) } -func NewExec() *Exec { - return &Exec{runner: CommandRunner{}} +type Clock interface { + Now() time.Time } -func (c CommandRunner) Run(command string, args ...string) ([]byte, error) { - cmd := exec.Command(command, args...) +type CommandRunner struct{} + +type RealClock struct{} + +func (c CommandRunner) Run(command *Command) ([]byte, error) { + command.lastRunAt = time.Now() + split_cmd, err := shellquote.Split(command.Command) + if err != nil || len(split_cmd) == 0 { + return nil, fmt.Errorf("exec: unable to parse command, %s", err) + } + + cmd := exec.Command(split_cmd[0], split_cmd[1:]...) var out bytes.Buffer cmd.Stdout = &out @@ -53,6 +69,14 @@ func (c CommandRunner) Run(command string, args ...string) ([]byte, error) { return out.Bytes(), nil } +func (c RealClock) Now() time.Time { + return time.Now() +} + +func NewExec() *Exec { + return &Exec{runner: CommandRunner{}, clock: RealClock{}} +} + func (e *Exec) SampleConfig() string { return sampleConfig } @@ -80,23 +104,28 @@ func (e *Exec) Gather(acc plugins.Accumulator) error { } func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error { - words, err := shellquote.Split(c.Command) - if err != nil || len(words) == 0 { - return fmt.Errorf("exec: unable to parse command, %s", err) - } + secondsSinceLastRun := 0.0 - out, err := e.runner.Run(words[0], words[1:]...) - if err != nil { - return err + if c.lastRunAt.Unix() == 0 { // means time is uninitialized + secondsSinceLastRun = math.Inf(1) + } else { + secondsSinceLastRun = (e.clock.Now().Sub(c.lastRunAt)).Seconds() } - var jsonOut interface{} - err = json.Unmarshal(out, &jsonOut) - if err != nil { - return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err) - } + if secondsSinceLastRun >= float64(c.Interval) { + out, err := e.runner.Run(c) + if err != nil { + return err + } - processResponse(acc, c.Name, map[string]string{}, jsonOut) + var jsonOut interface{} + err = json.Unmarshal(out, &jsonOut) + if err != nil { + return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err) + } + + processResponse(acc, c.Name, map[string]string{}, jsonOut) + } return nil } diff --git a/plugins/exec/exec_test.go b/plugins/exec/exec_test.go index 4464e2beb30c6..3f0b6f4cef5ad 100644 --- a/plugins/exec/exec_test.go +++ b/plugins/exec/exec_test.go @@ -5,9 +5,14 @@ import ( "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "math" "testing" + "time" ) +// Midnight 9/22/2015 +const baseTimeSeconds = 1442905200 + const validJson = ` { "status": "green", @@ -32,24 +37,52 @@ type runnerMock struct { err error } +type clockMock struct { + now time.Time +} + func newRunnerMock(out []byte, err error) Runner { - return &runnerMock{out: out, err: err} + return &runnerMock{ + out: out, + err: err, + } } -func (r runnerMock) Run(command string, args ...string) ([]byte, error) { +func (r runnerMock) Run(command *Command) ([]byte, error) { if r.err != nil { return nil, r.err } return r.out, nil } +func newClockMock(now time.Time) Clock { + return &clockMock{now: now} +} + +func (c clockMock) Now() time.Time { + return c.now +} + func TestExec(t *testing.T) { runner := newRunnerMock([]byte(validJson), nil) - command := Command{Command: "testcommand arg1", Name: "mycollector"} - e := &Exec{runner: runner, Commands: []*Command{&command}} + clock := newClockMock(time.Unix(baseTimeSeconds+20, 0)) + command := Command{ + Command: "testcommand arg1", + Name: "mycollector", + Interval: 10, + lastRunAt: time.Unix(baseTimeSeconds, 0), + } + + e := &Exec{ + runner: runner, + clock: clock, + Commands: []*Command{&command}, + } var acc testutil.Accumulator + initialPoints := len(acc.Points) err := e.Gather(&acc) + deltaPoints := len(acc.Points) - initialPoints require.NoError(t, err) checkFloat := []struct { @@ -66,24 +99,164 @@ func TestExec(t *testing.T) { assert.True(t, acc.CheckValue(c.name, c.value)) } - assert.Equal(t, len(acc.Points), 4, "non-numeric measurements should be ignored") + assert.Equal(t, deltaPoints, 4, "non-numeric measurements should be ignored") } func TestExecMalformed(t *testing.T) { runner := newRunnerMock([]byte(malformedJson), nil) - command := Command{Command: "badcommand arg1", Name: "mycollector"} - e := &Exec{runner: runner, Commands: []*Command{&command}} + clock := newClockMock(time.Unix(baseTimeSeconds+20, 0)) + command := Command{ + Command: "badcommand arg1", + Name: "mycollector", + Interval: 10, + lastRunAt: time.Unix(baseTimeSeconds, 0), + } + + e := &Exec{ + runner: runner, + clock: clock, + Commands: []*Command{&command}, + } var acc testutil.Accumulator + initialPoints := len(acc.Points) err := e.Gather(&acc) + deltaPoints := len(acc.Points) - initialPoints require.Error(t, err) + + assert.Equal(t, deltaPoints, 0, "No new points should have been added") } func TestCommandError(t *testing.T) { runner := newRunnerMock(nil, fmt.Errorf("exit status code 1")) - command := Command{Command: "badcommand", Name: "mycollector"} - e := &Exec{runner: runner, Commands: []*Command{&command}} + clock := newClockMock(time.Unix(baseTimeSeconds+20, 0)) + command := Command{ + Command: "badcommand", + Name: "mycollector", + Interval: 10, + lastRunAt: time.Unix(baseTimeSeconds, 0), + } + + e := &Exec{ + runner: runner, + clock: clock, + Commands: []*Command{&command}, + } + var acc testutil.Accumulator + initialPoints := len(acc.Points) err := e.Gather(&acc) + deltaPoints := len(acc.Points) - initialPoints require.Error(t, err) + + assert.Equal(t, deltaPoints, 0, "No new points should have been added") +} + +func TestExecNotEnoughTime(t *testing.T) { + runner := newRunnerMock([]byte(validJson), nil) + clock := newClockMock(time.Unix(baseTimeSeconds+5, 0)) + command := Command{ + Command: "testcommand arg1", + Name: "mycollector", + Interval: 10, + lastRunAt: time.Unix(baseTimeSeconds, 0), + } + + e := &Exec{ + runner: runner, + clock: clock, + Commands: []*Command{&command}, + } + + var acc testutil.Accumulator + initialPoints := len(acc.Points) + err := e.Gather(&acc) + deltaPoints := len(acc.Points) - initialPoints + require.NoError(t, err) + + assert.Equal(t, deltaPoints, 0, "No new points should have been added") +} + +func TestExecUninitializedLastRunAt(t *testing.T) { + runner := newRunnerMock([]byte(validJson), nil) + clock := newClockMock(time.Unix(baseTimeSeconds, 0)) + command := Command{ + Command: "testcommand arg1", + Name: "mycollector", + Interval: math.MaxInt32, + // Uninitialized lastRunAt should default to time.Unix(0, 0), so this should + // run no matter what the interval is + } + + e := &Exec{ + runner: runner, + clock: clock, + Commands: []*Command{&command}, + } + + var acc testutil.Accumulator + initialPoints := len(acc.Points) + err := e.Gather(&acc) + deltaPoints := len(acc.Points) - initialPoints + require.NoError(t, err) + + checkFloat := []struct { + name string + value float64 + }{ + {"mycollector_num_processes", 82}, + {"mycollector_cpu_used", 8234}, + {"mycollector_cpu_free", 32}, + {"mycollector_percent", 0.81}, + } + + for _, c := range checkFloat { + assert.True(t, acc.CheckValue(c.name, c.value)) + } + + assert.Equal(t, deltaPoints, 4, "non-numeric measurements should be ignored") +} +func TestExecOneNotEnoughTimeAndOneEnoughTime(t *testing.T) { + runner := newRunnerMock([]byte(validJson), nil) + clock := newClockMock(time.Unix(baseTimeSeconds+5, 0)) + notEnoughTimeCommand := Command{ + Command: "testcommand arg1", + Name: "mycollector", + Interval: 10, + lastRunAt: time.Unix(baseTimeSeconds, 0), + } + enoughTimeCommand := Command{ + Command: "testcommand arg1", + Name: "mycollector", + Interval: 3, + lastRunAt: time.Unix(baseTimeSeconds, 0), + } + + e := &Exec{ + runner: runner, + clock: clock, + Commands: []*Command{¬EnoughTimeCommand, &enoughTimeCommand}, + } + + var acc testutil.Accumulator + initialPoints := len(acc.Points) + err := e.Gather(&acc) + deltaPoints := len(acc.Points) - initialPoints + require.NoError(t, err) + + checkFloat := []struct { + name string + value float64 + }{ + {"mycollector_num_processes", 82}, + {"mycollector_cpu_used", 8234}, + {"mycollector_cpu_free", 32}, + {"mycollector_percent", 0.81}, + } + + for _, c := range checkFloat { + assert.True(t, acc.CheckValue(c.name, c.value)) + } + + assert.Equal(t, deltaPoints, 4, "Only one command should have been run") } From 8944182d8e63c290f862171fb4aa1eb2a247a915 Mon Sep 17 00:00:00 2001 From: Josh Palay Date: Wed, 23 Sep 2015 14:45:06 -0700 Subject: [PATCH 2/3] Fix printf format issue --- plugins/exec/exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/exec/exec.go b/plugins/exec/exec.go index c9d115312d774..5a039565dbfa3 100644 --- a/plugins/exec/exec.go +++ b/plugins/exec/exec.go @@ -63,7 +63,7 @@ func (c CommandRunner) Run(command *Command) ([]byte, error) { cmd.Stdout = &out if err := cmd.Run(); err != nil { - return nil, fmt.Errorf("exec: %s for command '%s'", err, command) + return nil, fmt.Errorf("exec: %s for command '%s'", err, command.command) } return out.Bytes(), nil From e07aeebdd33ef452201048be31ec72b93faa3425 Mon Sep 17 00:00:00 2001 From: Josh Palay Date: Wed, 23 Sep 2015 14:46:35 -0700 Subject: [PATCH 3/3] fix typo --- plugins/exec/exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/exec/exec.go b/plugins/exec/exec.go index 5a039565dbfa3..d37f36c72cd7b 100644 --- a/plugins/exec/exec.go +++ b/plugins/exec/exec.go @@ -63,7 +63,7 @@ func (c CommandRunner) Run(command *Command) ([]byte, error) { cmd.Stdout = &out if err := cmd.Run(); err != nil { - return nil, fmt.Errorf("exec: %s for command '%s'", err, command.command) + return nil, fmt.Errorf("exec: %s for command '%s'", err, command.Command) } return out.Bytes(), nil