From 7e957ec6807f5d7dbf187095d9dcee09af466eb2 Mon Sep 17 00:00:00 2001
From: Dieter Plaetinck
Date: Mon, 3 Mar 2014 20:52:57 -0500
Subject: [PATCH] implement graphite listener

---
Review note: the blob ids on the "index" lines of the two new files predate
the review fixes below; alignment whitespace in context lines is not restored
(apply with --ignore-whitespace).

 config.toml.sample                  |   6 ++
 src/api/graphite/api.go             | 189 ++++++++++++++++++++++++++++
 src/api/graphite/graphite_metric.go |  56 +++++++++
 src/configuration/configuration.go  |  21 ++++
 src/server/server.go                |  12 +++
 5 files changed, 284 insertions(+)
 create mode 100644 src/api/graphite/api.go
 create mode 100644 src/api/graphite/graphite_metric.go

diff --git a/config.toml.sample b/config.toml.sample
index b757939cc30..5d16aaa4e6b 100644
--- a/config.toml.sample
+++ b/config.toml.sample
@@ -23,6 +23,12 @@ port = 8086 # binding is disabled if the port isn't set
 # ssl-port = 8084 # Ssl support is enabled if you set a port and cert
 # ssl-cert = /path/to/cert.pem
 
+[graphite]
+# optionally enable a graphite (carbon) compatible ingestion
+enabled = false
+port = 2003
+database = "" # store graphite data in this database
+
 # Raft configuration
 [raft]
 # The raft port should be open between all servers in a cluster.
diff --git a/src/api/graphite/api.go b/src/api/graphite/api.go
new file mode 100644
index 00000000000..e3ff416a69e
--- /dev/null
+++ b/src/api/graphite/api.go
@@ -0,0 +1,189 @@
+// Package graphite provides a tcp listener that you can use to ingest metrics into influxdb
+// via the graphite protocol.
+// It behaves as a carbon daemon, except:
+//
+// no rounding of timestamps to the nearest interval. Upon ingestion
+// of multiple datapoints for a given key within the same interval
+// (possibly but not necessarily the same timestamp), graphite would
+// use one (the latest received) value with a rounded timestamp
+// representing that interval. We store values for every timestamp we
+// receive (only the latest value for a given metric-timestamp pair)
+// so it's up to the user to feed the data in proper intervals (and
+// use round intervals if you plan to rely on that)
+package graphite
+
+import (
+	"bufio"
+	"cluster"
+	log "code.google.com/p/log4go"
+	. "common"
+	"configuration"
+	"coordinator"
+	"errors"
+	"io"
+	"net"
+	"protocol"
+	"time"
+)
+
+// Server is a graphite (carbon) compatible TCP listener that writes every
+// received metric into a single configured database through the coordinator.
+type Server struct {
+	listenAddress string
+	database      string
+	coordinator   coordinator.Coordinator
+	clusterConfig *cluster.ClusterConfiguration
+	conn          net.Listener
+	user          *cluster.ClusterAdmin
+	shutdown      chan bool
+}
+
+// NewServer builds a Server from the graphite settings in config. It does not
+// open the listening socket; call ListenAndServe for that.
+// TODO: check that database exists and create it if not
+func NewServer(config *configuration.Configuration, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server {
+	self := &Server{}
+	self.listenAddress = config.GraphitePortString()
+	self.database = config.GraphiteDatabase
+	self.coordinator = coord
+	self.shutdown = make(chan bool, 1)
+	self.clusterConfig = clusterConfig
+	return self
+}
+
+// getAuth assures that the user property is a user with access to the graphite database
+// only call this function after everything (i.e. Raft) is initialized, so that there's at least 1 admin user.
+// It returns an error instead of panicking when no cluster admin is known yet.
+func (self *Server) getAuth() error {
+	// just use any (the first) of the list of admins.
+	names := self.clusterConfig.GetClusterAdmins()
+	if len(names) == 0 {
+		return errors.New("GraphiteServer: no cluster admin available to write with")
+	}
+	self.user = self.clusterConfig.GetClusterAdmin(names[0])
+	return nil
+}
+
+// ListenAndServe opens the configured TCP address and serves graphite
+// clients until the listener fails or is closed. It returns without
+// serving when no address is configured, no admin user exists, or the
+// listen call fails.
+func (self *Server) ListenAndServe() {
+	if self.listenAddress == "" {
+		// without an address there is no listener, and Serve would call Accept on nil
+		log.Error("GraphiteServer: no listen address configured")
+		return
+	}
+	if err := self.getAuth(); err != nil {
+		log.Error(err)
+		return
+	}
+	var err error
+	self.conn, err = net.Listen("tcp", self.listenAddress)
+	if err != nil {
+		log.Error("GraphiteServer: Listen: %s", err.Error())
+		return
+	}
+	self.Serve(self.conn)
+}
+
+// Serve accepts connections on listener and handles each one in its own
+// goroutine. It returns once Accept fails with a non-temporary error, which
+// is what happens after Close closes the listener; the shutdown channel is
+// signalled on return so Close can stop waiting.
+func (self *Server) Serve(listener net.Listener) {
+	// not really sure of the use of this shutdown channel,
+	// as all handling is done through goroutines. maybe we should use a waitgroup
+	defer func() { self.shutdown <- true }()
+
+	for {
+		conn_in, err := listener.Accept()
+		if err != nil {
+			log.Error("GraphiteServer: Accept: %s", err.Error())
+			if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
+				// transient condition (e.g. out of file descriptors): back off briefly and retry
+				time.Sleep(10 * time.Millisecond)
+				continue
+			}
+			// permanent failure, typically the listener was closed: retrying would spin forever
+			return
+		}
+		go self.handleClient(conn_in)
+	}
+}
+
+// Close stops accepting new connections and waits up to five seconds for
+// Serve to return. It is a no-op when the listener was never opened.
+func (self *Server) Close() {
+	if self.conn != nil {
+		log.Info("GraphiteServer: Closing graphite server")
+		self.conn.Close()
+		log.Info("GraphiteServer: Waiting for all graphite requests to finish before killing the process")
+		select {
+		case <-time.After(time.Second * 5):
+			log.Error("GraphiteServer: There seems to be a hanging graphite request. Closing anyway")
+		case <-self.shutdown:
+		}
+	}
+}
+
+// writePoints writes series to the graphite database as the cached admin
+// user. On an AuthorizationError the user is refreshed once and the write
+// retried. Failures are logged and the last error is returned.
+func (self *Server) writePoints(series *protocol.Series) error {
+	err := self.coordinator.WriteSeriesData(self.user, self.database, series)
+	if err != nil {
+		switch err.(type) {
+		case AuthorizationError:
+			// user information got stale, get a fresh one (this should happen rarely)
+			if authErr := self.getAuth(); authErr != nil {
+				log.Warn(authErr)
+				return authErr
+			}
+			err = self.coordinator.WriteSeriesData(self.user, self.database, series)
+			if err != nil {
+				log.Warn("GraphiteServer: failed to write series after getting new auth: %s\n", err.Error())
+			}
+		default:
+			log.Warn("GraphiteServer: failed write series: %s\n", err.Error())
+		}
+	}
+	return err
+}
+
+// handleClient reads graphite lines from conn until the peer disconnects or
+// a line cannot be read or parsed, writing each metric as a one-point series.
+func (self *Server) handleClient(conn net.Conn) {
+	defer conn.Close()
+	reader := bufio.NewReader(conn)
+	for {
+		graphiteMetric := &GraphiteMetric{}
+		err := graphiteMetric.Read(reader)
+		if err != nil {
+			// a clean disconnect is the normal end of a session, not an error
+			if err != io.EOF {
+				log.Error(err)
+			}
+			return
+		}
+		values := []*protocol.FieldValue{}
+		if graphiteMetric.isInt {
+			values = append(values, &protocol.FieldValue{Int64Value: &graphiteMetric.integerValue})
+		} else {
+			values = append(values, &protocol.FieldValue{DoubleValue: &graphiteMetric.floatValue})
+		}
+		sn := uint64(1) // use same SN makes sure that we'll only keep the latest value for a given metric_id-timestamp pair
+		point := &protocol.Point{
+			Timestamp:      &graphiteMetric.timestamp,
+			Values:         values,
+			SequenceNumber: &sn,
+		}
+		series := &protocol.Series{
+			Name:   &graphiteMetric.name,
+			Fields: []string{"value"},
+			Points: []*protocol.Point{point},
+		}
+		// little inefficient for now, later we might want to add multiple series in 1 writePoints request
+		self.writePoints(series)
+	}
+}
diff --git a/src/api/graphite/graphite_metric.go b/src/api/graphite/graphite_metric.go
new file mode 100644
index 00000000000..805d1e37899
--- /dev/null
+++ b/src/api/graphite/graphite_metric.go
@@ -0,0 +1,56 @@
+package graphite
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"strconv"
+	"strings"
+)
+
+// GraphiteMetric is one parsed line of the graphite plaintext protocol.
+type GraphiteMetric struct {
+	name         string
+	isInt        bool
+	integerValue int64
+	floatValue   float64
+	timestamp    int64 // input seconds multiplied by 1000000
+}
+
+// Read consumes one "<name> <value> <timestamp>" line from reader and stores
+// the parsed result in self. The timestamp is read as whole seconds and stored
+// multiplied by 1000000. It returns io.EOF on a clean end of stream, and a
+// descriptive error for a broken connection, an unterminated last line or a malformed line.
+func (self *GraphiteMetric) Read(reader *bufio.Reader) error {
+	buf, err := reader.ReadBytes('\n')
+	str := strings.TrimSpace(string(buf))
+	if err != nil {
+		if err != io.EOF {
+			return fmt.Errorf("GraphiteServer: connection closed uncleanly/broken: %s\n", err.Error())
+		}
+		if len(str) > 0 {
+			return fmt.Errorf("GraphiteServer: incomplete read, line read: '%s'. neglecting line because connection closed because of %s\n", str, err.Error())
+		}
+		return err
+	}
+	// Fields rather than Split(" "): clients may separate with tabs or several spaces
+	elements := strings.Fields(str)
+	if len(elements) != 3 {
+		return fmt.Errorf("Received '%s' which doesn't have three fields", str)
+	}
+	self.name = elements[0]
+	self.floatValue, err = strconv.ParseFloat(elements[1], 64)
+	if err != nil {
+		return err
+	}
+	if i := int64(self.floatValue); float64(i) == self.floatValue {
+		self.isInt = true
+		self.integerValue = i
+	}
+	timestamp, err := strconv.ParseUint(elements[2], 10, 32)
+	if err != nil {
+		return err
+	}
+	self.timestamp = int64(timestamp * 1000000)
+	return nil
+}
diff --git a/src/configuration/configuration.go b/src/configuration/configuration.go
index 4181bf4ba12..5ed9b60bb6a 100644
--- a/src/configuration/configuration.go
+++ b/src/configuration/configuration.go
@@ -62,6 +62,12 @@ type ApiConfig struct {
 	Port int
 }
 
+type GraphiteConfig struct {
+	Enabled  bool
+	Port     int
+	Database string
+}
+
 type RaftConfig struct {
 	Port int
 	Dir string
@@ -157,6 +163,7 @@ type WalConfig struct {
 type TomlConfiguration struct {
 	Admin AdminConfig
 	Api ApiConfig
+	Graphite GraphiteConfig
 	Raft RaftConfig
 	Storage StorageConfig
 	Cluster ClusterConfig
@@ -174,6 +181,9 @@ type Configuration struct {
 	ApiHttpSslPort int
 	ApiHttpCertPath string
 	ApiHttpPort int
+	GraphiteEnabled bool
+	GraphitePort int
+	GraphiteDatabase string
 	RaftServerPort int
 	SeedServers []string
 	DataDir string
@@ -249,6 +259,9 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
 		ApiHttpPort: tomlConfiguration.Api.Port,
 		ApiHttpCertPath: tomlConfiguration.Api.SslCertPath,
 		ApiHttpSslPort: tomlConfiguration.Api.SslPort,
+		GraphiteEnabled: tomlConfiguration.Graphite.Enabled,
+		GraphitePort: tomlConfiguration.Graphite.Port,
+		GraphiteDatabase: tomlConfiguration.Graphite.Database,
 		RaftServerPort: tomlConfiguration.Raft.Port,
 		RaftDir: tomlConfiguration.Raft.Dir,
 		ProtobufPort: tomlConfiguration.Cluster.ProtobufPort,
@@ -339,6 +352,14 @@ func (self *Configuration) ApiHttpSslPortString() string {
 	return fmt.Sprintf("%s:%d", self.BindAddress, self.ApiHttpSslPort)
 }
 
+func (self *Configuration) GraphitePortString() string {
+	if self.GraphitePort <= 0 {
+		return ""
+	}
+
+	return fmt.Sprintf("%s:%d", self.BindAddress, self.GraphitePort)
+}
+
 func (self *Configuration) ProtobufPortString() string {
 	return fmt.Sprintf("%s:%d", self.BindAddress, self.ProtobufPort)
 }
diff --git a/src/server/server.go b/src/server/server.go
index f761a855236..0ea7dd4b078 100644
--- a/src/server/server.go
+++ b/src/server/server.go
@@ -2,6 +2,7 @@ package server
 
 import (
 	"admin"
+	"api/graphite"
 	"api/http"
 	"cluster"
 	log "code.google.com/p/log4go"
@@ -20,6 +21,7 @@ type Server struct {
 	ProtobufServer *coordinator.ProtobufServer
 	ClusterConfig *cluster.ClusterConfiguration
 	HttpApi *http.HttpServer
+	GraphiteApi *graphite.Server
 	AdminServer *admin.HttpServer
 	Coordinator coordinator.Coordinator
 	Config *configuration.Configuration
@@ -57,6 +59,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
 	raftServer.AssignCoordinator(coord)
 	httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.AdminAssetsDir, coord, coord, clusterConfig, raftServer)
 	httpApi.EnableSsl(config.ApiHttpSslPortString(), config.ApiHttpCertPath)
+	graphiteApi := graphite.NewServer(config, coord, clusterConfig)
 	adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString())
 
 	return &Server{
@@ -64,6 +67,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
 		ProtobufServer: protobufServer,
 		ClusterConfig: clusterConfig,
 		HttpApi: httpApi,
+		GraphiteApi: graphiteApi,
 		Coordinator: coord,
 		AdminServer: adminServer,
 		Config: config,
@@ -100,6 +104,14 @@ func (self *Server) ListenAndServe() error {
 	go self.ListenForSignals()
 	log.Info("Starting admin interface on port %d", self.Config.AdminHttpPort)
 	go self.AdminServer.ListenAndServe()
+	if self.Config.GraphiteEnabled {
+		if self.Config.GraphitePort <= 0 || self.Config.GraphiteDatabase == "" {
+			log.Warn("Cannot start graphite server. please check your configuration")
+		} else {
+			log.Info("Starting Graphite Listener on port %d", self.Config.GraphitePort)
+			go self.GraphiteApi.ListenAndServe()
+		}
+	}
 	log.Info("Starting Http Api server on port %d", self.Config.ApiHttpPort)
 	self.HttpApi.ListenAndServe()
 	return nil