-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Re-enable Continuous Queries #2200
Changes from all commits
65184aa
cb4b18e
4d56c19
3c91700
913f895
861986e
f0fc233
6d60245
ed05cad
b244fac
8ca6ac3
a67e88c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,11 @@ | ||
package influxdb | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"math/rand" | ||
"net/http" | ||
"net/url" | ||
"time" | ||
|
||
"github.com/influxdb/influxdb/messaging" | ||
|
@@ -26,9 +31,6 @@ type Broker struct { | |
|
||
done chan struct{} | ||
|
||
// send CQ processing requests to the same data node | ||
// currentCQProcessingNode *messaging.Replica // FIX(benbjohnson) | ||
|
||
// variables to control when to trigger processing and when to timeout | ||
TriggerInterval time.Duration | ||
TriggerTimeout time.Duration | ||
|
@@ -47,14 +49,10 @@ func NewBroker() *Broker { | |
|
||
// RunContinuousQueryLoop starts running continuous queries on a background goroutine. | ||
func (b *Broker) RunContinuousQueryLoop() { | ||
// FIX(benbjohnson) | ||
// b.done = make(chan struct{}) | ||
// go b.continuousQueryLoop(b.done) | ||
b.done = make(chan struct{}) | ||
go b.continuousQueryLoop(b.done) | ||
} | ||
|
||
/* | ||
|
||
|
||
// Close closes the broker. | ||
func (b *Broker) Close() error { | ||
if b.done != nil { | ||
|
@@ -81,35 +79,36 @@ func (b *Broker) continuousQueryLoop(done chan struct{}) { | |
} | ||
|
||
func (b *Broker) runContinuousQueries() { | ||
next := 0 | ||
for { | ||
// if the current node hasn't been set it's our first time or we're reset. move to the next one | ||
if b.currentCQProcessingNode == nil { | ||
dataNodes := b.Broker.Replicas() | ||
if len(dataNodes) == 0 { | ||
return // don't have any nodes to try, give it up | ||
} | ||
next = next % len(dataNodes) | ||
b.currentCQProcessingNode = dataNodes[next] | ||
next++ | ||
} | ||
topic := b.Broker.Topic(BroadcastTopicID) | ||
if topic == nil { | ||
log.Println("broker cq: no broadcast topic currently available.") | ||
return // don't have any topics to get data urls from, give it up | ||
} | ||
dataURLs := topic.DataURLs() | ||
if len(dataURLs) == 0 { | ||
log.Println("broker cq: no data nodes currently available.") | ||
return // don't have any data urls to try, give it up | ||
} | ||
|
||
rand.Seed(time.Now().UnixNano()) | ||
// get a set of random indexes so we can randomly distribute cq load over nodes | ||
ri := rand.Perm(len(dataURLs)) | ||
for _, i := range ri { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This means that if all nodes return an error, the loop will exit. Is that how the code worked before? I'm not sure. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code did not work this way before, but I believe it was INTENDED to work that way. There were several bugs that effectively this would be stuck in an infinite loop only trying the first data node ever. So, because of that bug, I can only assume that we meant to randomly distribute the load and then if we couldn't talk to any of them, exit and wait for the next time we want to ask for continuous queries to run. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, makes sense. I actually like this approach, seems quite sensible. |
||
u := dataURLs[i] | ||
// if no error, we're all good | ||
err := b.requestContinuousQueryProcessing() | ||
err := b.requestContinuousQueryProcessing(u) | ||
if err == nil { | ||
return | ||
} | ||
log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode.URL, err.Error()) | ||
log.Printf("broker cq: error hitting data node: %s: %s\n", u.String(), err.Error()) | ||
|
||
// reset and let the loop try the next data node in the cluster | ||
b.currentCQProcessingNode = nil | ||
// let the loop try the next data node in the cluster | ||
<-time.After(DefaultFailureSleep) | ||
} | ||
} | ||
|
||
func (b *Broker) requestContinuousQueryProcessing() error { | ||
func (b *Broker) requestContinuousQueryProcessing(cqURL url.URL) error { | ||
// Send request. | ||
cqURL := copyURL(b.currentCQProcessingNode.URL) | ||
cqURL.Path = "/process_continuous_queries" | ||
cqURL.Scheme = "http" | ||
client := &http.Client{ | ||
|
@@ -128,5 +127,3 @@ func (b *Broker) requestContinuousQueryProcessing() error { | |
|
||
return nil | ||
} | ||
|
||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a strange way to get the URLs. :-) I know why a topic knows about the URLs, but is there no other way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably add a convenience method to the broker which is something like:
And then we can hide some of this searching around.
There is no other way that a broker will know about data urls right now.