Aurora plugin #1

Closed
wants to merge 38 commits into from

38 commits
61490f0
Packaging w/o s3 upload - initial commit
Aug 18, 2016
c181ae8
First cut of wavefront storage driver - quotes tag values
Aug 19, 2016
0bf443e
Merge pull request #1 from ezeev/master
ezeev Aug 19, 2016
0f4371f
committing latest changes before rebasing
Sep 15, 2016
a4ffb52
Merge remote-tracking branch 'origin/master'
Sep 15, 2016
b8f861c
Merge remote-tracking branch 'wavefront/master'
Sep 15, 2016
1ce9f4e
Added Metric_separator configuration property
puckpuck Oct 12, 2016
d672f8c
Move to Wavefront format
puckpuck Oct 21, 2016
23657b8
config names + special char handling
puckpuck Oct 25, 2016
34335bb
change host to source + simple value fields
puckpuck Oct 25, 2016
d2a68c7
Added new test cases for SimpleFields option
puckpuck Oct 25, 2016
e03bf27
updated output to use log. to stay consistent with all logging
puckpuck Oct 27, 2016
57abc93
Merge pull request #2 from puckpuck/master
ezeev Nov 2, 2016
db7c44e
Merge remote-tracking branch 'wavefrontHQ/master'
puckpuck Nov 3, 2016
3cca45c
brought up to date w/ telegraf master + new plugin changes
Nov 3, 2016
a0f412c
Merge remote-tracking branch 'wavefrontHQ/master'
puckpuck Nov 3, 2016
ca024e6
Fixed upstream merge conflicts
puckpuck Nov 4, 2016
f9813ef
Merge remote-tracking branch 'wavefrontHQ/master'
puckpuck Nov 4, 2016
a9d953d
Fixed upstream merge conflicts
puckpuck Nov 4, 2016
fcf7097
cleaned logging (again) and tagValueReplacer
puckpuck Nov 7, 2016
a6226b9
initial README.md
puckpuck Nov 8, 2016
72a2622
added support for source_tags
puckpuck Dec 5, 2016
91fdbe3
Fixed default config file to have proper option names, etc
puckpuck Dec 5, 2016
cbec765
Merge pull request #3 from puckpuck/wavefront-output
ezeev Dec 8, 2016
23d35c6
Changed name of Wavefront source option to source_override
puckpuck Dec 8, 2016
a00df0d
Merge pull request #4 from puckpuck/wavefront-output
ezeev Dec 13, 2016
c3ae757
opentsdb: add tcp:// prefix if not present
sparrc Jan 23, 2017
8282311
Add newline to influx line-protocol if not present
sparrc Jan 23, 2017
10d0a78
influxdb output: treat field type conflicts as a successful write
sparrc Jan 24, 2017
b2c1d98
empty commit for tagging release 1.2
sparrc Jan 24, 2017
853178c
32-bit binary for windows and freebsd
sparrc Jan 24, 2017
45ea321
running output: Drop nil metrics
sparrc Feb 1, 2017
c9d359f
metric: Fix negative number handling
sparrc Feb 1, 2017
3b6ffb3
Go 1.7.4 -> 1.7.5
sparrc Feb 1, 2017
3dab206
first pass
Feb 22, 2017
6271b42
Merge remote-tracking branch 'wavefront-hq/master' into aurora-plugin
Feb 22, 2017
14b1d1c
Update telegraf and docker
Feb 23, 2017
65b7d6b
Add support for aurora
Feb 23, 2017
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -67,7 +67,10 @@ plugins, not just statsd.
- [#1973](https://github.com/influxdata/telegraf/issues/1973): Partial fix: logparser CLF pattern with IPv6 addresses.
- [#1975](https://github.com/influxdata/telegraf/issues/1975) & [#2102](https://github.com/influxdata/telegraf/issues/2102): Fix thread-safety when using multiple instances of the statsd input plugin.
- [#2027](https://github.com/influxdata/telegraf/issues/2027): docker input: interface conversion panic fix.
-- [#1814](https://github.com/influxdata/telegraf/issues/1814): snmp: ensure proper context is present on error messages
+- [#1814](https://github.com/influxdata/telegraf/issues/1814): snmp: ensure proper context is present on error messages.
+- [#2299](https://github.com/influxdata/telegraf/issues/2299): opentsdb: add tcp:// prefix if no scheme provided.
+- [#2297](https://github.com/influxdata/telegraf/issues/2297): influx parser: parse line-protocol without newlines.
+- [#2245](https://github.com/influxdata/telegraf/issues/2245): influxdb output: fix field type conflict blocking output buffer.

## v1.1.2 [2016-12-12]

3 changes: 3 additions & 0 deletions Makefile
@@ -32,6 +32,9 @@ build-for-docker:
package:
./scripts/build.py --package --version="$(VERSION)" --platform=linux --arch=all --upload

package-wavefront:
./scripts/build.py --package --version="$(VERSION)" --platform=linux --arch=all

# Get dependencies and use gdm to checkout changesets
prepare:
go get github.com/sparrc/gdm
6 changes: 3 additions & 3 deletions circle.yml
@@ -4,9 +4,9 @@ machine:
post:
- sudo service zookeeper stop
- go version
-    - go version | grep 1.7.4 || sudo rm -rf /usr/local/go
-    - wget https://storage.googleapis.com/golang/go1.7.4.linux-amd64.tar.gz
-    - sudo tar -C /usr/local -xzf go1.7.4.linux-amd64.tar.gz
+    - go version | grep 1.7.5 || sudo rm -rf /usr/local/go
+    - wget https://storage.googleapis.com/golang/go1.7.5.linux-amd64.tar.gz
+    - sudo tar -C /usr/local -xzf go1.7.5.linux-amd64.tar.gz
- go version

dependencies:
3 changes: 3 additions & 0 deletions internal/models/running_output.go
@@ -90,6 +90,9 @@ func NewRunningOutput(
// AddMetric adds a metric to the output. This function can also write cached
// points if FlushBufferWhenFull is true.
func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
	if m == nil {
		return
	}
	// Filter any tagexclude/taginclude parameters before adding metric
	if ro.Config.Filter.IsActive() {
		// In order to filter out tags, we need to create a new metric, since
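The nil guard added to `AddMetric` above can be illustrated in isolation; a minimal standalone sketch (hypothetical `buffer` type, not the actual telegraf internals) of why nil metrics must be dropped before they reach the buffer:

```go
package main

import "fmt"

// Metric stands in for telegraf.Metric here.
type Metric interface{ Name() string }

// buffer mimics the role of RunningOutput's metric cache.
type buffer struct{ metrics []Metric }

// Add mirrors the guard in AddMetric: a nil metric is dropped instead of
// being buffered, where it would panic later when serialized for a write.
func (b *buffer) Add(m Metric) {
	if m == nil {
		return
	}
	b.metrics = append(b.metrics, m)
}

func main() {
	var b buffer
	b.Add(nil)
	b.Add(nil)
	b.Add(nil)
	fmt.Println(len(b.metrics)) // nothing was buffered
}
```

This is exactly the behavior the new `TestAddingNilMetric` below asserts: three nil adds, zero metrics written.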
17 changes: 17 additions & 0 deletions internal/models/running_output_test.go
@@ -75,6 +75,23 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) {
	}
}

func TestAddingNilMetric(t *testing.T) {
	conf := &OutputConfig{
		Filter: Filter{},
	}

	m := &mockOutput{}
	ro := NewRunningOutput("test", m, conf, 1000, 10000)

	ro.AddMetric(nil)
	ro.AddMetric(nil)
	ro.AddMetric(nil)

	err := ro.Write()
	assert.NoError(t, err)
	assert.Len(t, m.Metrics(), 0)
}

// Test that NameDrop filters get properly applied.
func TestRunningOutput_DropFilter(t *testing.T) {
	conf := &OutputConfig{
2 changes: 1 addition & 1 deletion metric/metric.go
@@ -263,7 +263,7 @@ func (m *metric) Fields() map[string]interface{} {
	case '"':
		// string field
		fieldMap[unescape(string(m.fields[i:][0:i1]), "fieldkey")] = unescape(string(m.fields[i:][i2+1:i3-1]), "fieldval")
-	case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
+	case '-', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
		// number field
		switch m.fields[i:][i3-1] {
		case 'i':
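The one-character fix above extends the type-sniffing switch so a leading minus sign is recognized as the start of a number. A standalone sketch of that classification step (simplified, not the actual parser):

```go
package main

import "fmt"

// classifyFieldValue mimics the switch in metric.Fields(): the first byte
// of a line-protocol field value decides how it is parsed. Before the fix,
// '-' fell through to the default branch and negative numbers were lost.
func classifyFieldValue(v string) string {
	if v == "" {
		return "empty"
	}
	switch v[0] {
	case '"':
		return "string"
	case '-', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
		return "number"
	case 't', 'T', 'f', 'F':
		return "bool"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classifyFieldValue("-99i"))  // number
	fmt.Println(classifyFieldValue("99.4"))  // number
	fmt.Println(classifyFieldValue(`"abc"`)) // string
}
```

The `TestParseNegNumbers` case below exercises the same path through the real parser with `temp=-99i,temp_float=-99.4`.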
23 changes: 23 additions & 0 deletions metric/parse_test.go
@@ -44,6 +44,9 @@ cpu,host=foo,datacenter=us-east idle=99,busy=1i,b=true,s="string"
cpu,host=foo,datacenter=us-east idle=99,busy=1i,b=true,s="string"
`

const negMetrics = `weather,host=local temp=-99i,temp_float=-99.4 1465839830100400200
`

// some metrics are invalid
const someInvalid = `cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
@@ -85,6 +88,26 @@ func TestParse(t *testing.T) {
	}
}

func TestParseNegNumbers(t *testing.T) {
	metrics, err := Parse([]byte(negMetrics))
	assert.NoError(t, err)
	assert.Len(t, metrics, 1)

	assert.Equal(t,
		map[string]interface{}{
			"temp":       int64(-99),
			"temp_float": float64(-99.4),
		},
		metrics[0].Fields(),
	)
	assert.Equal(t,
		map[string]string{
			"host": "local",
		},
		metrics[0].Tags(),
	)
}

func TestParseErrors(t *testing.T) {
start := time.Now()
metrics, err := Parse([]byte(someInvalid))
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
@@ -80,4 +80,5 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters"
_ "github.com/influxdata/telegraf/plugins/inputs/zfs"
_ "github.com/influxdata/telegraf/plugins/inputs/zookeeper"
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
)
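The blank import added above works because each input package registers a constructor in its `init` function; a minimal sketch of that registration pattern (simplified types, not telegraf's actual interfaces):

```go
package main

import "fmt"

// Input is a stand-in for telegraf.Input.
type Input interface{ Description() string }

// registry plays the role of the inputs package's plugin table.
var registry = map[string]func() Input{}

// Add registers a named constructor, as inputs.Add does.
func Add(name string, creator func() Input) { registry[name] = creator }

type aurora struct{}

func (aurora) Description() string { return "aurora input" }

// init runs on import, which is why a blank import alone is enough
// to make the plugin discoverable.
func init() { Add("aurora", func() Input { return aurora{} }) }

func main() {
	fmt.Println(registry["aurora"]().Description())
}
```

This is why forgetting the `_ "…/plugins/inputs/aurora"` line would silently leave the plugin unavailable at runtime.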
147 changes: 147 additions & 0 deletions plugins/inputs/aurora/aurora.go
@@ -0,0 +1,147 @@
package aurora

import (
"fmt"
"io/ioutil"
"log"
"net/http"
"regexp"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)

type Aurora struct {
	Timeout    int
	Master     string
	HttpPrefix string
	Numeric    bool
}

var sampleConfig = `
  ## Timeout, in ms.
  timeout = 100
  ## Aurora master host:port
  master = "localhost:8081"
  ## HTTP scheme used to reach the master
  http_prefix = "http"
  ## Collect numeric values only
  numeric = true
`

// SampleConfig returns a sample configuration block
func (a *Aurora) SampleConfig() string {
	return sampleConfig
}

// Description returns a short description of the Aurora plugin
func (a *Aurora) Description() string {
	return "Telegraf plugin for gathering metrics from an Apache Aurora master"
}

func (a *Aurora) SetDefaults() {
	if a.Timeout == 0 {
		log.Println("I! [aurora] Missing timeout value, setting default value (100ms)")
		a.Timeout = 100
	}
	if a.HttpPrefix == "" {
		log.Println("I! [aurora] Missing http prefix value, setting default value (http)")
		a.HttpPrefix = "http"
	}
}

// convertToNumeric parses a raw /vars value. Numbers are returned as
// float64 and booleans are converted to 1 or 0; the second return value
// reports whether the value was numeric.
func convertToNumeric(value string) (interface{}, bool) {
	if f, err := strconv.ParseFloat(value, 64); err == nil {
		return f, true
	}
	if b, err := strconv.ParseBool(value); err == nil {
		if b {
			return 1, true
		}
		return 0, true
	}
	return value, false
}

func isJobMetric(key string) bool {
	// Regex for matching job specific tasks
	re := regexp.MustCompile("^sla_(.*?)/(.*?)/.*")
	return re.MatchString(key)
}

func parseJobSpecificMetric(key string, value interface{}) (map[string]interface{}, map[string]string) {
	// cut off the sla_ prefix
	key = key[4:]
	slashSplit := strings.Split(key, "/")
	role := slashSplit[0]
	env := slashSplit[1]
	// the job name is assumed to contain no underscore: everything after
	// the first underscore is treated as the metric name
	underscoreIdx := strings.Index(slashSplit[2], "_")
	job := slashSplit[2][:underscoreIdx]
	metric := slashSplit[2][underscoreIdx+1:]

	fields := make(map[string]interface{})
	fields[metric] = value

	tags := make(map[string]string)
	tags["role"] = role
	tags["env"] = env
	tags["job"] = job
	return fields, tags
}

// Gather collects metrics from the configured Aurora master
func (a *Aurora) Gather(acc telegraf.Accumulator) error {
	a.SetDefaults()

	client := http.Client{
		Timeout: time.Duration(a.Timeout) * time.Millisecond,
	}
	url := fmt.Sprintf("%s://%s/vars", a.HttpPrefix, a.Master)
	resp, err := client.Get(url)
	if err != nil {
		return err
	}

	defer resp.Body.Close()
	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	// Fields without per-metric tags are accumulated into one map
	nonJobFields := make(map[string]interface{})

	lines := strings.Split(string(data), "\n")
	for _, line := range lines {
		splitIdx := strings.Index(line, " ")
		if splitIdx == -1 {
			continue
		}
		key := line[:splitIdx]
		value := line[splitIdx+1:]
		// If numeric is true and the metric is not numeric then ignore it
		numeric, isNumeric := convertToNumeric(value)
		if a.Numeric && !isNumeric {
			continue
		}

		// Job-specific metrics carry role/env/job tags, so each one must
		// be added as its own field set
		if isJobMetric(key) {
			fields, tags := parseJobSpecificMetric(key, numeric)
			acc.AddFields("aurora", fields, tags)
		} else {
			// The remaining fields share no tags, so they can be added as one group
			nonJobFields[key] = numeric
		}
	}
	acc.AddFields("aurora", nonJobFields, make(map[string]string))
	return nil
}

func init() {
	inputs.Add("aurora", func() telegraf.Input {
		return &Aurora{}
	})
}
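Gather reads the master's plain-text /vars page, one `key value` pair per line. A standalone sketch of that split step (simplified from the loop above, with the same first-space rule):

```go
package main

import (
	"fmt"
	"strings"
)

// splitVarsLine mirrors the split in Gather: the key is everything before
// the first space, the value everything after it. Lines with no space
// (or blank trailing lines) are skipped.
func splitVarsLine(line string) (key, value string, ok bool) {
	i := strings.Index(line, " ")
	if i == -1 {
		return "", "", false
	}
	return line[:i], line[i+1:], true
}

func main() {
	k, v, ok := splitVarsLine("sla_cluster_mtta_ms 18")
	fmt.Println(k, v, ok) // sla_cluster_mtta_ms 18 true

	_, _, ok = splitVarsLine("malformed-line")
	fmt.Println(ok) // false
}
```

Note that string-valued vars such as `jvm_prop_java.endorsed.dirs` split the same way; whether they survive is then decided by the `numeric` option.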
130 changes: 130 additions & 0 deletions plugins/inputs/aurora/aurora_test.go
@@ -0,0 +1,130 @@
package aurora

import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)

var masterServer *httptest.Server

//sla_role/prod/jobname_

func getRawMetrics() string {
return `assigner_launch_failures 0
cron_job_triggers 240
sla_cluster_mtta_ms 18
sla_disk_small_mttr_ms 1029
sla_cpu_small_mtta_ms 17
jvm_prop_java.endorsed.dirs /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/endorsed`
}

func TestMain(m *testing.M) {
	metrics := getRawMetrics()

	masterRouter := http.NewServeMux()
	masterRouter.HandleFunc("/vars", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, metrics)
	})
	masterServer = httptest.NewServer(masterRouter)

	rc := m.Run()

	masterServer.Close()
	os.Exit(rc)
}

func TestConvertToNumeric(t *testing.T) {
	if _, isNumeric := convertToNumeric("0.000"); !isNumeric {
		t.Fatalf("0.000 should have been numeric")
	}
	if _, isNumeric := convertToNumeric("7"); !isNumeric {
		t.Fatalf("7 should have been numeric")
	}
	boolVal, isNumeric := convertToNumeric("true")
	if !isNumeric {
		t.Fatalf("true should have been numeric")
	}
	if val := boolVal.(int); val != 1 {
		t.Fatalf("true should have been converted to a 1")
	}
	boolVal, isNumeric = convertToNumeric("false")
	if !isNumeric {
		t.Fatalf("false should have been numeric")
	}
	if val := boolVal.(int); val != 0 {
		t.Fatalf("false should have been converted to a 0")
	}
	if _, isNumeric := convertToNumeric("&"); isNumeric {
		t.Fatalf("& should not be numeric")
	}
}

func TestIsJobMetric(t *testing.T) {
	var notJobMetrics = []string{
		"assigner_launch_failures",
		"cron_job_triggers",
		"sla_cluster_mtta_ms",
		"sla_disk_small_mttr_ms",
		"sla_cpu_small_mtta_ms",
	}
	for _, metric := range notJobMetrics {
		if isJobMetric(metric) {
			t.Fatalf("%s should not be a job metric", metric)
		}
	}
	var isJobMetrics = []string{
		"sla_role2/prod2/jobname2_job_uptime_50.00_sec",
	}
	for _, metric := range isJobMetrics {
		if !isJobMetric(metric) {
			t.Fatalf("%s should be a job metric", metric)
		}
	}
}

func TestParseJobSpecificMetric(t *testing.T) {
	var expectedFields = map[string]interface{}{
		"job_uptime_50.00_sec": 0,
	}
	var expectedTags = map[string]string{
		"role": "role2",
		"env":  "prod2",
		"job":  "jobname2",
	}
	key := "sla_role2/prod2/jobname2_job_uptime_50.00_sec"
	value := 0
	fields, tags := parseJobSpecificMetric(key, value)
	assert.Equal(t, expectedFields, fields)
	assert.Equal(t, expectedTags, tags)
}

func TestAuroraMaster(t *testing.T) {
	var acc testutil.Accumulator

	m := Aurora{
		Master:     masterServer.Listener.Addr().String(),
		Timeout:    10,
		HttpPrefix: "http",
		Numeric:    true,
	}

	err := m.Gather(&acc)
	if err != nil {
		t.Error(err)
	}

	var referenceMetrics = map[string]interface{}{
		"assigner_launch_failures": 0.0,
		"cron_job_triggers":        240.0,
		"sla_cluster_mtta_ms":      18.0,
		"sla_disk_small_mttr_ms":   1029.0,
		"sla_cpu_small_mtta_ms":    17.0,
	}

	acc.AssertContainsFields(t, "aurora", referenceMetrics)
}