Skip to content

Commit

Permalink
Add plugin for Twemproxy
Browse files Browse the repository at this point in the history
This plugin collects data from Twemproxy's stats interface
  • Loading branch information
codeb2cc authored and Jeffrey Allen committed Nov 18, 2015
1 parent 12ee889 commit 6467fcd
Show file tree
Hide file tree
Showing 3 changed files with 317 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/rethinkdb"
_ "github.com/influxdb/telegraf/plugins/statsd"
_ "github.com/influxdb/telegraf/plugins/system"
_ "github.com/influxdb/telegraf/plugins/twemproxy"
_ "github.com/influxdb/telegraf/plugins/zfs"
_ "github.com/influxdb/telegraf/plugins/zookeeper"
)
182 changes: 182 additions & 0 deletions plugins/twemproxy/twemproxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package twemproxy

import (
"encoding/json"
"errors"
"io/ioutil"
"net"
"strings"
"sync"
"time"

"github.com/influxdb/telegraf/plugins"
)

type Twemproxy struct {
Instances []TwemproxyInstance
}

type TwemproxyInstance struct {
StatsAddr string
Pools []string
}

var sampleConfig = `
# Twemproxy plugin config
[twemproxy]
[[twemproxy.instances]]
# Twemproxy stats address and port(NO scheme!)
statsAddr = "10.16.29.1:22222"
# Monitor pool name
pools = ["redis_pool", "mc_pool"]
`

func (t *Twemproxy) SampleConfig() string {
return sampleConfig
}

func (t *Twemproxy) Description() string {
return "Read Twemproxy stats data"
}

// Gather data from all Twemproxy instances
func (t *Twemproxy) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup
errorChan := make(chan error, len(t.Instances))
for _, inst := range t.Instances {
wg.Add(1)
go func(inst TwemproxyInstance) {
defer wg.Done()
if err := inst.Gather(acc); err != nil {
errorChan <- err
}
}(inst)
}
wg.Wait()

close(errorChan)
errs := []string{}
for err := range errorChan {
errs = append(errs, err.Error())
}
if len(errs) == 0 {
return nil
}
return errors.New(strings.Join(errs, "\n"))
}

// Gather data from one Twemproxy
func (ti *TwemproxyInstance) Gather(
acc plugins.Accumulator,
) error {
conn, err := net.DialTimeout("tcp", ti.StatsAddr, 1*time.Second)
if err != nil {
return err
}
body, err := ioutil.ReadAll(conn)
if err != nil {
return err
}

var stats map[string]interface{}
if err = json.Unmarshal(body, &stats); err != nil {
return errors.New("Error decoding JSON response")
}

tags := make(map[string]string)
tags["twemproxy"] = ti.StatsAddr
ti.processStat(acc, tags, stats)

return nil
}

// Process Twemproxy server stats
func (ti *TwemproxyInstance) processStat(
acc plugins.Accumulator,
tags map[string]string,
data map[string]interface{},
) {
if source, ok := data["source"]; ok {
if val, ok := source.(string); ok {
tags["source"] = val
}
}

metrics := []string{"total_connections", "curr_connections", "timestamp"}
for _, m := range metrics {
if value, ok := data[m]; ok {
if val, ok := value.(float64); ok {
acc.Add(m, val, tags)
}
}
}

for _, pool := range ti.Pools {
if poolStat, ok := data[pool]; ok {
if data, ok := poolStat.(map[string]interface{}); ok {
poolTags := copyTags(tags)
poolTags["pool"] = pool
ti.processPool(acc, poolTags, pool+"_", data)
}
}
}
}

// Process pool data in Twemproxy stats
func (ti *TwemproxyInstance) processPool(
acc plugins.Accumulator,
tags map[string]string,
prefix string,
data map[string]interface{},
) {
serverTags := make(map[string]map[string]string)

for key, value := range data {
switch key {
case "client_connections", "forward_error", "client_err", "server_ejects", "fragments", "client_eof":
if val, ok := value.(float64); ok {
acc.Add(prefix+key, val, tags)
}
default:
if data, ok := value.(map[string]interface{}); ok {
if _, ok := serverTags[key]; !ok {
serverTags[key] = copyTags(tags)
serverTags[key]["server"] = key
}
ti.processServer(acc, serverTags[key], prefix, data)
}
}
}
}

// Process backend server(redis/memcached) stats
func (ti *TwemproxyInstance) processServer(
acc plugins.Accumulator,
tags map[string]string,
prefix string,
data map[string]interface{},
) {
for key, value := range data {
switch key {
default:
if val, ok := value.(float64); ok {
acc.Add(prefix+key, val, tags)
}
}
}
}

// Tags is not expected to be mutated after passing to Add.
func copyTags(tags map[string]string) map[string]string {
newTags := make(map[string]string)
for k, v := range tags {
newTags[k] = v
}
return newTags
}

func init() {
plugins.Add("twemproxy", func() plugins.Plugin {
return &Twemproxy{}
})
}
134 changes: 134 additions & 0 deletions plugins/twemproxy/twemproxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package twemproxy

import (
"net"
"testing"
"encoding/json"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const sampleStatsAddr = "127.0.0.1:22222"

const sampleStats = `{
"total_connections": 276448,
"uptime": 160657,
"version": "0.4.1",
"service": "nutcracker",
"curr_connections": 1322,
"source": "server1.website.com",
"demo": {
"client_connections": 1305,
"forward_error": 11684,
"client_err": 147942,
"server_ejects": 0,
"fragments": 0,
"client_eof": 126813,
"10.16.29.1:6379": {
"requests": 43604566,
"server_eof": 0,
"out_queue": 0,
"server_err": 0,
"out_queue_bytes": 0,
"in_queue": 0,
"server_timedout": 24,
"request_bytes": 2775840400,
"server_connections": 1,
"response_bytes": 7663182096,
"in_queue_bytes": 0,
"server_ejected_at": 0,
"responses": 43603900
},
"10.16.29.2:6379": {
"requests": 37870211,
"server_eof": 0,
"out_queue": 0,
"server_err": 0,
"out_queue_bytes": 0,
"in_queue": 0,
"server_timedout": 25,
"request_bytes": 2412114759,
"server_connections": 1,
"response_bytes": 5228980582,
"in_queue_bytes": 0,
"server_ejected_at": 0,
"responses": 37869551
}
},
"timestamp": 1447312436
}`

func mockTwemproxyServer() (net.Listener, error) {
listener, err := net.Listen("tcp", sampleStatsAddr)
if err != nil {
return nil, err
}
go func(l net.Listener) {
for {
conn, _ := l.Accept()
conn.Write([]byte(sampleStats))
conn.Close()
break
}
}(listener)

return listener, nil
}

func TestGather(t *testing.T) {
mockServer, err := mockTwemproxyServer()
if err != nil {
panic(err)
}
defer mockServer.Close()

twemproxy := &Twemproxy{
Instances: []TwemproxyInstance{
TwemproxyInstance{
StatsAddr: sampleStatsAddr,
Pools: []string{"demo"},
},
},
}

var acc testutil.Accumulator
err = twemproxy.Instances[0].Gather(&acc)
require.NoError(t, err)

var sourceData map[string]interface{}
if err := json.Unmarshal([]byte(sampleStats), &sourceData); err != nil {
panic(err)
}

metrics := []string{"total_connections", "curr_connections", "timestamp"}
tags := map[string]string{
"twemproxy": sampleStatsAddr,
"source": sourceData["source"].(string),
}
for _, m := range metrics {
assert.NoError(t, acc.ValidateTaggedValue(m, sourceData[m].(float64), tags))
}

poolName := "demo"
poolMetrics := []string{
"client_connections", "forward_error", "client_err", "server_ejects",
"fragments", "client_eof",
}
tags["pool"] = poolName
poolData := sourceData[poolName].(map[string]interface{})
for _, m := range poolMetrics {
measurement := poolName + "_" + m
assert.NoError(t, acc.ValidateTaggedValue(measurement, poolData[m].(float64), tags))
}
poolServers := []string{"10.16.29.1:6379", "10.16.29.2:6379"}
for _, s := range poolServers {
tags["server"] = s
serverData := poolData[s].(map[string]interface{})
for k, v := range serverData {
measurement := poolName + "_" + k
assert.NoError(t, acc.ValidateTaggedValue(measurement, v, tags))
}
}
}

0 comments on commit 6467fcd

Please sign in to comment.