From 65184aabc46ce27bfb2bd4d0677ede55e9776d49 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 8 Apr 2015 12:25:04 -0600 Subject: [PATCH 01/11] enable continuous query --- broker.go | 37 ++++++++++++++++++++++--------------- cmd/influxd/run.go | 2 +- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/broker.go b/broker.go index 2c5fb3cf613..54a5f93ad80 100644 --- a/broker.go +++ b/broker.go @@ -1,6 +1,10 @@ package influxdb import ( + "fmt" + "log" + "net/http" + "net/url" "time" "github.com/influxdb/influxdb/messaging" @@ -27,7 +31,7 @@ type Broker struct { done chan struct{} // send CQ processing requests to the same data node - // currentCQProcessingNode *messaging.Replica // FIX(benbjohnson) + currentCQProcessingNode *url.URL // variables to control when to trigger processing and when to timeout TriggerInterval time.Duration @@ -47,14 +51,12 @@ func NewBroker() *Broker { // RunContinuousQueryLoop starts running continuous queries on a background goroutine. func (b *Broker) RunContinuousQueryLoop() { - // FIX(benbjohnson) - // b.done = make(chan struct{}) - // go b.continuousQueryLoop(b.done) + log.Println("and...") + b.done = make(chan struct{}) + log.Println("boom!") + go b.continuousQueryLoop(b.done) } -/* - - // Close closes the broker. func (b *Broker) Close() error { if b.done != nil { @@ -85,12 +87,19 @@ func (b *Broker) runContinuousQueries() { for { // if the current node hasn't been set it's our first time or we're reset. 
move to the next one if b.currentCQProcessingNode == nil { - dataNodes := b.Broker.Replicas() - if len(dataNodes) == 0 { + topic := b.Broker.Topic(BroadcastTopicID) + if topic == nil { + log.Println("broker cq: no topics currently available.") return // don't have any nodes to try, give it up } - next = next % len(dataNodes) - b.currentCQProcessingNode = dataNodes[next] + dataURLs := topic.DataURLs() + if len(dataURLs) == 0 { + log.Println("broker cq: no data nodes currently available.") + return // don't have any nodes to try, give it up + } + next = next % len(dataURLs) + u := dataURLs[next] + b.currentCQProcessingNode = &u next++ } @@ -99,7 +108,7 @@ func (b *Broker) runContinuousQueries() { if err == nil { return } - log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode.URL, err.Error()) + log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode, err.Error()) // reset and let the loop try the next data node in the cluster b.currentCQProcessingNode = nil @@ -109,7 +118,7 @@ func (b *Broker) runContinuousQueries() { func (b *Broker) requestContinuousQueryProcessing() error { // Send request. - cqURL := copyURL(b.currentCQProcessingNode.URL) + cqURL := copyURL(b.currentCQProcessingNode) cqURL.Path = "/process_continuous_queries" cqURL.Scheme = "http" client := &http.Client{ @@ -128,5 +137,3 @@ func (b *Broker) requestContinuousQueryProcessing() error { return nil } - -*/ diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 2cc22b42daf..7005ea1a3e9 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -176,7 +176,7 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in // have it occasionally tell a data node in the cluster to run continuous queries if cmd.config.ContinuousQuery.Disabled { log.Printf("Not running continuous queries. 
[continuous_queries].disabled is set to true.") - } else { + } else if cmd.node.broker != nil { cmd.node.broker.RunContinuousQueryLoop() } From cb4b18ebe7a6da04a9b1e75a7b03099fab590011 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 8 Apr 2015 13:58:34 -0600 Subject: [PATCH 02/11] enable continuous query testing --- server_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server_test.go b/server_test.go index 5a78f6fa08a..2830f0623d1 100644 --- a/server_test.go +++ b/server_test.go @@ -1692,7 +1692,6 @@ func TestServer_DropContinuousQuery(t *testing.T) { // Ensure func TestServer_RunContinuousQueries(t *testing.T) { - t.Skip() c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) From 4d56c19ed493381d995f1504a63a80b8185609da Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 8 Apr 2015 13:59:17 -0600 Subject: [PATCH 03/11] remove debugging --- broker.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/broker.go b/broker.go index 54a5f93ad80..fa6de188a20 100644 --- a/broker.go +++ b/broker.go @@ -51,9 +51,7 @@ func NewBroker() *Broker { // RunContinuousQueryLoop starts running continuous queries on a background goroutine. 
func (b *Broker) RunContinuousQueryLoop() { - log.Println("and...") b.done = make(chan struct{}) - log.Println("boom!") go b.continuousQueryLoop(b.done) } From 3c91700da2d96a1186cf810bd9fcead7fbefef12 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 8 Apr 2015 14:00:18 -0600 Subject: [PATCH 04/11] clarify comments --- broker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker.go b/broker.go index fa6de188a20..8a1797de67f 100644 --- a/broker.go +++ b/broker.go @@ -88,12 +88,12 @@ func (b *Broker) runContinuousQueries() { topic := b.Broker.Topic(BroadcastTopicID) if topic == nil { log.Println("broker cq: no topics currently available.") - return // don't have any nodes to try, give it up + return // don't have any topics to get data urls from, give it up } dataURLs := topic.DataURLs() if len(dataURLs) == 0 { log.Println("broker cq: no data nodes currently available.") - return // don't have any nodes to try, give it up + return // don't have any data urls to try, give it up } next = next % len(dataURLs) u := dataURLs[next] From 913f8955bd9ca1eea2ab7b39fe7cb030d2d2dbe9 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 8 Apr 2015 15:14:59 -0600 Subject: [PATCH 05/11] refactor runContinousQueries --- broker.go | 48 ++++++++++++++++++++---------------------------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/broker.go b/broker.go index 8a1797de67f..87024519b6f 100644 --- a/broker.go +++ b/broker.go @@ -3,6 +3,7 @@ package influxdb import ( "fmt" "log" + "math/rand" "net/http" "net/url" "time" @@ -30,9 +31,6 @@ type Broker struct { done chan struct{} - // send CQ processing requests to the same data node - currentCQProcessingNode *url.URL - // variables to control when to trigger processing and when to timeout TriggerInterval time.Duration TriggerTimeout time.Duration @@ -81,42 +79,36 @@ func (b *Broker) continuousQueryLoop(done chan struct{}) { } func (b *Broker) runContinuousQueries() { - next := 0 - for { - // if the 
current node hasn't been set it's our first time or we're reset. move to the next one - if b.currentCQProcessingNode == nil { - topic := b.Broker.Topic(BroadcastTopicID) - if topic == nil { - log.Println("broker cq: no topics currently available.") - return // don't have any topics to get data urls from, give it up - } - dataURLs := topic.DataURLs() - if len(dataURLs) == 0 { - log.Println("broker cq: no data nodes currently available.") - return // don't have any data urls to try, give it up - } - next = next % len(dataURLs) - u := dataURLs[next] - b.currentCQProcessingNode = &u - next++ - } + topic := b.Broker.Topic(BroadcastTopicID) + if topic == nil { + log.Println("broker cq: no topics currently available.") + return // don't have any topics to get data urls from, give it up + } + dataURLs := topic.DataURLs() + if len(dataURLs) == 0 { + log.Println("broker cq: no data nodes currently available.") + return // don't have any data urls to try, give it up + } + rand.Seed(time.Now().UnixNano()) + // get a set of random indexes so we can randomly distribute cq load over nodes + ri := rand.Perm(len(dataURLs)) + for _, i := range ri { + u := dataURLs[i] // if no error, we're all good - err := b.requestContinuousQueryProcessing() + err := b.requestContinuousQueryProcessing(u) if err == nil { return } - log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode, err.Error()) + log.Printf("broker cq: error hitting data node: %s: %s\n", u.String(), err.Error()) - // reset and let the loop try the next data node in the cluster - b.currentCQProcessingNode = nil + // let the loop try the next data node in the cluster <-time.After(DefaultFailureSleep) } } -func (b *Broker) requestContinuousQueryProcessing() error { +func (b *Broker) requestContinuousQueryProcessing(cqURL url.URL) error { // Send request. 
- cqURL := copyURL(b.currentCQProcessingNode) cqURL.Path = "/process_continuous_queries" cqURL.Scheme = "http" client := &http.Client{ From 861986e062368856a354dbe42c3a0534c77c7003 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 8 Apr 2015 15:20:54 -0600 Subject: [PATCH 06/11] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d50f842b423..a3b5c9243a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - [#2181](https://github.com/influxdb/influxdb/pull/2181): Fix panic on "SHOW DIAGNOSTICS". - [#2170](https://github.com/influxdb/influxdb/pull/2170): Make sure queries on missing tags return 200 status. - [#2197](https://github.com/influxdb/influxdb/pull/2197): Lock server during Open(). +- [#2200](https://github.com/influxdb/influxdb/pull/2200): Re-enable Continuous Queries. ## v0.9.0-rc20 [2015-04-04] From f0fc23344105eb55c07783863ddeb0b71eec1746 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 8 Apr 2015 15:35:00 -0600 Subject: [PATCH 07/11] update log message --- broker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker.go b/broker.go index 87024519b6f..5ba12b5343a 100644 --- a/broker.go +++ b/broker.go @@ -81,7 +81,7 @@ func (b *Broker) continuousQueryLoop(done chan struct{}) { func (b *Broker) runContinuousQueries() { topic := b.Broker.Topic(BroadcastTopicID) if topic == nil { - log.Println("broker cq: no topics currently available.") + log.Println("broker cq: no broadcast topic currently available.") return // don't have any topics to get data urls from, give it up } dataURLs := topic.DataURLs() From 6d602456abc4783761bcac50bfc8e2c2616c5d26 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 8 Apr 2015 15:35:57 -0600 Subject: [PATCH 08/11] update test comment --- server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server_test.go b/server_test.go index 2830f0623d1..03fd85dbb2f 100644 --- a/server_test.go +++ 
b/server_test.go @@ -1690,7 +1690,7 @@ func TestServer_DropContinuousQuery(t *testing.T) { } } -// Ensure +// Ensure continuous queries run func TestServer_RunContinuousQueries(t *testing.T) { c := test.NewDefaultMessagingClient() defer c.Close() From ed05cadf3500617ca0eeca20b4dcaed74ce9b785 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 8 Apr 2015 15:43:09 -0600 Subject: [PATCH 09/11] move where we enable continous queries when starting up --- cmd/influxd/run.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 7005ea1a3e9..e695fa39b41 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -173,13 +173,6 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in }() log.Printf("TCP server listening on %s", cmd.config.ClusterAddr()) - // have it occasionally tell a data node in the cluster to run continuous queries - if cmd.config.ContinuousQuery.Disabled { - log.Printf("Not running continuous queries. [continuous_queries].disabled is set to true.") - } else if cmd.node.broker != nil { - cmd.node.broker.RunContinuousQueryLoop() - } - var s *influxdb.Server // Open server, initialize or join as necessary. if cmd.config.Data.Enabled { @@ -343,7 +336,15 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in var b *messaging.Broker if cmd.node.broker != nil { b = cmd.node.broker.Broker + + // have it occasionally tell a data node in the cluster to run continuous queries + if cmd.config.ContinuousQuery.Disabled { + log.Printf("Not running continuous queries. [continuous_queries].disabled is set to true.") + } else { + cmd.node.broker.RunContinuousQueryLoop() + } } + return b, s, cmd.node.raftLog } From 8ca6ac37ff163044462b017db823f0046829f196 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 8 Apr 2015 17:11:53 -0600 Subject: [PATCH 10/11] enhance error messages to know where they happened. 
--- database.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/database.go b/database.go index b2cb800a87c..21520b70e23 100644 --- a/database.go +++ b/database.go @@ -813,7 +813,7 @@ func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error) buf[i+3] = byte(c) } default: - panic(fmt.Sprintf("unsupported value type: %T", v)) + panic(fmt.Sprintf("unsupported value type during encode fields: %T", v)) } // Always set the field ID as the leading byte. @@ -868,7 +868,7 @@ func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) { // Move bytes forward. b = b[size+3:] default: - panic(fmt.Sprintf("unsupported value type: %T", field.Type)) + panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type)) } if field.ID == targetID { @@ -922,7 +922,7 @@ func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error) { // Move bytes forward. b = b[size+3:] default: - panic(fmt.Sprintf("unsupported value type: %T", f.fieldsByID[fieldID])) + panic(fmt.Sprintf("unsupported value type during decode fields: %T", f.fieldsByID[fieldID])) } values[fieldID] = value From a67e88ceef99037375382c7021ffb99b2688e727 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 8 Apr 2015 17:12:37 -0600 Subject: [PATCH 11/11] fix nil writes to data for cq --- server.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index a5a0b5542d2..4844f32a5a7 100644 --- a/server.go +++ b/server.go @@ -3808,7 +3808,7 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) { } if err := s.runContinuousQueryAndWriteResult(cq); err != nil { - log.Printf("cq error: %s. running: %s\n", err.Error(), cq.cq.String()) + log.Printf("cq error during recompute previous: %s. 
running: %s\n", err.Error(), cq.cq.String()) } startTime = newStartTime @@ -3838,6 +3838,15 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { } if len(points) > 0 { + for _, p := range points { + for _, v := range p.Fields { + if v == nil { + // If we have any nil values, we can't write the data + // This happens when the CQ is created and running before we write data to the measurement + return nil + } + } + } _, err = s.WriteSeries(cq.intoDB, cq.intoRP, points) if err != nil { log.Printf("[cq] err: %s", err)