Skip to content

Commit

Permalink
Initial jolokia2 input plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanmei committed Jan 25, 2017
1 parent 20bf90e commit 56c53a0
Show file tree
Hide file tree
Showing 16 changed files with 2,004 additions and 50 deletions.
115 changes: 65 additions & 50 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
Expand Down Expand Up @@ -554,21 +555,67 @@ func (c *Config) LoadConfig(path string) error {
return err
}
}
tbl, err := parseFile(path)
contents, err := loadFile(path)
if err != nil {
return fmt.Errorf("Error loading %s, %s", path, err)
}

if err = c.ParseConfig(bytes.NewBuffer(contents)); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}

return nil
}

// trimBOM trims the Byte-Order-Marks from the beginning of the file.
// this is for Windows compatability only.
// see https://github.com/influxdata/telegraf/issues/1378
func trimBOM(f []byte) []byte {
return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf"))
}

// loadFile loads a TOML configuration. When loading the file, it
// will find environment variables and replace them.
func loadFile(fpath string) ([]byte, error) {
contents, err := ioutil.ReadFile(fpath)
if err != nil {
return nil, err
}
// ugh windows why
contents = trimBOM(contents)

env_vars := envVarRe.FindAll(contents, -1)
for _, env_var := range env_vars {
env_val := os.Getenv(strings.TrimPrefix(string(env_var), "$"))
if env_val != "" {
contents = bytes.Replace(contents, env_var, []byte(env_val), 1)
}
}

return contents, nil
}

func (c *Config) ParseConfig(reader io.Reader) error {
contents, err := ioutil.ReadAll(reader)
if err != nil {
return err
}

tbl, err := toml.Parse(contents)
if err != nil {
return err
}

// Parse tags tables first:
for _, tableName := range []string{"tags", "global_tags"} {
if val, ok := tbl.Fields[tableName]; ok {
subTable, ok := val.(*ast.Table)
if !ok {
return fmt.Errorf("%s: invalid configuration", path)
return fmt.Errorf("invalid configuration")
}
if err = config.UnmarshalTable(subTable, c.Tags); err != nil {
log.Printf("E! Could not parse [global_tags] config\n")
return fmt.Errorf("Error parsing %s, %s", path, err)
return err
}
}
}
Expand All @@ -577,19 +624,19 @@ func (c *Config) LoadConfig(path string) error {
if val, ok := tbl.Fields["agent"]; ok {
subTable, ok := val.(*ast.Table)
if !ok {
return fmt.Errorf("%s: invalid configuration", path)
return fmt.Errorf("invalid configuration")
}
if err = config.UnmarshalTable(subTable, c.Agent); err != nil {
log.Printf("E! Could not parse [agent] config\n")
return fmt.Errorf("Error parsing %s, %s", path, err)
return err
}
}

// Parse all the rest of the plugins:
for name, val := range tbl.Fields {
subTable, ok := val.(*ast.Table)
if !ok {
return fmt.Errorf("%s: invalid configuration", path)
return fmt.Errorf("invalid configuration")
}

switch name {
Expand All @@ -600,17 +647,16 @@ func (c *Config) LoadConfig(path string) error {
// legacy [outputs.influxdb] support
case *ast.Table:
if err = c.addOutput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return err
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addOutput(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return err
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
return fmt.Errorf("Unsupported config format: %s", pluginName)
}
}
case "inputs", "plugins":
Expand All @@ -619,17 +665,16 @@ func (c *Config) LoadConfig(path string) error {
// legacy [inputs.cpu] support
case *ast.Table:
if err = c.addInput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return err
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addInput(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return err
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
return fmt.Errorf("Unsupported config format: %s", pluginName)
}
}
case "processors":
Expand All @@ -638,12 +683,11 @@ func (c *Config) LoadConfig(path string) error {
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addProcessor(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return err
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
return fmt.Errorf("Unsupported config format: %s", pluginName)
}
}
case "aggregators":
Expand All @@ -652,56 +696,27 @@ func (c *Config) LoadConfig(path string) error {
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addAggregator(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return err
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
return fmt.Errorf("Unsupported config format: %s", pluginName)
}
}
// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
if err = c.addInput(name, subTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return err
}
}
}

if len(c.Processors) > 1 {
sort.Sort(c.Processors)
}
return nil
}

// trimBOM trims the Byte-Order-Marks from the beginning of the file.
// this is for Windows compatability only.
// see https://github.com/influxdata/telegraf/issues/1378
func trimBOM(f []byte) []byte {
return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf"))
}

// parseFile loads a TOML configuration from a provided path and
// returns the AST produced from the TOML parser. When loading the file, it
// will find environment variables and replace them.
func parseFile(fpath string) (*ast.Table, error) {
contents, err := ioutil.ReadFile(fpath)
if err != nil {
return nil, err
}
// ugh windows why
contents = trimBOM(contents)

env_vars := envVarRe.FindAll(contents, -1)
for _, env_var := range env_vars {
env_val := os.Getenv(strings.TrimPrefix(string(env_var), "$"))
if env_val != "" {
contents = bytes.Replace(contents, env_var, []byte(env_val), 1)
}
}

return toml.Parse(contents)
return nil
}

func (c *Config) addAggregator(name string, table *ast.Table) error {
Expand Down
18 changes: 18 additions & 0 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -174,3 +175,20 @@ func TestConfig_LoadDirectory(t *testing.T) {
assert.Equal(t, pConfig, c.Inputs[3].Config,
"Merged Testdata did not produce correct procstat metadata.")
}

func TestConfig_ParseConfig(t *testing.T) {
c := NewConfig()
r := strings.NewReader(`
[global_tags]
foo = "bar"
[agent]
debug = true
`)

err := c.ParseConfig(r)
assert.NoError(t, err)

assert.Equal(t, c.Tags["foo"], "bar")
assert.True(t, c.Agent.Debug)
}
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor"
_ "github.com/influxdata/telegraf/plugins/inputs/iptables"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia2"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
Expand Down
111 changes: 111 additions & 0 deletions plugins/inputs/jolokia2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Jolokia2 Input Plugin

The [Jolokia](http://jolokia.org) input plugin collects JVM metrics exposed as JMX MBean attributes
through the Jolokia REST endpoint and its [JSON-over-HTTP protocol](https://jolokia.org/reference/html/protocol.html).

### Configuration:

```toml

# Read JMX metrics through Jolokia

[[inputs.jolokia2]]
#default_field_delimiter = "."
#default_field_prefix = ""
#default_tag_delimiter = "_"
#default_tag_prefix = "mbean"

# Add agents to query
[inputs.jolokia2.agents]
urls = ["http://kafka:8080/jolokia"]

[[inputs.jolokia2.metric]]
name = "jvm_runtime"
mbean = "java.lang:type=Runtime"
paths = ["Uptime"]

[[inputs.jolokia2.metric]]
name = "jvm_memory"
mbean = "java.lang:type=Memory"
paths = ["HeapMemoryUsage", "NonHeapMemoryUsage", "ObjectPendingFinalizationCount"]

# By default, all mbean keys are added as tags
# Use 'taginclude' to specify the exact tags to add.
[[inputs.jolokia2.metric]]
name = "jvm_g1_garbage_collector"
mbean = "java.lang:name=G1*,type=GarbageCollector"
paths = [
"CollectionTime",
"CollectionCount",
"LastGcInfo/duration",
"LastGcInfo/GcThreadCount",
]
taginclude = ["name"]

# Use 'tagexclude' to specify just the tags to remove.
[[inputs.jolokia2.metric]]
name = "jvm_memory_pool"
mbean = "java.lang:name=*,type=MemoryPool"
paths = ["Usage", "PeakUsage, "CollectionUsage"]
tagexclude = ["type"]

[[inputs.jolokia2.metric]]
name = "kafka_topic"
mbean = "kafka.server:name=*,topic=*,type=BrokerTopicMetrics"
field_prefix = "$1"
taginclude = ["topic"]

[[inputs.jolokia2.metric]]
name = "kafka_log"
mbean = "kafka.log:name=*,partition=*,topic=*,type=Log"
field_name = "$1"
taginclude = ["topic", "partition"]
```

To specify timeouts for slower/over-loaded clients:

```
[[inputs.jolokia2]]
[inputs.jolokia2.agents]
urls = ["http://kafka:8080/jolokia"]

# The amount of time to wait for any requests made by this client.
# Includes connection time, any redirects, and reading the response body.
# (default is 5s)
response_timeout = "10s"
```

To specify SSL options, add details to the `agents` configuration:

```
[[inputs.jolokia2]]
[inputs.jolokia2.agents]
urls = [
"https://kafka:8080/jolokia",
]
#username = ""
#password = ""
ssl_ca = "/var/private/ca.pem"
ssl_cert = "/var/private/client.pem"
ssl_key = "/var/private/client-key.pem"
#insecure_skip_verify = false
```

To interact with agents via a Jolokia proxy, use a `proxy` configuration instead:

```
[[inputs.jolokia2]]
[inputs.jolokia2.proxy]
url = "https://proxy:8080/jolokia"
response_timeout = "10s"
#default_target_username = ""
#default_target_password = ""
ssl_ca = "/var/private/ca.pem"
ssl_cert = "/var/private/client.pem"
ssl_key = "/var/private/client-key.pem"
[[inputs.jolokia2.proxy.target]]
url = "service:jmx:rmi:///jndi/rmi://targethost:9999/jmxrmi"
#username = ""
#password = ""
```
Loading

0 comments on commit 56c53a0

Please sign in to comment.