-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
first stab at a graphite listener #293
Closed
Closed
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
// package Graphite provides a tcp listener that you can use to ingest metrics into influxdb | ||
// via the graphite protocol. | ||
// it behaves as a carbon daemon, except: | ||
// no rounding of timestamps to the nearest interval. Upon ingestion of multiple datapoints | ||
// for a given key within the same interval (possibly but not necessarily the same timestamp), | ||
// graphite would use one (the latest received) value | ||
// with a rounded timestamp representing that interval. | ||
// We store values for every timestamp we receive (only the latest value for a given metric-timestamp pair) | ||
// so it's up to the user to feed the data in proper intervals | ||
// (and use round intervals if you plan to rely on that) | ||
package graphite | ||
|
||
import ( | ||
"bufio" | ||
"cluster" | ||
log "code.google.com/p/log4go" | ||
. "common" | ||
"coordinator" | ||
"io" | ||
"net" | ||
"protocol" | ||
"strconv" | ||
"strings" | ||
"time" | ||
) | ||
|
||
// Server is a graphite-protocol TCP listener that forwards incoming
// metrics into a single influxdb database via the coordinator.
type Server struct {
	listen_addr   string                         // TCP address to listen on; empty means "don't listen"
	database      string                         // influxdb database all graphite metrics are written to
	coordinator   coordinator.Coordinator        // write path into the cluster
	clusterConfig *cluster.ClusterConfiguration  // used to look up an admin user for writes
	conn          net.Listener                   // the accepting listener; nil until ListenAndServe succeeds
	user          *cluster.ClusterAdmin          // cached admin credentials, refreshed by getAuth()
	shutdown      chan bool                      // signaled (buffered, cap 1) when the accept loop exits
}
|
||
type GraphiteListener interface { | ||
Close() | ||
getAuth() | ||
ListenAndServe() | ||
writePoints(protocol.Series) error | ||
} | ||
|
||
// TODO: check that database exists and create it if not | ||
func NewServer(listen_addr, database string, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server { | ||
self := &Server{} | ||
self.listen_addr = listen_addr | ||
self.database = database | ||
self.coordinator = coord | ||
self.shutdown = make(chan bool, 1) | ||
self.clusterConfig = clusterConfig | ||
return self | ||
} | ||
|
||
// getAuth assures that the user property is a user with access to the graphite database | ||
// only call this function after everything (i.e. Raft) is initialized, so that there's at least 1 admin user | ||
func (self *Server) getAuth() { | ||
// just use any (the first) of the list of admins. | ||
names := self.clusterConfig.GetClusterAdmins() | ||
self.user = self.clusterConfig.GetClusterAdmin(names[0]) | ||
} | ||
|
||
func (self *Server) ListenAndServe() { | ||
self.getAuth() | ||
var err error | ||
if self.listen_addr != "" { | ||
self.conn, err = net.Listen("tcp", self.listen_addr) | ||
if err != nil { | ||
log.Error("GraphiteServer: Listen: ", err) | ||
return | ||
} | ||
} | ||
self.Serve(self.conn) | ||
} | ||
|
||
func (self *Server) Serve(listener net.Listener) { | ||
// not really sure of the use of this shutdown channel, | ||
// as all handling is done through goroutines. maybe we should use a waitgroup | ||
defer func() { self.shutdown <- true }() | ||
|
||
for { | ||
conn_in, err := listener.Accept() | ||
if err != nil { | ||
log.Error("GraphiteServer: Accept: ", err) | ||
continue | ||
} | ||
go self.handleClient(conn_in) | ||
} | ||
} | ||
|
||
// Close shuts the listener down (if one was ever opened) and then waits up
// to five seconds for the accept loop to acknowledge on the shutdown
// channel before giving up.
// NOTE(review): with the current Serve implementation the shutdown channel
// only fires when the accept loop exits; in-flight handleClient goroutines
// are NOT waited on — a WaitGroup would be needed for that (see the comment
// in Serve).
func (self *Server) Close() {
	if self.conn != nil {
		log.Info("GraphiteServer: Closing graphite server")
		self.conn.Close()
		log.Info("GraphiteServer: Waiting for all graphite requests to finish before killing the process")
		select {
		case <-time.After(time.Second * 5):
			log.Error("GraphiteServer: There seems to be a hanging graphite request. Closing anyway")
		case <-self.shutdown:
		}
	}
}
|
||
func (self *Server) writePoints(series *protocol.Series) error { | ||
err := self.coordinator.WriteSeriesData(self.user, self.database, series) | ||
if err != nil { | ||
switch err.(type) { | ||
case AuthorizationError: | ||
// user information got stale, get a fresh one (this should happen rarely) | ||
self.getAuth() | ||
err = self.coordinator.WriteSeriesData(self.user, self.database, series) | ||
if err != nil { | ||
log.Warn("GraphiteServer: failed to write series after getting new auth: %s\n", err.Error()) | ||
} | ||
default: | ||
log.Warn("GraphiteServer: failed write series: %s\n", err.Error()) | ||
} | ||
} | ||
return err | ||
} | ||
|
||
func (self *Server) handleClient(conn_in net.Conn) { | ||
defer conn_in.Close() | ||
reader := bufio.NewReader(conn_in) | ||
for { | ||
buf, err := reader.ReadBytes('\n') | ||
if err != nil { | ||
str := strings.TrimSpace(string(buf)) | ||
if err != io.EOF { | ||
log.Warn("GraphiteServer: connection closed uncleanly/broken: %s\n", err.Error()) | ||
} | ||
if len(str) > 0 { | ||
log.Warn("GraphiteServer: incomplete read, line read: '%s'. neglecting line because connection closed because of %s\n", str, err.Error()) | ||
} | ||
return | ||
} | ||
str := strings.TrimSpace(string(buf)) | ||
elements := strings.Split(str, " ") | ||
if len(elements) != 3 { | ||
continue // invalid line | ||
} | ||
val, err := strconv.ParseFloat(elements[1], 64) | ||
if err != nil { | ||
continue // invalid line | ||
} | ||
timestamp, err := strconv.ParseUint(elements[2], 10, 32) | ||
if err != nil { | ||
continue // invalid line | ||
} | ||
values := []*protocol.FieldValue{} | ||
if i := int64(val); float64(i) == val { | ||
values = append(values, &protocol.FieldValue{Int64Value: &i}) | ||
} else { | ||
values = append(values, &protocol.FieldValue{DoubleValue: &val}) | ||
} | ||
ts := int64(timestamp * 1000000) | ||
sn := uint64(1) // use same SN makes sure that we'll only keep the latest value for a given metric_id-timestamp pair | ||
point := &protocol.Point{ | ||
Timestamp: &ts, | ||
Values: values, | ||
SequenceNumber: &sn, | ||
} | ||
series := &protocol.Series{ | ||
Name: &elements[0], | ||
Fields: []string{"value"}, | ||
Points: []*protocol.Point{point}, | ||
} | ||
// little inefficient for now, later we might want to add multiple series in 1 writePoints request | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. #310 will add that functionality to requests, which will make it possible to update the interface on the coordinator. Will definitely make things more efficient. |
||
self.writePoints(series) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the shutdown here doesn't do much; or at least, not what you hope.
It works in the HTTP API because the underlying
net/http
module'sServe()
method has some intelligent handling ofAccept()
failures: http://golang.org/src/pkg/net/http/server.go?s=45477:45526#L1618In short it determines if it's a temporary or permanent failure, and it aborts the accept loop on permanent failures. That allows the
Serve()
method to exit, which ends up triggering thedefer
to send the true on the shutdown channel.I haven't fully thought through the implications of shutdown in InfluxDB (just started looking at the code today) so I don't know what the right solution in API modules like this is.