Skip to content

Commit

Permalink
Merge pull request #2200 from influxdb/renenable-cq
Browse files Browse the repository at this point in the history
Re-enable Continuous Queries
  • Loading branch information
corylanou committed Apr 8, 2015
2 parents 553c94e + a67e88c commit 8e19d52
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- [#2181](https://github.com/influxdb/influxdb/pull/2181): Fix panic on "SHOW DIAGNOSTICS".
- [#2170](https://github.com/influxdb/influxdb/pull/2170): Make sure queries on missing tags return 200 status.
- [#2197](https://github.com/influxdb/influxdb/pull/2197): Lock server during Open().
- [#2200](https://github.com/influxdb/influxdb/pull/2200): Re-enable Continuous Queries.

## v0.9.0-rc20 [2015-04-04]

Expand Down
55 changes: 26 additions & 29 deletions broker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package influxdb

import (
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"time"

"github.com/influxdb/influxdb/messaging"
Expand All @@ -26,9 +31,6 @@ type Broker struct {

done chan struct{}

// send CQ processing requests to the same data node
// currentCQProcessingNode *messaging.Replica // FIX(benbjohnson)

// variables to control when to trigger processing and when to timeout
TriggerInterval time.Duration
TriggerTimeout time.Duration
Expand All @@ -47,14 +49,10 @@ func NewBroker() *Broker {

// RunContinuousQueryLoop starts running continuous queries on a background goroutine.
func (b *Broker) RunContinuousQueryLoop() {
// FIX(benbjohnson)
// b.done = make(chan struct{})
// go b.continuousQueryLoop(b.done)
b.done = make(chan struct{})
go b.continuousQueryLoop(b.done)
}

/*
// Close closes the broker.
func (b *Broker) Close() error {
if b.done != nil {
Expand All @@ -81,35 +79,36 @@ func (b *Broker) continuousQueryLoop(done chan struct{}) {
}

func (b *Broker) runContinuousQueries() {
next := 0
for {
// if the current node hasn't been set it's our first time or we're reset. move to the next one
if b.currentCQProcessingNode == nil {
dataNodes := b.Broker.Replicas()
if len(dataNodes) == 0 {
return // don't have any nodes to try, give it up
}
next = next % len(dataNodes)
b.currentCQProcessingNode = dataNodes[next]
next++
}
topic := b.Broker.Topic(BroadcastTopicID)
if topic == nil {
log.Println("broker cq: no broadcast topic currently available.")
return // don't have any topics to get data urls from, give it up
}
dataURLs := topic.DataURLs()
if len(dataURLs) == 0 {
log.Println("broker cq: no data nodes currently available.")
return // don't have any data urls to try, give it up
}

rand.Seed(time.Now().UnixNano())
// get a set of random indexes so we can randomly distribute cq load over nodes
ri := rand.Perm(len(dataURLs))
for _, i := range ri {
u := dataURLs[i]
// if no error, we're all good
err := b.requestContinuousQueryProcessing()
err := b.requestContinuousQueryProcessing(u)
if err == nil {
return
}
log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode.URL, err.Error())
log.Printf("broker cq: error hitting data node: %s: %s\n", u.String(), err.Error())

// reset and let the loop try the next data node in the cluster
b.currentCQProcessingNode = nil
// let the loop try the next data node in the cluster
<-time.After(DefaultFailureSleep)
}
}

func (b *Broker) requestContinuousQueryProcessing() error {
func (b *Broker) requestContinuousQueryProcessing(cqURL url.URL) error {
// Send request.
cqURL := copyURL(b.currentCQProcessingNode.URL)
cqURL.Path = "/process_continuous_queries"
cqURL.Scheme = "http"
client := &http.Client{
Expand All @@ -128,5 +127,3 @@ func (b *Broker) requestContinuousQueryProcessing() error {

return nil
}
*/
15 changes: 8 additions & 7 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,6 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in
}()
log.Printf("TCP server listening on %s", cmd.config.ClusterAddr())

// have it occasionally tell a data node in the cluster to run continuous queries
if cmd.config.ContinuousQuery.Disabled {
log.Printf("Not running continuous queries. [continuous_queries].disabled is set to true.")
} else {
cmd.node.broker.RunContinuousQueryLoop()
}

var s *influxdb.Server
// Open server, initialize or join as necessary.
if cmd.config.Data.Enabled {
Expand Down Expand Up @@ -343,7 +336,15 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in
var b *messaging.Broker
if cmd.node.broker != nil {
b = cmd.node.broker.Broker

// have it occasionally tell a data node in the cluster to run continuous queries
if cmd.config.ContinuousQuery.Disabled {
log.Printf("Not running continuous queries. [continuous_queries].disabled is set to true.")
} else {
cmd.node.broker.RunContinuousQueryLoop()
}
}

return b, s, cmd.node.raftLog
}

Expand Down
6 changes: 3 additions & 3 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
buf[i+3] = byte(c)
}
default:
panic(fmt.Sprintf("unsupported value type: %T", v))
panic(fmt.Sprintf("unsupported value type during encode fields: %T", v))
}

// Always set the field ID as the leading byte.
Expand Down Expand Up @@ -868,7 +868,7 @@ func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
// Move bytes forward.
b = b[size+3:]
default:
panic(fmt.Sprintf("unsupported value type: %T", field.Type))
panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type))
}

if field.ID == targetID {
Expand Down Expand Up @@ -922,7 +922,7 @@ func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error) {
// Move bytes forward.
b = b[size+3:]
default:
panic(fmt.Sprintf("unsupported value type: %T", f.fieldsByID[fieldID]))
panic(fmt.Sprintf("unsupported value type during decode fields: %T", f.fieldsByID[fieldID]))
}

values[fieldID] = value
Expand Down
11 changes: 10 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3808,7 +3808,7 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
}

if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
log.Printf("cq error: %s. running: %s\n", err.Error(), cq.cq.String())
log.Printf("cq error during recompute previous: %s. running: %s\n", err.Error(), cq.cq.String())
}

startTime = newStartTime
Expand Down Expand Up @@ -3838,6 +3838,15 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
}

if len(points) > 0 {
for _, p := range points {
for _, v := range p.Fields {
if v == nil {
// If we have any nil values, we can't write the data
// This happens the CQ is created and running before we write data to the measurement
return nil
}
}
}
_, err = s.WriteSeries(cq.intoDB, cq.intoRP, points)
if err != nil {
log.Printf("[cq] err: %s", err)
Expand Down
3 changes: 1 addition & 2 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1690,9 +1690,8 @@ func TestServer_DropContinuousQuery(t *testing.T) {
}
}

// Ensure
// Ensure continuous queries run
func TestServer_RunContinuousQueries(t *testing.T) {
t.Skip()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
Expand Down

0 comments on commit 8e19d52

Please sign in to comment.