Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow exec plugin to parse line-protocol #617

Merged
merged 1 commit into from
Jan 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- [#603](https://github.com/influxdata/telegraf/pull/603): Aggregate statsd timing measurements into fields. Thanks @marcinbunsch!
- [#601](https://github.com/influxdata/telegraf/issues/601): Warn when overwriting cached metrics.
- [#614](https://github.com/influxdata/telegraf/pull/614): PowerDNS input plugin. Thanks @Kasen!
- [#617](https://github.com/influxdata/telegraf/pull/617): exec plugin: parse influx line protocol in addition to JSON.

### Bugfixes
- [#595](https://github.com/influxdata/telegraf/issues/595): graphite output should include tags to separate duplicate measurements.
Expand Down
73 changes: 55 additions & 18 deletions plugins/inputs/exec/README.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,39 @@
# Exec Plugin
# Exec Input Plugin

The exec plugin can execute arbitrary commands which output JSON. Then it flattens JSON and finds
all numeric values, treating them as floats.
The exec plugin can execute arbitrary commands which output JSON or
InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/).

For example, if you have a json-returning command called mycollector, you could
setup the exec plugin with:
If using JSON, only numeric values are parsed and turned into floats. Booleans
and strings will be ignored.

### Configuration

```
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]]
command = "/usr/bin/mycollector --output=json"
# the command to run
command = "/usr/bin/mycollector --foo=bar"
# Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
interval = "10s"
```

The name suffix is appended to exec as "exec_name_suffix" to identify the input stream.
Other options for modifying the measurement names are:

The interval is used to determine how often a particular command should be run. Each
time the exec plugin runs, it will only run a particular command if it has been at least
`interval` seconds since the exec plugin last ran the command.
```
name_override = "measurement_name"
name_prefix = "prefix_"
```

### Example 1

# Sample
Let's say that we have the above configuration, and mycollector outputs the
following JSON:

Let's say that we have a command with the name_suffix "_mycollector", which gives the following output:
```json
{
"a": 0.5,
Expand All @@ -33,13 +44,39 @@ Let's say that we have a command with the name_suffix "_mycollector", which give
}
```

The collected metrics will be stored as field values under the same measurement "exec_mycollector":
The collected metrics will be stored as fields under the measurement
"exec_mycollector":

```
exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567
exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567
```

Other options for modifying the measurement names are:
### Example 2

Now let's say we have the following configuration:

```
name_override = "newname"
name_prefix = "prefix_"
[[inputs.exec]]
# the command to run
command = "/usr/bin/line_protocol_collector"
# Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "influx"
```

And line_protocol_collector outputs the following line protocol:

```
cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
```

You will get data in InfluxDB exactly as it is defined above,
tags are cpu=cpuN, host=foo, and datacenter=us-east with fields usage_idle
and usage_busy. They will receive a timestamp at collection time.
47 changes: 31 additions & 16 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"os/exec"
"time"

"github.com/gonuts/go-shellquote"

Expand All @@ -14,18 +15,20 @@ import (
)

const sampleConfig = `
# NOTE This plugin only reads numerical measurements, strings and booleans
# will be ignored.
# the command to run
command = "/usr/bin/mycollector --foo=bar"
# Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
`

type Exec struct {
Command string
Command string
DataFormat string

runner Runner
}
Expand Down Expand Up @@ -71,20 +74,32 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
return err
}

var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
e.Command, err)
}

f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
switch e.DataFormat {
case "", "json":
var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
e.Command, err)
}

f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
return err
}
acc.AddFields("exec", f.Fields, nil)
case "influx":
now := time.Now()
metrics, err := telegraf.ParseMetrics(out)
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
}
return err
default:
return fmt.Errorf("Unsupported data format: %s. Must be either json "+
"or influx.", e.DataFormat)
}

acc.AddFields("exec", f.Fields, nil)
return nil
}

Expand Down
73 changes: 73 additions & 0 deletions plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ const malformedJson = `
"status": "green",
`

const lineProtocol = "cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1"

const lineProtocolMulti = `
cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`

type runnerMock struct {
out []byte
err error
Expand Down Expand Up @@ -97,3 +109,64 @@ func TestCommandError(t *testing.T) {
require.Error(t, err)
assert.Equal(t, acc.NFields(), 0, "No new points should have been added")
}

func TestLineProtocolParse(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "line-protocol",
DataFormat: "influx",
}

var acc testutil.Accumulator
err := e.Gather(&acc)
require.NoError(t, err)

fields := map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}
tags := map[string]string{
"host": "foo",
"datacenter": "us-east",
}
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
}

func TestLineProtocolParseMultiple(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocolMulti), nil),
Command: "line-protocol",
DataFormat: "influx",
}

var acc testutil.Accumulator
err := e.Gather(&acc)
require.NoError(t, err)

fields := map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}
tags := map[string]string{
"host": "foo",
"datacenter": "us-east",
}
cpuTags := []string{"cpu0", "cpu1", "cpu2", "cpu3", "cpu4", "cpu5", "cpu6"}

for _, cpu := range cpuTags {
tags["cpu"] = cpu
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
}
}

func TestInvalidDataFormat(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "bad data format",
DataFormat: "FooBar",
}

var acc testutil.Accumulator
err := e.Gather(&acc)
require.Error(t, err)
}