Skip to content

Commit

Permalink
Merge pull request #2779 from influxdb/hook-CQs-back-2733
Browse files Browse the repository at this point in the history
fix #2733: hook CQs back in
  • Loading branch information
toddboom committed Jun 6, 2015
2 parents 9ec6e4f + fcf89c8 commit 4bf0241
Show file tree
Hide file tree
Showing 6 changed files with 982 additions and 2 deletions.
2 changes: 1 addition & 1 deletion client/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.userAgent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
Expand Down
20 changes: 20 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/services/admin"
"github.com/influxdb/influxdb/services/collectd"
"github.com/influxdb/influxdb/services/continuous_querier"
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/opentsdb"
Expand Down Expand Up @@ -70,6 +71,7 @@ func NewServer(c *Config) *Server {
// Append services.
s.appendClusterService(c.Cluster)
s.appendAdminService(c.Admin)
s.appendContinuousQueryService(c.ContinuousQuery)
s.appendHTTPDService(c.HTTPD)
s.appendCollectdService(c.Collectd)
s.appendOpenTSDBService(c.OpenTSDB)
Expand Down Expand Up @@ -114,6 +116,14 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
srv.Handler.MetaStore = s.MetaStore
srv.Handler.QueryExecutor = s.QueryExecutor
srv.Handler.PointsWriter = s.PointsWriter

// If a ContinuousQuerier service has been started, attach it.
for _, srvc := range s.Services {
if cqsrvc, ok := srvc.(continuous_querier.ContinuousQuerier); ok {
srv.Handler.ContinuousQuerier = cqsrvc
}
}

s.Services = append(s.Services, srv)
}

Expand Down Expand Up @@ -147,6 +157,16 @@ func (s *Server) appendUDPService(c udp.Config) {
}
srv := udp.NewService(c)
srv.PointsWriter = s.PointsWriter
}

func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
if !c.Enabled {
return
}
srv := continuous_querier.NewService(c)
srv.MetaStore = s.MetaStore
srv.QueryExecutor = s.QueryExecutor
srv.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
}

Expand Down
6 changes: 5 additions & 1 deletion meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,11 @@ func (s *Store) Ready() <-chan struct{} { return s.ready }
func (s *Store) Err() <-chan error { return s.err }

// IsLeader returns true if the store is currently the leader.
func (s *Store) IsLeader() bool { return s.raft.State() == raft.Leader }
func (s *Store) IsLeader() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.raft.State() == raft.Leader
}

// LeaderCh returns a channel that notifies on leadership change.
// Panics when the store has not been opened yet.
Expand Down
Loading

0 comments on commit 4bf0241

Please sign in to comment.