Skip to content

Commit

Permalink
implement graphite listener
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe authored and jvshahid committed Mar 24, 2014
1 parent ced0078 commit 7e957ec
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 0 deletions.
6 changes: 6 additions & 0 deletions config.toml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ port = 8086 # binding is disabled if the port isn't set
# ssl-port = 8084 # Ssl support is enabled if you set a port and cert
# ssl-cert = /path/to/cert.pem

[graphite]
# optionally enable a graphite (carbon) compatible ingestion
enabled = false
port = 2003
database = "" # store graphite data in this database

# Raft configuration
[raft]
# The raft port should be open between all servers in a cluster.
Expand Down
145 changes: 145 additions & 0 deletions src/api/graphite/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// package Graphite provides a tcp listener that you can use to ingest metrics into influxdb
// via the graphite protocol.
// it behaves as a carbon daemon, except:

// no rounding of timestamps to the nearest interval. Upon ingestion
// of multiple datapoints for a given key within the same interval
// (possibly but not necessarily the same timestamp), graphite would
// use one (the latest received) value with a rounded timestamp
// representing that interval. We store values for every timestamp we
// receive (only the latest value for a given metric-timestamp pair)
// so it's up to the user to feed the data in proper intervals (and
// use round intervals if you plan to rely on that)
package graphite

import (
"bufio"
"cluster"
log "code.google.com/p/log4go"
. "common"
"configuration"
"coordinator"
"net"
"protocol"
"time"
)

type Server struct {
listenAddress string
database string
coordinator coordinator.Coordinator
clusterConfig *cluster.ClusterConfiguration
conn net.Listener
user *cluster.ClusterAdmin
shutdown chan bool
}

// TODO: check that database exists and create it if not
func NewServer(config *configuration.Configuration, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server {
self := &Server{}
self.listenAddress = config.GraphitePortString()
self.database = config.GraphiteDatabase
self.coordinator = coord
self.shutdown = make(chan bool, 1)
self.clusterConfig = clusterConfig
return self
}

// getAuth assures that the user property is a user with access to the graphite database
// only call this function after everything (i.e. Raft) is initialized, so that there's at least 1 admin user
func (self *Server) getAuth() {
// just use any (the first) of the list of admins.
names := self.clusterConfig.GetClusterAdmins()
self.user = self.clusterConfig.GetClusterAdmin(names[0])
}

func (self *Server) ListenAndServe() {
self.getAuth()
var err error
if self.listenAddress != "" {
self.conn, err = net.Listen("tcp", self.listenAddress)
if err != nil {
log.Error("GraphiteServer: Listen: ", err)
return
}
}
self.Serve(self.conn)
}

func (self *Server) Serve(listener net.Listener) {
// not really sure of the use of this shutdown channel,
// as all handling is done through goroutines. maybe we should use a waitgroup
defer func() { self.shutdown <- true }()

for {
conn_in, err := listener.Accept()
if err != nil {
log.Error("GraphiteServer: Accept: ", err)
continue
}
go self.handleClient(conn_in)
}
}

func (self *Server) Close() {
if self.conn != nil {
log.Info("GraphiteServer: Closing graphite server")
self.conn.Close()
log.Info("GraphiteServer: Waiting for all graphite requests to finish before killing the process")
select {
case <-time.After(time.Second * 5):
log.Error("GraphiteServer: There seems to be a hanging graphite request. Closing anyway")
case <-self.shutdown:
}
}
}

func (self *Server) writePoints(series *protocol.Series) error {
err := self.coordinator.WriteSeriesData(self.user, self.database, series)
if err != nil {
switch err.(type) {
case AuthorizationError:
// user information got stale, get a fresh one (this should happen rarely)
self.getAuth()
err = self.coordinator.WriteSeriesData(self.user, self.database, series)
if err != nil {
log.Warn("GraphiteServer: failed to write series after getting new auth: %s\n", err.Error())
}
default:
log.Warn("GraphiteServer: failed write series: %s\n", err.Error())
}
}
return err
}

func (self *Server) handleClient(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
graphiteMetric := &GraphiteMetric{}
err := graphiteMetric.Read(reader)
if err != nil {
log.Error(err)
return
}
values := []*protocol.FieldValue{}
if graphiteMetric.isInt {
values = append(values, &protocol.FieldValue{Int64Value: &graphiteMetric.integerValue})
} else {
values = append(values, &protocol.FieldValue{DoubleValue: &graphiteMetric.floatValue})
}
sn := uint64(1) // use same SN makes sure that we'll only keep the latest value for a given metric_id-timestamp pair
point := &protocol.Point{
Timestamp: &graphiteMetric.timestamp,
Values: values,
SequenceNumber: &sn,
}
series := &protocol.Series{
Name: &graphiteMetric.name,
Fields: []string{"value"},
Points: []*protocol.Point{point},
}
// little inefficient for now, later we might want to add multiple series in 1 writePoints request
self.writePoints(series)
}
}
50 changes: 50 additions & 0 deletions src/api/graphite/graphite_metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package graphite

import (
"bufio"
"fmt"
"io"
"strconv"
"strings"
)

type GraphiteMetric struct {
name string
isInt bool
integerValue int64
floatValue float64
timestamp int64
}

func (self *GraphiteMetric) Read(reader *bufio.Reader) error {
buf, err := reader.ReadBytes('\n')
str := strings.TrimSpace(string(buf))
if err != nil {
if err != io.EOF {
return fmt.Errorf("GraphiteServer: connection closed uncleanly/broken: %s\n", err.Error())
}
if len(str) > 0 {
return fmt.Errorf("GraphiteServer: incomplete read, line read: '%s'. neglecting line because connection closed because of %s\n", str, err.Error())
}
return err
}
elements := strings.Split(str, " ")
if len(elements) != 3 {
return fmt.Errorf("Received '%s' which doesn't have three fields", str)
}
self.name = elements[0]
self.floatValue, err = strconv.ParseFloat(elements[1], 64)
if err != nil {
return err
}
if i := int64(self.floatValue); float64(i) == self.floatValue {
self.isInt = true
self.integerValue = int64(self.floatValue)
}
timestamp, err := strconv.ParseUint(elements[2], 10, 32)
if err != nil {
return err
}
self.timestamp = int64(timestamp * 1000000)
return nil
}
21 changes: 21 additions & 0 deletions src/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ type ApiConfig struct {
Port int
}

type GraphiteConfig struct {
Enabled bool
Port int
Database string
}

type RaftConfig struct {
Port int
Dir string
Expand Down Expand Up @@ -157,6 +163,7 @@ type WalConfig struct {
type TomlConfiguration struct {
Admin AdminConfig
Api ApiConfig
Graphite GraphiteConfig
Raft RaftConfig
Storage StorageConfig
Cluster ClusterConfig
Expand All @@ -174,6 +181,9 @@ type Configuration struct {
ApiHttpSslPort int
ApiHttpCertPath string
ApiHttpPort int
GraphiteEnabled bool
GraphitePort int
GraphiteDatabase string
RaftServerPort int
SeedServers []string
DataDir string
Expand Down Expand Up @@ -249,6 +259,9 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
ApiHttpPort: tomlConfiguration.Api.Port,
ApiHttpCertPath: tomlConfiguration.Api.SslCertPath,
ApiHttpSslPort: tomlConfiguration.Api.SslPort,
GraphiteEnabled: tomlConfiguration.Graphite.Enabled,
GraphitePort: tomlConfiguration.Graphite.Port,
GraphiteDatabase: tomlConfiguration.Graphite.Database,
RaftServerPort: tomlConfiguration.Raft.Port,
RaftDir: tomlConfiguration.Raft.Dir,
ProtobufPort: tomlConfiguration.Cluster.ProtobufPort,
Expand Down Expand Up @@ -339,6 +352,14 @@ func (self *Configuration) ApiHttpSslPortString() string {
return fmt.Sprintf("%s:%d", self.BindAddress, self.ApiHttpSslPort)
}

func (self *Configuration) GraphitePortString() string {
if self.GraphitePort <= 0 {
return ""
}

return fmt.Sprintf("%s:%d", self.BindAddress, self.GraphitePort)
}

func (self *Configuration) ProtobufPortString() string {
return fmt.Sprintf("%s:%d", self.BindAddress, self.ProtobufPort)
}
Expand Down
12 changes: 12 additions & 0 deletions src/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"admin"
"api/graphite"
"api/http"
"cluster"
log "code.google.com/p/log4go"
Expand All @@ -20,6 +21,7 @@ type Server struct {
ProtobufServer *coordinator.ProtobufServer
ClusterConfig *cluster.ClusterConfiguration
HttpApi *http.HttpServer
GraphiteApi *graphite.Server
AdminServer *admin.HttpServer
Coordinator coordinator.Coordinator
Config *configuration.Configuration
Expand Down Expand Up @@ -57,13 +59,15 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
raftServer.AssignCoordinator(coord)
httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.AdminAssetsDir, coord, coord, clusterConfig, raftServer)
httpApi.EnableSsl(config.ApiHttpSslPortString(), config.ApiHttpCertPath)
graphiteApi := graphite.NewServer(config, coord, clusterConfig)
adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString())

return &Server{
RaftServer: raftServer,
ProtobufServer: protobufServer,
ClusterConfig: clusterConfig,
HttpApi: httpApi,
GraphiteApi: graphiteApi,
Coordinator: coord,
AdminServer: adminServer,
Config: config,
Expand Down Expand Up @@ -100,6 +104,14 @@ func (self *Server) ListenAndServe() error {
go self.ListenForSignals()
log.Info("Starting admin interface on port %d", self.Config.AdminHttpPort)
go self.AdminServer.ListenAndServe()
if self.Config.GraphiteEnabled {
if self.Config.GraphitePort <= 0 || self.Config.GraphiteDatabase == "" {
log.Warn("Cannot start graphite server. please check your configuration")
} else {
log.Info("Starting Graphite Listener on port %d", self.Config.GraphitePort)
go self.GraphiteApi.ListenAndServe()
}
}
log.Info("Starting Http Api server on port %d", self.Config.ApiHttpPort)
self.HttpApi.ListenAndServe()
return nil
Expand Down

0 comments on commit 7e957ec

Please sign in to comment.