diff --git a/cluster/service.go b/cluster/service.go index e74e7c30e74..5860aeb4cde 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -3,6 +3,7 @@ package cluster import ( "encoding/binary" "encoding/json" + "expvar" "fmt" "io" "log" @@ -11,6 +12,7 @@ import ( "strings" "sync" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/models" @@ -23,6 +25,15 @@ const MaxMessageSize = 1024 * 1024 * 1024 // 1GB // MuxHeader is the header byte used in the TCP mux. const MuxHeader = 2 +// Statistics maintained by the cluster package +const ( + writeShardReq = "write_shard_req" + writeShardPointsReq = "write_shard_points_req" + writeShardFail = "write_shard_fail" + mapShardReq = "map_shard_req" + mapShardResp = "map_shard_resp" +) + // Service processes data received over raw TCP connections. type Service struct { mu sync.RWMutex @@ -42,7 +53,8 @@ type Service struct { CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) } - Logger *log.Logger + Logger *log.Logger + statMap *expvar.Map } // NewService returns a new instance of Service. @@ -50,6 +62,7 @@ func NewService(c Config) *Service { return &Service{ closing: make(chan struct{}), Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags), + statMap: influxdb.NewStatistics("cluster", "cluster", nil), } } @@ -145,12 +158,14 @@ func (s *Service) handleConn(conn net.Conn) { // Delegate message processing by type. switch typ { case writeShardRequestMessage: + s.statMap.Add(writeShardReq, 1) err := s.processWriteShardRequest(buf) if err != nil { s.Logger.Printf("process write shard error: %s", err) } s.writeShardResponse(conn, err) case mapShardRequestMessage: + s.statMap.Add(mapShardReq, 1) err := s.processMapShardRequest(conn, buf) if err != nil { s.Logger.Printf("process map shard error: %s", err) @@ -171,6 +186,8 @@ func (s *Service) processWriteShardRequest(buf []byte) error { return err } + points := req.Points() + s.statMap.Add(writeShardPointsReq, int64(len(points))) err := s.TSDBStore.WriteToShard(req.ShardID(), req.Points()) // We may have received a write for a shard that we don't have locally because the @@ -198,6 +215,7 @@ func (s *Service) processWriteShardRequest(buf []byte) error { } if err != nil { + s.statMap.Add(writeShardFail, 1) return fmt.Errorf("write shard %d: %s", req.ShardID(), err) } @@ -286,6 +304,7 @@ func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error { if err := writeMapShardResponseMessage(w, &resp); err != nil { return err } + s.statMap.Add(mapShardResp, 1) if chunk == nil { // All mapper data sent.