diff --git a/CHANGELOG.md b/CHANGELOG.md index 99c1776d569..03609a822c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [#2181](https://github.com/influxdb/influxdb/pull/2181): Fix panic on "SHOW DIAGNOSTICS". - [#2170](https://github.com/influxdb/influxdb/pull/2170): Make sure queries on missing tags return 200 status. - [#2197](https://github.com/influxdb/influxdb/pull/2197): Lock server during Open(). +- [#2200](https://github.com/influxdb/influxdb/pull/2200): Re-enable Continuous Queries. ## v0.9.0-rc20 [2015-04-04] diff --git a/broker.go b/broker.go index 2c5fb3cf613..5ba12b5343a 100644 --- a/broker.go +++ b/broker.go @@ -1,6 +1,11 @@ package influxdb import ( + "fmt" + "log" + "math/rand" + "net/http" + "net/url" "time" "github.com/influxdb/influxdb/messaging" @@ -26,9 +31,6 @@ type Broker struct { done chan struct{} - // send CQ processing requests to the same data node - // currentCQProcessingNode *messaging.Replica // FIX(benbjohnson) - // variables to control when to trigger processing and when to timeout TriggerInterval time.Duration TriggerTimeout time.Duration @@ -47,14 +49,10 @@ func NewBroker() *Broker { // RunContinuousQueryLoop starts running continuous queries on a background goroutine. func (b *Broker) RunContinuousQueryLoop() { - // FIX(benbjohnson) - // b.done = make(chan struct{}) - // go b.continuousQueryLoop(b.done) + b.done = make(chan struct{}) + go b.continuousQueryLoop(b.done) } -/* - - // Close closes the broker. func (b *Broker) Close() error { if b.done != nil { @@ -81,35 +79,36 @@ func (b *Broker) continuousQueryLoop(done chan struct{}) { } func (b *Broker) runContinuousQueries() { - next := 0 - for { - // if the current node hasn't been set it's our first time or we're reset. move to the next one - if b.currentCQProcessingNode == nil { - dataNodes := b.Broker.Replicas() - if len(dataNodes) == 0 { - return // don't have any nodes to try, give it up - } - next = next % len(dataNodes) - b.currentCQProcessingNode = dataNodes[next] - next++ - } + topic := b.Broker.Topic(BroadcastTopicID) + if topic == nil { + log.Println("broker cq: no broadcast topic currently available.") + return // don't have any topics to get data urls from, give it up + } + dataURLs := topic.DataURLs() + if len(dataURLs) == 0 { + log.Println("broker cq: no data nodes currently available.") + return // don't have any data urls to try, give it up + } + rand.Seed(time.Now().UnixNano()) + // get a set of random indexes so we can randomly distribute cq load over nodes + ri := rand.Perm(len(dataURLs)) + for _, i := range ri { + u := dataURLs[i] // if no error, we're all good - err := b.requestContinuousQueryProcessing() + err := b.requestContinuousQueryProcessing(u) if err == nil { return } - log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode.URL, err.Error()) + log.Printf("broker cq: error hitting data node: %s: %s\n", u.String(), err.Error()) - // reset and let the loop try the next data node in the cluster - b.currentCQProcessingNode = nil + // let the loop try the next data node in the cluster <-time.After(DefaultFailureSleep) } } -func (b *Broker) requestContinuousQueryProcessing() error { +func (b *Broker) requestContinuousQueryProcessing(cqURL url.URL) error { // Send request. - cqURL := copyURL(b.currentCQProcessingNode.URL) cqURL.Path = "/process_continuous_queries" cqURL.Scheme = "http" client := &http.Client{ @@ -128,5 +127,3 @@ func (b *Broker) requestContinuousQueryProcessing() error { return nil } - -*/ diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 2cc22b42daf..e695fa39b41 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -173,13 +173,6 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in }() log.Printf("TCP server listening on %s", cmd.config.ClusterAddr()) - // have it occasionally tell a data node in the cluster to run continuous queries - if cmd.config.ContinuousQuery.Disabled { - log.Printf("Not running continuous queries. [continuous_queries].disabled is set to true.") - } else { - cmd.node.broker.RunContinuousQueryLoop() - } - var s *influxdb.Server // Open server, initialize or join as necessary. if cmd.config.Data.Enabled { @@ -343,7 +336,15 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in var b *messaging.Broker if cmd.node.broker != nil { b = cmd.node.broker.Broker + + // have it occasionally tell a data node in the cluster to run continuous queries + if cmd.config.ContinuousQuery.Disabled { + log.Printf("Not running continuous queries. [continuous_queries].disabled is set to true.") + } else { + cmd.node.broker.RunContinuousQueryLoop() + } } + return b, s, cmd.node.raftLog } diff --git a/database.go b/database.go index b2cb800a87c..21520b70e23 100644 --- a/database.go +++ b/database.go @@ -813,7 +813,7 @@ func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error) buf[i+3] = byte(c) } default: - panic(fmt.Sprintf("unsupported value type: %T", v)) + panic(fmt.Sprintf("unsupported value type during encode fields: %T", v)) } // Always set the field ID as the leading byte. @@ -868,7 +868,7 @@ func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) { // Move bytes forward. b = b[size+3:] default: - panic(fmt.Sprintf("unsupported value type: %T", field.Type)) + panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type)) } if field.ID == targetID { @@ -922,7 +922,7 @@ func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error) { // Move bytes forward. b = b[size+3:] default: - panic(fmt.Sprintf("unsupported value type: %T", f.fieldsByID[fieldID])) + panic(fmt.Sprintf("unsupported value type during decode fields: %T", f.fieldsByID[fieldID])) } values[fieldID] = value diff --git a/server.go b/server.go index 4c8ef8ff3c7..bea892af001 100644 --- a/server.go +++ b/server.go @@ -3808,7 +3808,7 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) { } if err := s.runContinuousQueryAndWriteResult(cq); err != nil { - log.Printf("cq error: %s. running: %s\n", err.Error(), cq.cq.String()) + log.Printf("cq error during recompute previous: %s. running: %s\n", err.Error(), cq.cq.String()) } startTime = newStartTime @@ -3838,6 +3838,15 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { } if len(points) > 0 { + for _, p := range points { + for _, v := range p.Fields { + if v == nil { + // If we have any nil values, we can't write the data + // This happens the CQ is created and running before we write data to the measurement + return nil + } + } + } _, err = s.WriteSeries(cq.intoDB, cq.intoRP, points) if err != nil { log.Printf("[cq] err: %s", err) diff --git a/server_test.go b/server_test.go index 5a78f6fa08a..03fd85dbb2f 100644 --- a/server_test.go +++ b/server_test.go @@ -1690,9 +1690,8 @@ func TestServer_DropContinuousQuery(t *testing.T) { } } -// Ensure +// Ensure continuous queries run func TestServer_RunContinuousQueries(t *testing.T) { - t.Skip() c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c)