Skip to content

Commit

Permalink
Merge pull request #2858 from influxdb/config_opentsdb_consis
Browse files Browse the repository at this point in the history
Support setting openTSDB write consistency
  • Loading branch information
otoolep committed Jun 9, 2015
2 parents 4191e90 + de901c5 commit 4545abd
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 35 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
- [#2857](https://github.com/influxdb/influxdb/issues/2857) -- Fix parsing commas in string field values.
- [#2833](https://github.com/influxdb/influxdb/pull/2833) -- Make the default config valid.

### Features
- [2858](https://github.com/influxdb/influxdb/pull/2858) - Support setting openTSDB write consistency.

## v0.9.0-rc32 [2015-06-07]

### Release Notes
Expand Down
14 changes: 10 additions & 4 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func NewServer(c *Config) (*Server, error) {
s.appendContinuousQueryService(c.ContinuousQuery)
s.appendHTTPDService(c.HTTPD)
s.appendCollectdService(c.Collectd)
s.appendOpenTSDBService(c.OpenTSDB)
if err := s.appendOpenTSDBService(c.OpenTSDB); err != nil {
return nil, err
}
s.appendUDPService(c.UDP)
s.appendRetentionPolicyService(c.Retention)
for _, g := range c.Graphites {
Expand Down Expand Up @@ -145,13 +147,17 @@ func (s *Server) appendCollectdService(c collectd.Config) {
s.Services = append(s.Services, srv)
}

func (s *Server) appendOpenTSDBService(c opentsdb.Config) {
func (s *Server) appendOpenTSDBService(c opentsdb.Config) error {
if !c.Enabled {
return
return nil
}
srv, err := opentsdb.NewService(c)
if err != nil {
return err
}
srv := opentsdb.NewService(c)
srv.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
return nil
}

func (s *Server) appendGraphiteService(c graphite.Config) error {
Expand Down
19 changes: 12 additions & 7 deletions services/opentsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,24 @@ const (

// DefaultRetentionPolicy is the default retention policy used for writes.
DefaultRetentionPolicy = ""

// DefaultConsistencyLevel is the default write consistency level.
DefaultConsistencyLevel = "one"
)

type Config struct {
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
Database string `toml:"database"`
RetentionPolicy string `toml:"retention-policy"`
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
Database string `toml:"database"`
RetentionPolicy string `toml:"retention-policy"`
ConsistencyLevel string `toml:"consistency-level"`
}

func NewConfig() Config {
return Config{
BindAddress: DefaultBindAddress,
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
BindAddress: DefaultBindAddress,
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
ConsistencyLevel: DefaultConsistencyLevel,
}
}
3 changes: 3 additions & 0 deletions services/opentsdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func TestConfig_Parse(t *testing.T) {
enabled = true
bind-address = ":9000"
database = "xxx"
consistency-level ="all"
`, &c); err != nil {
t.Fatal(err)
}
Expand All @@ -25,5 +26,7 @@ database = "xxx"
t.Fatalf("unexpected bind address: %s", c.BindAddress)
} else if c.Database != "xxx" {
t.Fatalf("unexpected database: %s", c.Database)
} else if c.ConsistencyLevel != "all" {
t.Fatalf("unexpected database: %s", c.ConsistencyLevel)
}
}
7 changes: 4 additions & 3 deletions services/opentsdb/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
)

type Handler struct {
Database string
RetentionPolicy string
Database string
RetentionPolicy string
ConsistencyLevel cluster.ConsistencyLevel

PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
Expand Down Expand Up @@ -115,7 +116,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
if err := h.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: h.Database,
RetentionPolicy: h.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelOne,
ConsistencyLevel: h.ConsistencyLevel,
Points: points,
}); influxdb.IsClientError(err) {
h.Logger.Println("write series error: ", err)
Expand Down
38 changes: 23 additions & 15 deletions services/opentsdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ type Service struct {
wg sync.WaitGroup
err chan error

BindAddress string
Database string
RetentionPolicy string
BindAddress string
Database string
RetentionPolicy string
ConsistencyLevel cluster.ConsistencyLevel

PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
Expand All @@ -38,15 +39,21 @@ type Service struct {
}

// NewService returns a new instance of Service.
func NewService(c Config) *Service {
func NewService(c Config) (*Service, error) {
consistencyLevel, err := cluster.ParseConsistencyLevel(c.ConsistencyLevel)
if err != nil {
return nil, err
}

s := &Service{
err: make(chan error),
BindAddress: c.BindAddress,
Database: c.Database,
RetentionPolicy: c.RetentionPolicy,
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
err: make(chan error),
BindAddress: c.BindAddress,
Database: c.Database,
RetentionPolicy: c.RetentionPolicy,
ConsistencyLevel: consistencyLevel,
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
}
return s
return s, nil
}

// Open starts the service
Expand Down Expand Up @@ -211,7 +218,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: s.Database,
RetentionPolicy: s.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelOne,
ConsistencyLevel: s.ConsistencyLevel,
Points: []tsdb.Point{p},
}); err != nil {
s.Logger.Println("TSDB cannot write data: ", err)
Expand All @@ -223,10 +230,11 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
// serveHTTP handles connections in HTTP format.
func (s *Service) serveHTTP() {
srv := &http.Server{Handler: &Handler{
Database: s.Database,
RetentionPolicy: s.RetentionPolicy,
PointsWriter: s.PointsWriter,
Logger: s.Logger,
Database: s.Database,
RetentionPolicy: s.RetentionPolicy,
ConsistencyLevel: s.ConsistencyLevel,
PointsWriter: s.PointsWriter,
Logger: s.Logger,
}}
srv.Serve(s.httpln)
}
12 changes: 6 additions & 6 deletions services/opentsdb/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ type Service struct {

// NewService returns a new instance of Service.
func NewService(database string) *Service {
s := &Service{
Service: opentsdb.NewService(opentsdb.Config{
BindAddress: "127.0.0.1:0",
Database: database,
}),
}
srv, _ := opentsdb.NewService(opentsdb.Config{
BindAddress: "127.0.0.1:0",
Database: database,
ConsistencyLevel: "one",
})
s := &Service{Service: srv}
s.Service.PointsWriter = &s.PointsWriter

if !testing.Verbose() {
Expand Down

0 comments on commit 4545abd

Please sign in to comment.