Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wavefront plugin #3

Merged
merged 1 commit into from
May 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/riemann"
_ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy"
_ "github.com/influxdata/telegraf/plugins/outputs/socket_writer"
_ "github.com/influxdata/telegraf/plugins/outputs/wavefront"
)
117 changes: 117 additions & 0 deletions plugins/outputs/wavefront/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Wavefront Output Plugin

This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefront data format over TCP.


## Wavefront Data format

The expected input for Wavefront is specified in the following way:

```
<metric> <value> [<timestamp>] <source|host>=<soureTagValue> [tagk1=tagv1 ...tagkN=tagvN]
```

More information about the Wavefront data format is available [here](https://community.wavefront.com/docs/DOC-1031)


By default, to ease Metrics browsing in the Wavefront UI, metrics are grouped by converting any `_` characters to `.` in the final name.
This behavior can be altered by changing the `metric_separator` and/or the `convert_paths` settings.
Most illegal characters in the metric name are automatically converted to `-`.
The `use_regex` setting can be used to ensure all illegal characters are properly handled, but can lead to performance degradation.

## Configuration:

```toml
# Configuration for Wavefront output
[[outputs.wavefront]]
## prefix for metrics keys
prefix = "my.specific.prefix."

## DNS name of the wavefront proxy server
host = "wavefront.example.com"

## Port that the Wavefront proxy server listens on
port = 2878

## wether to use "value" for name of simple fields
simple_fields = false

## character to use between metric and field name. defaults to . (dot)
metric_separator = "."

## Convert metric name paths to use metricSeperator character
## When true (default) will convert all _ (underscore) chartacters in final metric name
convert_paths = true

## Use Regex to sanitize metric and tag names from invalid characters
## Regex is more thorough, but significantly slower
use_regex = false

## point tags to use as the source name for Wavefront (if none found, host will be used)
source_override = ["hostname", "snmp_host", "node_host"]

## Print additional debug information requires debug = true at the agent level
debug_all = false
```

Parameters:

Prefix string
Host string
Port int
SimpleFields bool
MetricSeparator string
ConvertPaths bool
UseRegex bool
SourceOverride string
DebugAll bool

* `prefix`: String to use as a prefix for all sent metrics.
* `host`: Name of Wavefront proxy server
* `port`: Port that Wavefront proxy server is configured for `pushListenerPorts`
* `simple_fields`: if false (default) metric field names called `value` are converted to empty strings
* `metric_separator`: character to use to separate metric and field names. (default is `_`)
* `convert_paths`: if true (default) will convert all `_` in metric and field names to `metric_seperator`
* `use_regex`: if true (default is false) will use regex to ensure all illegal characters are converted to `-`. Regex is much slower than the default mode which will catch most illegal characters. Use with caution.
* `source_override`: ordered list of point tags to use as the source name for Wavefront. Once a match is found, that tag is used as the source for that point. If no tags are found the host tag will be used.
* `debug_all`: Will output additional debug information. Requires `debug = true` to be configured at the agent level


##

The Wavefront proxy interface can be simulated with this reader:

```
// wavefront_proxy_mock.go
package main

import (
"io"
"log"
"net"
"os"
)

func main() {
l, err := net.Listen("tcp", "localhost:2878")
if err != nil {
log.Fatal(err)
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
go func(c net.Conn) {
defer c.Close()
io.Copy(os.Stdout, c)
}(conn)
}
}

```

## Allowed values for metrics

Wavefront allows `integers` and `floats` as input values
262 changes: 262 additions & 0 deletions plugins/outputs/wavefront/wavefront.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package wavefront

import (
"fmt"
"net"
"regexp"
"sort"
"strconv"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"log"
)

type Wavefront struct {
Prefix string
Host string
Port int
SimpleFields bool
MetricSeparator string
ConvertPaths bool
UseRegex bool
SourceOverride []string
DebugAll bool
}

// catch many of the invalid chars that could appear in a metric or tag name
var sanitizedChars = strings.NewReplacer(
"!", "-", "@", "-", "#", "-", "$", "-", "%", "-", "^", "-", "&", "-",
"*", "-", "(", "-", ")", "-", "+", "-", "`", "-", "'", "-", "\"", "-",
"[", "-", "]", "-", "{", "-", "}", "-", ":", "-", ";", "-", "<", "-",
">", "-", ",", "-", "?", "-", "/", "-", "\\", "-", "|", "-", " ", "-",
)

// instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer
var sanitizedRegex, _ = regexp.Compile("[^a-zA-Z\\d_.-]")

var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-")

var pathReplacer = strings.NewReplacer("_", "_")

var sampleConfig = `
## prefix for metrics keys
#prefix = "my.specific.prefix."

## DNS name of the wavefront proxy server
host = "wavefront.example.com"

## Port that the Wavefront proxy server listens on
port = 2878

## wether to use "value" for name of simple fields
#simple_fields = false

## character to use between metric and field name. defaults to . (dot)
#metric_separator = "."

## Convert metric name paths to use metricSeperator character
## When true (default) will convert all _ (underscore) chartacters in final metric name
#convert_paths = true

## Use Regex to sanitize metric and tag names from invalid characters
## Regex is more thorough, but significantly slower
#use_regex = false

## point tags to use as the source name for Wavefront (if none found, host will be used)
#source_override = ["hostname", "snmp_host", "node_host"]

## Print additional debug information requires debug = true at the agent level
#debug_all = false
`

type MetricLine struct {
Metric string
Value string
Timestamp int64
Tags string
}

func (w *Wavefront) Connect() error {

if w.ConvertPaths && w.MetricSeparator == "_" {
w.ConvertPaths = false
}
if w.ConvertPaths {
pathReplacer = strings.NewReplacer("_", w.MetricSeparator)
}

// Test Connection to Wavefront proxy Server
uri := fmt.Sprintf("%s:%d", w.Host, w.Port)
tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
if err != nil {
return fmt.Errorf("Wavefront: TCP address cannot be resolved %s", err.Error())
}
connection, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error())
}
defer connection.Close()
return nil
}

func (w *Wavefront) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}

// Send Data to Wavefront proxy Server
uri := fmt.Sprintf("%s:%d", w.Host, w.Port)
tcpAddr, _ := net.ResolveTCPAddr("tcp", uri)
connection, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error())
}
defer connection.Close()

for _, m := range metrics {
for _, metric := range buildMetrics(m, w) {
messageLine := fmt.Sprintf("%s %s %v %s\n", metric.Metric, metric.Value, metric.Timestamp, metric.Tags)
log.Printf("D! Output [wavefront] %s", messageLine)
_, err := connection.Write([]byte(messageLine))
if err != nil {
return fmt.Errorf("Wavefront: TCP writing error %s", err.Error())
}
}
}

return nil
}

func buildTags(mTags map[string]string, w *Wavefront) []string {
sourceTagFound := false

for _, s := range w.SourceOverride {
for k, v := range mTags {
if k == s {
mTags["source"] = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}

if !sourceTagFound {
mTags["source"] = mTags["host"]
}
delete(mTags, "host")

tags := make([]string, len(mTags))
index := 0
for k, v := range mTags {
if w.UseRegex {
tags[index] = fmt.Sprintf("%s=\"%s\"", sanitizedRegex.ReplaceAllString(k, "-"), tagValueReplacer.Replace(v))
} else {
tags[index] = fmt.Sprintf("%s=\"%s\"", sanitizedChars.Replace(k), tagValueReplacer.Replace(v))
}

index++
}

sort.Strings(tags)
return tags
}

func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricLine {
if w.DebugAll {
log.Printf("D! Output [wavefront] original name: %s\n", m.Name())
}

ret := []*MetricLine{}
for fieldName, value := range m.Fields() {
if w.DebugAll {
log.Printf("D! Output [wavefront] original field: %s\n", fieldName)
}

var name string
if !w.SimpleFields && fieldName == "value" {
name = fmt.Sprintf("%s%s", w.Prefix, m.Name())
} else {
name = fmt.Sprintf("%s%s%s%s", w.Prefix, m.Name(), w.MetricSeparator, fieldName)
}

if w.UseRegex {
name = sanitizedRegex.ReplaceAllLiteralString(name, "-")
} else {
name = sanitizedChars.Replace(name)
}

if w.ConvertPaths {
name = pathReplacer.Replace(name)
}

metric := &MetricLine{
Metric: name,
Timestamp: m.UnixNano() / 1000000000,
}
metricValue, buildError := buildValue(value, metric.Metric)
if buildError != nil {
log.Printf("E! Output [wavefront] %s\n", buildError.Error())
continue
}
metric.Value = metricValue
tagsSlice := buildTags(m.Tags(), w)
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
ret = append(ret, metric)
}
return ret
}

func buildValue(v interface{}, name string) (string, error) {
var retv string
switch p := v.(type) {
case int64:
retv = IntToString(int64(p))
case uint64:
retv = UIntToString(uint64(p))
case float64:
retv = FloatToString(float64(p))
default:
return retv, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
}
return retv, nil
}

func IntToString(input_num int64) string {
return strconv.FormatInt(input_num, 10)
}

func UIntToString(input_num uint64) string {
return strconv.FormatUint(input_num, 10)
}

func FloatToString(input_num float64) string {
return strconv.FormatFloat(input_num, 'f', 6, 64)
}

func (w *Wavefront) SampleConfig() string {
return sampleConfig
}

func (w *Wavefront) Description() string {
return "Configuration for Wavefront server to send metrics to"
}

func (w *Wavefront) Close() error {
return nil
}

func init() {
outputs.Add("wavefront", func() telegraf.Output {
return &Wavefront{
MetricSeparator: ".",
ConvertPaths: true,
}
})
}
Loading