Skip to content

Commit

Permalink
Merge pull request #546 from peekeri/graphite_udp_api
Browse files Browse the repository at this point in the history
graphite api: add support for sending metrics over UDP
  • Loading branch information
pauldix committed May 16, 2014
2 parents bd33adf + 50d8050 commit 6a74cfd
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 32 deletions.
87 changes: 64 additions & 23 deletions src/api/graphite/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"net"
"protocol"
"strings"
"time"

log "code.google.com/p/log4go"
Expand All @@ -32,8 +33,10 @@ type Server struct {
coordinator coordinator.Coordinator
clusterConfig *cluster.ClusterConfiguration
conn net.Listener
udpConn *net.UDPConn
user *cluster.ClusterAdmin
shutdown chan bool
udpEnabled bool
}

// TODO: check that database exists and create it if not
Expand All @@ -44,6 +47,8 @@ func NewServer(config *configuration.Configuration, coord coordinator.Coordinato
self.coordinator = coord
self.shutdown = make(chan bool, 1)
self.clusterConfig = clusterConfig
self.udpEnabled = config.GraphiteUdpEnabled

return self
}

Expand All @@ -65,6 +70,11 @@ func (self *Server) ListenAndServe() {
return
}
}
if self.udpEnabled {
udpAddress, _ := net.ResolveUDPAddr("udp", self.listenAddress)
self.udpConn, _ = net.ListenUDP("udp", udpAddress)
go self.ServeUdp(self.udpConn)
}
self.Serve(self.conn)
}

Expand All @@ -83,7 +93,30 @@ func (self *Server) Serve(listener net.Listener) {
}
}

func (self *Server) ServeUdp(conn *net.UDPConn) {
var buf []byte = make([]byte, 65536)
for {
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
log.Warn("Error when reading from UDP connection %s", err.Error())
}
go self.handleUdpMessage(string(buf[:n]))
}
}

func (self *Server) handleUdpMessage(msg string) {
metrics := strings.Split(msg, "\n")
for _, metric := range metrics {
reader := bufio.NewReader(strings.NewReader(metric + "\n"))
go self.handleMessage(reader)
}
}

func (self *Server) Close() {
if self.udpConn != nil {
log.Info("GraphiteService: Closing graphite UDP listener")
self.udpConn.Close()
}
if self.conn != nil {
log.Info("GraphiteServer: Closing graphite server")
self.conn.Close()
Expand Down Expand Up @@ -119,8 +152,7 @@ func (self *Server) handleClient(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
graphiteMetric := &GraphiteMetric{}
err := graphiteMetric.Read(reader)
err := self.handleMessage(reader)
if err != nil {
if io.EOF == err {
log.Debug("Client closed graphite connection")
Expand All @@ -129,26 +161,35 @@ func (self *Server) handleClient(conn net.Conn) {
log.Error(err)
return
}
values := []*protocol.FieldValue{}
if graphiteMetric.isInt {
values = append(values, &protocol.FieldValue{Int64Value: &graphiteMetric.integerValue})
} else {
values = append(values, &protocol.FieldValue{DoubleValue: &graphiteMetric.floatValue})
}
sn := uint64(1) // use same SN makes sure that we'll only keep the latest value for a given metric_id-timestamp pair
point := &protocol.Point{
Timestamp: &graphiteMetric.timestamp,
Values: values,
SequenceNumber: &sn,
}
series := &protocol.Series{
Name: &graphiteMetric.name,
Fields: []string{"value"},
Points: []*protocol.Point{point},
}
// little inefficient for now, later we might want to add multiple series in 1 writePoints request
if err := self.writePoints(series); err != nil {
log.Error("Error in graphite plugin: %s", err)
}
}
}

func (self *Server) handleMessage(reader *bufio.Reader) error {
graphiteMetric := &GraphiteMetric{}
err := graphiteMetric.Read(reader)
if err != nil {
return err
}
values := []*protocol.FieldValue{}
if graphiteMetric.isInt {
values = append(values, &protocol.FieldValue{Int64Value: &graphiteMetric.integerValue})
} else {
values = append(values, &protocol.FieldValue{DoubleValue: &graphiteMetric.floatValue})
}
sn := uint64(1) // use same SN makes sure that we'll only keep the latest value for a given metric_id-timestamp pair
point := &protocol.Point{
Timestamp: &graphiteMetric.timestamp,
Values: values,
SequenceNumber: &sn,
}
series := &protocol.Series{
Name: &graphiteMetric.name,
Fields: []string{"value"},
Points: []*protocol.Point{point},
}
// little inefficient for now, later we might want to add multiple series in 1 writePoints request
if err := self.writePoints(series); err != nil {
log.Error("Error in graphite plugin: %s", err)
}
return nil
}
21 changes: 12 additions & 9 deletions src/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ type ApiConfig struct {
}

type GraphiteConfig struct {
Enabled bool
Port int
Database string
Enabled bool
Port int
Database string
UdpEnabled bool `toml:"udp_enabled"`
}
type UdpInputConfig struct {
Enabled bool
Expand Down Expand Up @@ -203,9 +204,10 @@ type Configuration struct {
ApiHttpPort int
ApiReadTimeout time.Duration

GraphiteEnabled bool
GraphitePort int
GraphiteDatabase string
GraphiteEnabled bool
GraphitePort int
GraphiteDatabase string
GraphiteUdpEnabled bool

UdpInputEnabled bool
UdpInputPort int
Expand Down Expand Up @@ -316,9 +318,10 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
ApiHttpSslPort: tomlConfiguration.HttpApi.SslPort,
ApiReadTimeout: apiReadTimeout,

GraphiteEnabled: tomlConfiguration.InputPlugins.Graphite.Enabled,
GraphitePort: tomlConfiguration.InputPlugins.Graphite.Port,
GraphiteDatabase: tomlConfiguration.InputPlugins.Graphite.Database,
GraphiteEnabled: tomlConfiguration.InputPlugins.Graphite.Enabled,
GraphitePort: tomlConfiguration.InputPlugins.Graphite.Port,
GraphiteDatabase: tomlConfiguration.InputPlugins.Graphite.Database,
GraphiteUdpEnabled: tomlConfiguration.InputPlugins.Graphite.UdpEnabled,

UdpInputEnabled: tomlConfiguration.InputPlugins.UdpInput.Enabled,
UdpInputPort: tomlConfiguration.InputPlugins.UdpInput.Port,
Expand Down
26 changes: 26 additions & 0 deletions src/integration/multiple_servers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,32 @@ func (self *ServerSuite) TestGraphiteInterface(c *C) {
c.Assert(series.GetValueForPointAndColumn(1, "value", c), Equals, 100.0)
}

func (self *ServerSuite) TestGraphiteUdpInterface(c *C) {
conn, err := net.Dial("udp", "localhost:60513")
c.Assert(err, IsNil)

now := time.Now().UTC().Truncate(time.Minute)
data := fmt.Sprintf("some_udp_metric 100 %d\nsome_udp_metric 200.5 %d\n", now.Add(-time.Minute).Unix(), now.Unix())

_, err = conn.Write([]byte(data))
c.Assert(err, IsNil)
conn.Close()

// there's no easy way to check whether the server started
// processing this request, unlike http requests which must return a
// status code
time.Sleep(time.Second)

self.serverProcesses[0].WaitForServerToSync()

collection := self.serverProcesses[0].QueryWithUsername("graphite_db", "select * from some_udp_metric", false, c, "root", "root")
c.Assert(collection.Members, HasLen, 1)
series := collection.GetSeries("some_udp_metric", c)
c.Assert(series.Points, HasLen, 2)
c.Assert(series.GetValueForPointAndColumn(0, "value", c), Equals, 200.5)
c.Assert(series.GetValueForPointAndColumn(1, "value", c), Equals, 100.0)
}

func (self *ServerSuite) TestRestartAfterCompaction(c *C) {
data := `
[{
Expand Down
1 change: 1 addition & 0 deletions src/integration/test_config1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ssl-cert = "./cert.pem"
enabled = true
port = 60513
database = "graphite_db" # store graphite data in this database
udp_enabled = true

[input_plugins.udp]
enabled = true
Expand Down

0 comments on commit 6a74cfd

Please sign in to comment.