Re-enable Continuous Queries #2200

Merged
12 commits, merged Apr 8, 2015
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
- [#2181](https://github.com/influxdb/influxdb/pull/2181): Fix panic on "SHOW DIAGNOSTICS".
- [#2170](https://github.com/influxdb/influxdb/pull/2170): Make sure queries on missing tags return 200 status.
- [#2197](https://github.com/influxdb/influxdb/pull/2197): Lock server during Open().
- [#2200](https://github.com/influxdb/influxdb/pull/2200): Re-enable Continuous Queries.

## v0.9.0-rc20 [2015-04-04]

55 changes: 26 additions & 29 deletions broker.go
@@ -1,6 +1,11 @@
package influxdb

import (
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"time"

"github.com/influxdb/influxdb/messaging"
@@ -26,9 +31,6 @@ type Broker struct {

done chan struct{}

// send CQ processing requests to the same data node
// currentCQProcessingNode *messaging.Replica // FIX(benbjohnson)

// variables to control when to trigger processing and when to timeout
TriggerInterval time.Duration
TriggerTimeout time.Duration
@@ -47,14 +49,10 @@ func NewBroker() *Broker {

// RunContinuousQueryLoop starts running continuous queries on a background goroutine.
func (b *Broker) RunContinuousQueryLoop() {
// FIX(benbjohnson)
// b.done = make(chan struct{})
// go b.continuousQueryLoop(b.done)
b.done = make(chan struct{})
go b.continuousQueryLoop(b.done)
}
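
For reference, a minimal sketch of what the background loop started above might look like; the actual continuousQueryLoop body is collapsed in this diff, so the ticker/select shape below is only an assumption based on the TriggerInterval field and the done channel.

// Sketch only (not part of this diff): one plausible shape for the loop,
// assuming it triggers runContinuousQueries every TriggerInterval until
// the done channel is closed.
func (b *Broker) continuousQueryLoop(done chan struct{}) {
	for {
		select {
		case <-time.After(b.TriggerInterval):
			b.runContinuousQueries()
		case <-done:
			return
		}
	}
}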

/*


// Close closes the broker.
func (b *Broker) Close() error {
if b.done != nil {
@@ -81,35 +79,36 @@ func (b *Broker) continuousQueryLoop(done chan struct{}) {
}

func (b *Broker) runContinuousQueries() {
next := 0
for {
// if the current node hasn't been set it's our first time or we're reset. move to the next one
if b.currentCQProcessingNode == nil {
dataNodes := b.Broker.Replicas()
if len(dataNodes) == 0 {
return // don't have any nodes to try, give it up
}
next = next % len(dataNodes)
b.currentCQProcessingNode = dataNodes[next]
next++
}
topic := b.Broker.Topic(BroadcastTopicID)
if topic == nil {
log.Println("broker cq: no broadcast topic currently available.")
return // don't have any topics to get data urls from, give it up
}
dataURLs := topic.DataURLs()
Contributor

This is a strange way to get the URLs. :-) I know why a topic knows about the URLs, but is there no other way?

Contributor Author

We should probably add a convenience method to the broker which is something like:

func (b *Broker) DataURLs() ([]url.URL, bool)

And then we can hide some of this searching around.

There is no other way that a broker will know about data urls right now.
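
A minimal sketch of that suggested convenience method, assuming it would simply wrap the broadcast-topic lookup used in this function; it is not part of this PR, and the name and signature are the reviewer's proposal.

// DataURLs is the suggested helper; sketch only, not in this PR. It hides
// the broadcast-topic lookup and reports whether any data node URLs are known.
func (b *Broker) DataURLs() ([]url.URL, bool) {
	topic := b.Broker.Topic(BroadcastTopicID)
	if topic == nil {
		return nil, false // no broadcast topic yet, so no data URLs
	}
	urls := topic.DataURLs()
	return urls, len(urls) > 0
}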

if len(dataURLs) == 0 {
log.Println("broker cq: no data nodes currently available.")
return // don't have any data urls to try, give it up
}

rand.Seed(time.Now().UnixNano())
// get a set of random indexes so we can randomly distribute cq load over nodes
ri := rand.Perm(len(dataURLs))
for _, i := range ri {
Contributor

This means that if all nodes return an error, the loop will exit. Is that how the code worked before? I'm not sure.

Contributor Author

The code did not work this way before, but I believe it was INTENDED to work that way. There were several bugs that effectively meant it would get stuck in an infinite loop, only ever trying the first data node.

So, because of those bugs, I can only assume that the intent was to randomly distribute the load and, if we couldn't talk to any of the nodes, exit and wait for the next time continuous queries are asked to run.

Contributor

OK, makes sense. I actually like this approach, seems quite sensible.

u := dataURLs[i]
// if no error, we're all good
err := b.requestContinuousQueryProcessing()
err := b.requestContinuousQueryProcessing(u)
if err == nil {
return
}
log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode.URL, err.Error())
log.Printf("broker cq: error hitting data node: %s: %s\n", u.String(), err.Error())

// reset and let the loop try the next data node in the cluster
b.currentCQProcessingNode = nil
// let the loop try the next data node in the cluster
<-time.After(DefaultFailureSleep)
}
}

func (b *Broker) requestContinuousQueryProcessing() error {
func (b *Broker) requestContinuousQueryProcessing(cqURL url.URL) error {
// Send request.
cqURL := copyURL(b.currentCQProcessingNode.URL)
cqURL.Path = "/process_continuous_queries"
cqURL.Scheme = "http"
client := &http.Client{
@@ -128,5 +127,3 @@ func (b *Broker) requestContinuousQueryProcessing() error {

return nil
}

*/
15 changes: 8 additions & 7 deletions cmd/influxd/run.go
@@ -173,13 +173,6 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in
}()
log.Printf("TCP server listening on %s", cmd.config.ClusterAddr())

// have it occasionally tell a data node in the cluster to run continuous queries
if cmd.config.ContinuousQuery.Disabled {
log.Printf("Not running continuous queries. [continuous_queries].disabled is set to true.")
} else {
cmd.node.broker.RunContinuousQueryLoop()
}

var s *influxdb.Server
// Open server, initialize or join as necessary.
if cmd.config.Data.Enabled {
@@ -343,7 +336,15 @@ func (cmd *RunCommand) Open(config *Config, join string) (*messaging.Broker, *in
var b *messaging.Broker
if cmd.node.broker != nil {
b = cmd.node.broker.Broker

// have it occasionally tell a data node in the cluster to run continuous queries
if cmd.config.ContinuousQuery.Disabled {
log.Printf("Not running continuous queries. [continuous_queries].disabled is set to true.")
} else {
cmd.node.broker.RunContinuousQueryLoop()
}
}

return b, s, cmd.node.raftLog
}

6 changes: 3 additions & 3 deletions database.go
@@ -813,7 +813,7 @@ func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
buf[i+3] = byte(c)
}
default:
panic(fmt.Sprintf("unsupported value type: %T", v))
panic(fmt.Sprintf("unsupported value type during encode fields: %T", v))
}

// Always set the field ID as the leading byte.
@@ -868,7 +868,7 @@ func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
// Move bytes forward.
b = b[size+3:]
default:
panic(fmt.Sprintf("unsupported value type: %T", field.Type))
panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type))
}

if field.ID == targetID {
@@ -922,7 +922,7 @@ func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error) {
// Move bytes forward.
b = b[size+3:]
default:
panic(fmt.Sprintf("unsupported value type: %T", f.fieldsByID[fieldID]))
panic(fmt.Sprintf("unsupported value type during decode fields: %T", f.fieldsByID[fieldID]))
}

values[fieldID] = value
11 changes: 10 additions & 1 deletion server.go
@@ -3808,7 +3808,7 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
}

if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
log.Printf("cq error: %s. running: %s\n", err.Error(), cq.cq.String())
log.Printf("cq error during recompute previous: %s. running: %s\n", err.Error(), cq.cq.String())
}

startTime = newStartTime
@@ -3838,6 +3838,15 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
}

if len(points) > 0 {
for _, p := range points {
for _, v := range p.Fields {
if v == nil {
// If we have any nil values, we can't write the data
// This happens when the CQ is created and running before we write data to the measurement
return nil
}
}
}
_, err = s.WriteSeries(cq.intoDB, cq.intoRP, points)
if err != nil {
log.Printf("[cq] err: %s", err)
3 changes: 1 addition & 2 deletions server_test.go
@@ -1690,9 +1690,8 @@ func TestServer_DropContinuousQuery(t *testing.T) {
}
}

// Ensure
// Ensure continuous queries run
func TestServer_RunContinuousQueries(t *testing.T) {
t.Skip()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)