Skip to content

Commit

Permalink
Merge pull request #4198 from influxdb/cluster_stats2
Browse files Browse the repository at this point in the history
Add cluster-service stats
  • Loading branch information
otoolep committed Sep 22, 2015
2 parents 79e6e3e + 1084d73 commit f857818
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cluster
import (
"encoding/binary"
"encoding/json"
"expvar"
"fmt"
"io"
"log"
Expand All @@ -11,6 +12,7 @@ import (
"strings"
"sync"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/models"
Expand All @@ -23,6 +25,15 @@ const MaxMessageSize = 1024 * 1024 * 1024 // 1GB
// MuxHeader is the header byte used in the TCP mux.
const MuxHeader = 2

// Statistics maintained by the cluster package
const (
writeShardReq = "write_shard_req"
writeShardPointsReq = "write_shard_points_req"
writeShardFail = "write_shard_fail"
mapShardReq = "map_shard_req"
mapShardResp = "map_shard_resp"
)

// Service processes data received over raw TCP connections.
type Service struct {
mu sync.RWMutex
Expand All @@ -42,14 +53,16 @@ type Service struct {
CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error)
}

Logger *log.Logger
Logger *log.Logger
statMap *expvar.Map
}

// NewService returns a new instance of Service.
func NewService(c Config) *Service {
return &Service{
closing: make(chan struct{}),
Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags),
statMap: influxdb.NewStatistics("cluster", "cluster", nil),
}
}

Expand Down Expand Up @@ -145,12 +158,14 @@ func (s *Service) handleConn(conn net.Conn) {
// Delegate message processing by type.
switch typ {
case writeShardRequestMessage:
s.statMap.Add(writeShardReq, 1)
err := s.processWriteShardRequest(buf)
if err != nil {
s.Logger.Printf("process write shard error: %s", err)
}
s.writeShardResponse(conn, err)
case mapShardRequestMessage:
s.statMap.Add(mapShardReq, 1)
err := s.processMapShardRequest(conn, buf)
if err != nil {
s.Logger.Printf("process map shard error: %s", err)
Expand All @@ -171,6 +186,8 @@ func (s *Service) processWriteShardRequest(buf []byte) error {
return err
}

points := req.Points()
s.statMap.Add(writeShardPointsReq, int64(len(points)))
err := s.TSDBStore.WriteToShard(req.ShardID(), req.Points())

// We may have received a write for a shard that we don't have locally because the
Expand Down Expand Up @@ -198,6 +215,7 @@ func (s *Service) processWriteShardRequest(buf []byte) error {
}

if err != nil {
s.statMap.Add(writeShardFail, 1)
return fmt.Errorf("write shard %d: %s", req.ShardID(), err)
}

Expand Down Expand Up @@ -286,6 +304,7 @@ func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error {
if err := writeMapShardResponseMessage(w, &resp); err != nil {
return err
}
s.statMap.Add(mapShardResp, 1)

if chunk == nil {
// All mapper data sent.
Expand Down

0 comments on commit f857818

Please sign in to comment.