first stab at a graphite listener #293

Closed · wants to merge 6 commits
6 changes: 6 additions & 0 deletions config.toml.sample
@@ -23,6 +23,12 @@ port = 8086 # binding is disabled if the port isn't set
# ssl-port = 8084 # Ssl support is enabled if you set a port and cert
# ssl-cert = /path/to/cert.pem

[graphite]
# optionally enable a graphite (carbon) compatible ingestion listener
enabled = false
port = 2003
database = "" # store graphite data in this database

# Raft configuration
[raft]
# The raft port should be open between all servers in a cluster.
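
For reference, any client that speaks the plaintext graphite protocol ("<metric path> <value> <unix timestamp>\n" per line) can write to this listener. A minimal Go sketch, assuming the listener above is enabled on its default port 2003 on localhost:

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	// connect to the graphite-compatible listener configured above
	conn, err := net.Dial("tcp", "localhost:2003")
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// one datapoint: metric name, value, timestamp in seconds
	fmt.Fprintf(conn, "servers.host01.cpu.load 1.52 %d\n", time.Now().Unix())
}
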
171 changes: 171 additions & 0 deletions src/api/graphite/api.go
@@ -0,0 +1,171 @@
// Package graphite provides a TCP listener that you can use to ingest metrics into InfluxDB
// via the graphite (carbon) protocol.
// It behaves like a carbon daemon, except that it does not round timestamps to the nearest interval.
// When carbon ingests multiple datapoints for a given key within the same interval
// (possibly, but not necessarily, with the same timestamp), it keeps only one value
// (the latest received) and stores it with a rounded timestamp representing that interval.
// We instead store a value for every timestamp we receive (keeping only the latest value for a
// given metric-timestamp pair), so it is up to the user to feed the data at proper intervals
// (and to use round intervals if you plan to rely on that behavior).
package graphite

import (
"bufio"
"cluster"
log "code.google.com/p/log4go"
. "common"
"coordinator"
"io"
"net"
"protocol"
"strconv"
"strings"
"time"
)

type Server struct {
listen_addr string
database string
coordinator coordinator.Coordinator
clusterConfig *cluster.ClusterConfiguration
conn net.Listener
user *cluster.ClusterAdmin
shutdown chan bool
}

type GraphiteListener interface {
Close()
getAuth()
ListenAndServe()
writePoints(protocol.Series) error
}

// TODO: check that database exists and create it if not
func NewServer(listen_addr, database string, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server {
self := &Server{}
self.listen_addr = listen_addr
self.database = database
self.coordinator = coord
self.shutdown = make(chan bool, 1)
self.clusterConfig = clusterConfig
return self
}

// getAuth ensures that the user property is set to a user with access to the graphite database.
// Only call this function after everything (i.e. Raft) is initialized, so that there is at least one admin user.
func (self *Server) getAuth() {
// just use the first admin in the list.
names := self.clusterConfig.GetClusterAdmins()
self.user = self.clusterConfig.GetClusterAdmin(names[0])
}

func (self *Server) ListenAndServe() {
self.getAuth()
var err error
if self.listen_addr != "" {
self.conn, err = net.Listen("tcp", self.listen_addr)
if err != nil {
log.Error("GraphiteServer: Listen: ", err)
return
}
}
self.Serve(self.conn)
}

func (self *Server) Serve(listener net.Listener) {
// not really sure of the use of this shutdown channel,
Review comment (Contributor):
I think the shutdown here doesn't do much; or at least, not what you hope.

It works in the HTTP API because the underlying net/http module's Serve() method has some intelligent handling of Accept() failures: http://golang.org/src/pkg/net/http/server.go?s=45477:45526#L1618

In short, it determines whether the failure is temporary or permanent, and it aborts the accept loop on permanent failures. That allows the Serve() method to exit, which ends up triggering the defer that sends true on the shutdown channel.

I haven't fully thought through the implications of shutdown in InfluxDB (just started looking at the code today) so I don't know what the right solution in API modules like this is.
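
For illustration, a minimal sketch (not part of this PR, assuming the usual "net" and "time" imports and a hypothetical handleConn callback) of an accept loop that mimics that behavior:

func serveWithBackoff(listener net.Listener, handleConn func(net.Conn)) {
	for {
		conn, err := listener.Accept()
		if err != nil {
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				// transient failure (e.g. too many open files): back off and retry
				time.Sleep(100 * time.Millisecond)
				continue
			}
			// permanent failure: leave the loop so deferred shutdown signalling runs
			return
		}
		go handleConn(conn)
	}
}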

// as all handling is done through goroutines. maybe we should use a waitgroup
defer func() { self.shutdown <- true }()

for {
conn_in, err := listener.Accept()
if err != nil {
log.Error("GraphiteServer: Accept: ", err)
continue
}
go self.handleClient(conn_in)
}
}

func (self *Server) Close() {
if self.conn != nil {
log.Info("GraphiteServer: Closing graphite server")
self.conn.Close()
log.Info("GraphiteServer: Waiting for all graphite requests to finish before killing the process")
select {
case <-time.After(time.Second * 5):
log.Error("GraphiteServer: There seems to be a hanging graphite request. Closing anyway")
case <-self.shutdown:
}
}
}

func (self *Server) writePoints(series *protocol.Series) error {
err := self.coordinator.WriteSeriesData(self.user, self.database, series)
if err != nil {
switch err.(type) {
case AuthorizationError:
// user information got stale, get a fresh one (this should happen rarely)
self.getAuth()
err = self.coordinator.WriteSeriesData(self.user, self.database, series)
if err != nil {
log.Warn("GraphiteServer: failed to write series after getting new auth: %s\n", err.Error())
}
default:
log.Warn("GraphiteServer: failed write series: %s\n", err.Error())
}
}
return err
}

func (self *Server) handleClient(conn_in net.Conn) {
defer conn_in.Close()
reader := bufio.NewReader(conn_in)
for {
buf, err := reader.ReadBytes('\n')
if err != nil {
str := strings.TrimSpace(string(buf))
if err != io.EOF {
log.Warn("GraphiteServer: connection closed uncleanly/broken: %s\n", err.Error())
}
if len(str) > 0 {
log.Warn("GraphiteServer: incomplete read, line read: '%s'. neglecting line because connection closed because of %s\n", str, err.Error())
}
return
}
str := strings.TrimSpace(string(buf))
elements := strings.Split(str, " ")
if len(elements) != 3 {
continue // invalid line
}
val, err := strconv.ParseFloat(elements[1], 64)
if err != nil {
continue // invalid line
}
timestamp, err := strconv.ParseUint(elements[2], 10, 32)
if err != nil {
continue // invalid line
}
values := []*protocol.FieldValue{}
if i := int64(val); float64(i) == val {
values = append(values, &protocol.FieldValue{Int64Value: &i})
} else {
values = append(values, &protocol.FieldValue{DoubleValue: &val})
}
ts := int64(timestamp * 1000000)
sn := uint64(1) // using the same SN for every point ensures we only keep the latest value for a given metric_id-timestamp pair
point := &protocol.Point{
Timestamp: &ts,
Values: values,
SequenceNumber: &sn,
}
series := &protocol.Series{
Name: &elements[0],
Fields: []string{"value"},
Points: []*protocol.Point{point},
}
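// For illustration (an assumed example, not part of the original change): the input line
// "servers.host01.cpu.load 1.52 1390000000" becomes a Series named "servers.host01.cpu.load"
// with the single field "value" and one point holding DoubleValue 1.52, Timestamp
// 1390000000000000 (microseconds) and SequenceNumber 1. A later line with the same name and
// timestamp yields the same sequence number, so only the latest value is kept, as described
// in the package comment above.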
// a little inefficient for now; later we might want to batch multiple series into one writePoints request
Review comment (Member):
Agreed. #310 will add that functionality to requests, which will make it possible to update the interface on the coordinator. Will definitely make things more efficient.

self.writePoints(series)
}
}
21 changes: 21 additions & 0 deletions src/configuration/configuration.go
@@ -33,6 +33,12 @@ type ApiConfig struct {
Port int
}

type GraphiteConfig struct {
Enabled bool
Port int
Database string
}

type RaftConfig struct {
Port int
Dir string
@@ -125,6 +131,7 @@ type WalConfig struct {
type TomlConfiguration struct {
Admin AdminConfig
Api ApiConfig
Graphite GraphiteConfig
Raft RaftConfig
Storage StorageConfig
Cluster ClusterConfig
@@ -142,6 +149,9 @@ type Configuration struct {
ApiHttpSslPort int
ApiHttpCertPath string
ApiHttpPort int
GraphiteEnabled bool
GraphitePort int
GraphiteDatabase string
RaftServerPort int
SeedServers []string
DataDir string
@@ -214,6 +224,9 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
ApiHttpPort: tomlConfiguration.Api.Port,
ApiHttpCertPath: tomlConfiguration.Api.SslCertPath,
ApiHttpSslPort: tomlConfiguration.Api.SslPort,
GraphiteEnabled: tomlConfiguration.Graphite.Enabled,
GraphitePort: tomlConfiguration.Graphite.Port,
GraphiteDatabase: tomlConfiguration.Graphite.Database,
RaftServerPort: tomlConfiguration.Raft.Port,
RaftDir: tomlConfiguration.Raft.Dir,
ProtobufPort: tomlConfiguration.Cluster.ProtobufPort,
@@ -292,6 +305,14 @@ func (self *Configuration) ApiHttpSslPortString() string {
return fmt.Sprintf("%s:%d", self.BindAddress, self.ApiHttpSslPort)
}

func (self *Configuration) GraphitePortString() string {
if self.GraphitePort <= 0 {
return ""
}

return fmt.Sprintf("%s:%d", self.BindAddress, self.GraphitePort)
}
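// For example (illustration): with the sample config above (port = 2003) and a bind address of
// "0.0.0.0" this returns "0.0.0.0:2003"; if the port is unset or <= 0 it returns "", which
// keeps the graphite listener from binding (see ListenAndServe in src/api/graphite/api.go).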

func (self *Configuration) ProtobufPortString() string {
return fmt.Sprintf("%s:%d", self.BindAddress, self.ProtobufPort)
}
12 changes: 12 additions & 0 deletions src/server/server.go
@@ -2,6 +2,7 @@ package server

import (
"admin"
"api/graphite"
"api/http"
"cluster"
log "code.google.com/p/log4go"
@@ -16,6 +17,7 @@ type Server struct {
ProtobufServer *coordinator.ProtobufServer
ClusterConfig *cluster.ClusterConfiguration
HttpApi *http.HttpServer
GraphiteApi *graphite.Server
AdminServer *admin.HttpServer
Coordinator coordinator.Coordinator
Config *configuration.Configuration
@@ -53,13 +55,15 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
raftServer.AssignCoordinator(coord)
httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.AdminAssetsDir, coord, coord, clusterConfig, raftServer)
httpApi.EnableSsl(config.ApiHttpSslPortString(), config.ApiHttpCertPath)
graphiteApi := graphite.NewServer(config.GraphitePortString(), config.GraphiteDatabase, coord, clusterConfig)
adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString())

return &Server{
RaftServer: raftServer,
ProtobufServer: protobufServer,
ClusterConfig: clusterConfig,
HttpApi: httpApi,
GraphiteApi: graphiteApi,
Coordinator: coord,
AdminServer: adminServer,
Config: config,
@@ -93,6 +97,14 @@ func (self *Server) ListenAndServe() error {
}
log.Info("Starting admin interface on port %d", self.Config.AdminHttpPort)
go self.AdminServer.ListenAndServe()
if self.Config.GraphiteEnabled {
if self.Config.GraphitePort <= 0 || self.Config.GraphiteDatabase == "" {
log.Warn("Cannot start graphite server. please check your configuration")
} else {
log.Info("Starting Graphite Listener on port %d", self.Config.GraphitePort)
go self.GraphiteApi.ListenAndServe()
}
}
log.Info("Starting Http Api server on port %d", self.Config.ApiHttpPort)
self.HttpApi.ListenAndServe()
return nil