diff --git a/CHANGELOG.md b/CHANGELOG.md
index b86a923a4ed..722df4c8513 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,9 +1,15 @@
 ## v0.9.0-rc26 [unreleased]
 
+### Features
+- [#2301](https://github.com/influxdb/influxdb/pull/2301): Distributed query load balancing and failover
+
 ### Bugfixes
 - [#2297](https://github.com/influxdb/influxdb/pull/2297): create /var/run during startup. Thanks @neonstalwart.
 - [#2312](https://github.com/influxdb/influxdb/pull/2312): Re-use httpclient for continuous queries
 - [#2318](https://github.com/influxdb/influxdb/pull/2318): Remove pointless use of 'done' channel for collectd.
+- [#2242](https://github.com/influxdb/influxdb/pull/2242): Distributed Query should balance requests
+- [#2243](https://github.com/influxdb/influxdb/pull/2243): Use Limit Reader instead of fixed 1MB/1GB slice for DQ
+- [#2190](https://github.com/influxdb/influxdb/pull/2190): Implement failover to other data nodes for distributed queries
 
 ## v0.9.0-rc25 [2015-04-15]
diff --git a/balancer.go b/balancer.go
new file mode 100644
index 00000000000..aab20564b60
--- /dev/null
+++ b/balancer.go
@@ -0,0 +1,77 @@
+package influxdb
+
+import (
+	"math/rand"
+	"time"
+)
+
+// Balancer represents a load-balancing algorithm for a set of DataNodes
+type Balancer interface {
+	// Next returns the next DataNode according to the balancing method
+	// or nil if there are no nodes available
+	Next() *DataNode
+}
+
+type dataNodeBalancer struct {
+	dataNodes []*DataNode // data nodes to balance between
+	p         int         // current node index
+}
+
+// NewDataNodeBalancer creates a shuffled, round-robin balancer so that
+// multiple instances will return nodes in a randomized order and each
+// returned DataNode will be repeated in a cycle
+func NewDataNodeBalancer(dataNodes []*DataNode) Balancer {
+	// make a copy of the dataNode slice so we can randomize it
+	// without affecting the original instance as well as ensure
+	// that each Balancer returns nodes in a different order
+	nodes := make([]*DataNode, len(dataNodes))
+	copy(nodes, dataNodes)
+
+	b := &dataNodeBalancer{
+		dataNodes: nodes,
+	}
+	b.shuffle()
+	return b
+}
+
+// shuffle randomizes the ordering of the balancer's available DataNodes
+func (b *dataNodeBalancer) shuffle() {
+	for i := range b.dataNodes {
+		j := rand.Intn(i + 1)
+		b.dataNodes[i], b.dataNodes[j] = b.dataNodes[j], b.dataNodes[i]
+	}
+}
+
+// online returns a slice of the DataNodes that are online
+func (b *dataNodeBalancer) online() []*DataNode {
+	now := time.Now().UTC()
+	up := []*DataNode{}
+	for _, n := range b.dataNodes {
+		if n.OfflineUntil.After(now) {
+			continue
+		}
+		up = append(up, n)
+	}
+	return up
+}
+
+// Next returns the next available DataNode
+func (b *dataNodeBalancer) Next() *DataNode {
+	// only use online nodes
+	up := b.online()
+
+	// no nodes online
+	if len(up) == 0 {
+		return nil
+	}
+
+	// rollover back to the beginning
+	if b.p >= len(up) {
+		b.p = 0
+	}
+
+	d := up[b.p]
+	b.p += 1
+
+	return d
+}
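For context (not part of the change itself), a minimal sketch of how the balancer above is meant to be consumed; the node IDs and localhost URLs are invented for illustration.

package main

import (
	"fmt"
	"net/url"

	"github.com/influxdb/influxdb"
)

func main() {
	// Two hypothetical data nodes; any reachable URLs would do.
	u1, _ := url.Parse("http://localhost:8086")
	u2, _ := url.Parse("http://localhost:8087")
	nodes := []*influxdb.DataNode{
		{ID: 1, URL: u1},
		{ID: 2, URL: u2},
	}

	b := influxdb.NewDataNodeBalancer(nodes)

	// Next cycles through the online nodes; the starting point is
	// randomized per balancer instance by the initial shuffle.
	for i := 0; i < 4; i++ {
		if n := b.Next(); n != nil {
			fmt.Println(n.ID, n.URL)
		}
	}

	// A node marked down drops out of rotation until OfflineUntil passes.
	nodes[0].Down()
	fmt.Println(b.Next().ID) // the remaining healthy node
}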
diff --git a/balancer_test.go b/balancer_test.go
new file mode 100644
index 00000000000..b926763171c
--- /dev/null
+++ b/balancer_test.go
@@ -0,0 +1,109 @@
+package influxdb_test
+
+import (
+	"fmt"
+	"net/url"
+	"testing"
+
+	"github.com/influxdb/influxdb"
+)
+
+func newDataNodes() []*influxdb.DataNode {
+	nodes := []*influxdb.DataNode{}
+	for i := 1; i <= 2; i++ {
+		u, _ := url.Parse(fmt.Sprintf("http://localhost:999%d", i))
+		nodes = append(nodes, &influxdb.DataNode{ID: uint64(i), URL: u})
+	}
+	return nodes
+}
+
+func TestBalancerEmptyNodes(t *testing.T) {
+	b := influxdb.NewDataNodeBalancer([]*influxdb.DataNode{})
+	got := b.Next()
+	if got != nil {
+		t.Errorf("expected nil, got %v", got)
+	}
+}
+
+func TestBalancerUp(t *testing.T) {
+	nodes := newDataNodes()
+	b := influxdb.NewDataNodeBalancer(nodes)
+
+	// First node in randomized round-robin order
+	first := b.Next()
+	if first == nil {
+		t.Errorf("expected datanode, got %v", first)
+	}
+
+	// Second node in randomized round-robin order
+	second := b.Next()
+	if second == nil {
+		t.Errorf("expected datanode, got %v", second)
+	}
+
+	// Should never get the same node in order twice
+	if first.ID == second.ID {
+		t.Errorf("expected first != second. got %v = %v", first.ID, second.ID)
+	}
+}
+
+func TestBalancerDown(t *testing.T) {
+	nodes := newDataNodes()
+	b := influxdb.NewDataNodeBalancer(nodes)
+
+	nodes[0].Down()
+
+	// First node in randomized round-robin order
+	first := b.Next()
+	if first == nil {
+		t.Errorf("expected datanode, got %v", first)
+	}
+
+	// Second node should rollover to the first up node
+	second := b.Next()
+	if second == nil {
+		t.Errorf("expected datanode, got %v", second)
+	}
+
+	// Healthy node should be returned each time
+	if first.ID != 2 && first.ID != second.ID {
+		t.Errorf("expected first != second. got %v = %v", first.ID, second.ID)
+	}
+}
+
+func TestBalancerBackUp(t *testing.T) {
+	nodes := newDataNodes()
+	b := influxdb.NewDataNodeBalancer(nodes)
+
+	nodes[0].Down()
+
+	for i := 0; i < 3; i++ {
+		got := b.Next()
+		if got == nil {
+			t.Errorf("expected datanode, got %v", got)
+		}
+
+		if exp := uint64(2); got.ID != exp {
+			t.Errorf("wrong node id: exp %v, got %v", exp, got.ID)
+		}
+	}
+
+	nodes[0].Up()
+
+	// First node in randomized round-robin order
+	first := b.Next()
+	if first == nil {
+		t.Errorf("expected datanode, got %v", first)
+	}
+
+	// Second node should rollover to the first up node
+	second := b.Next()
+	if second == nil {
+		t.Errorf("expected datanode, got %v", second)
+	}
+
+	// Should get both nodes returned
+	if first.ID == second.ID {
+		t.Errorf("expected first != second. got %v = %v", first.ID, second.ID)
+	}
+}
diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go
index 2eaaf28a202..e48408597a3 100644
--- a/cmd/influxd/run.go
+++ b/cmd/influxd/run.go
@@ -279,6 +279,7 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node {
 		//FIXME: Need to also pass in dataURLs to bootstrap a data node
 		s = cmd.openServer(joinURLs)
+		cmd.node.DataNode = s
 		s.SetAuthenticationEnabled(cmd.config.Authentication.Enabled)
 		log.Printf("authentication enabled: %v\n", cmd.config.Authentication.Enabled)
@@ -568,7 +569,6 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server {
 	s.ComputeNoMoreThan = time.Duration(cmd.config.ContinuousQuery.ComputeNoMoreThan)
 	s.Version = version
 	s.CommitHash = commit
-	cmd.node.DataNode = s
 
 	// Open server with data directory and broker client.
 	if err := s.Open(cmd.config.Data.Dir, c); err != nil {
diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go
index e05f6173780..3ef19ccc355 100644
--- a/cmd/influxd/server_integration_test.go
+++ b/cmd/influxd/server_integration_test.go
@@ -176,6 +176,7 @@ func write(t *testing.T, node *TestNode, data string) {
 	if err != nil {
 		t.Fatalf("Couldn't write data: %s", err)
 	}
+	defer resp.Body.Close()
 	body, _ := ioutil.ReadAll(resp.Body)
 	fmt.Println("BODY: ", string(body))
 	if resp.StatusCode != http.StatusOK {
@@ -1399,7 +1400,28 @@ func Test3NodeServer(t *testing.T) {
 
 	runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
 	runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))
+}
+
+func Test3NodeServerFailover(t *testing.T) {
+	testName := "3-node server integration"
+
+	if testing.Short() {
+		t.Skip(fmt.Sprintf("skipping '%s'", testName))
+	}
+	dir := tempfile()
+	defer func() {
+		os.RemoveAll(dir)
+	}()
+
+	nodes := createCombinedNodeCluster(t, testName, dir, 3, nil)
+
+	// kill the last node; the cluster should still expect it to be there
+	nodes[2].node.Close()
+	nodes = nodes[:len(nodes)-1]
+	runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
+	runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))
+	nodes.Close()
 }
 
 // ensure that all queries work if there are more nodes in a cluster than the replication factor
diff --git a/influxdb.go b/influxdb.go
index 982fd6bb49f..5f990c086f8 100644
--- a/influxdb.go
+++ b/influxdb.go
@@ -33,6 +33,9 @@ var (
 	// ErrDataNodeURLRequired is returned when creating a data node without a URL.
 	ErrDataNodeURLRequired = errors.New("data node url required")
 
+	// ErrNoDataNodeAvailable is returned when there are no data nodes available.
+	ErrNoDataNodeAvailable = errors.New("data node not available")
+
 	// ErrDataNodeExists is returned when creating a duplicate data node.
 	ErrDataNodeExists = errors.New("data node exists")
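The new sentinel error is what the remote mapper (next file) returns when every candidate node is marked offline, so callers can tell "nothing left to try" apart from an ordinary transport error. A minimal, hypothetical sketch of that check follows; runMapper merely stands in for whatever code drives the distributed query and is not part of this change.

package main

import (
	"log"

	"github.com/influxdb/influxdb"
)

// runMapper is a hypothetical stand-in for the code path that starts a
// RemoteMapper against a remote shard; it is not part of this change.
func runMapper() error {
	return influxdb.ErrNoDataNodeAvailable
}

func main() {
	if err := runMapper(); err == influxdb.ErrNoDataNodeAvailable {
		// Every data node owning the shard is currently marked offline;
		// the query cannot be serviced until a node comes back up.
		log.Println("no data node available, retry later")
	}
}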
diff --git a/remote_mapper.go b/remote_mapper.go
index cd56a9909f6..54e76bfd785 100644
--- a/remote_mapper.go
+++ b/remote_mapper.go
@@ -4,23 +4,25 @@ import (
 	"bytes"
 	"encoding/json"
 	"errors"
+	"io"
 	"net/http"
 
 	"github.com/influxdb/influxdb/influxql"
 )
 
 const (
-	MAX_MAP_RESPONSE_SIZE = 1024 * 1024
+	MAX_MAP_RESPONSE_SIZE = 1024 * 1024 * 1024
 )
 
 // RemoteMapper implements the influxql.Mapper interface. The engine uses the remote mapper
 // to pull map results from shards that only exist on other servers in the cluster.
 type RemoteMapper struct {
-	dataNodes []*DataNode
+	dataNodes Balancer
 	resp      *http.Response
 	results   chan interface{}
 	unmarshal influxql.UnmarshalFunc
 	complete  bool
+	decoder   *json.Decoder
 
 	Call            string `json:",omitempty"`
 	Database        string `json:",omitempty"`
@@ -77,12 +79,28 @@ func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int
 		return err
 	}
 
-	// request to start streaming results
-	resp, err := http.Post(m.dataNodes[0].URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b))
-	if err != nil {
-		return err
+	var resp *http.Response
+	for {
+		node := m.dataNodes.Next()
+		if node == nil {
+			// no data nodes are available to service this query
+			return ErrNoDataNodeAvailable
+		}
+
+		// request to start streaming results
+		resp, err = http.Post(node.URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b))
+		if err != nil {
+			node.Down()
+			continue
+		}
+		// Mark the node as up
+		node.Up()
+		break
 	}
+
 	m.resp = resp
+	lr := io.LimitReader(m.resp.Body, MAX_MAP_RESPONSE_SIZE)
+	m.decoder = json.NewDecoder(lr)
 
 	return nil
 }
@@ -94,19 +112,8 @@ func (m *RemoteMapper) NextInterval() (interface{}, error) {
 		return nil, nil
 	}
 
-	// read the chunk
-	chunk := make([]byte, MAX_MAP_RESPONSE_SIZE, MAX_MAP_RESPONSE_SIZE)
-	n, err := m.resp.Body.Read(chunk)
-	if err != nil {
-		return nil, err
-	}
-	if n == 0 {
-		return nil, nil
-	}
-
-	// marshal the response
 	mr := &MapResponse{}
-	err = json.Unmarshal(chunk[:n], mr)
+	err := m.decoder.Decode(&mr)
 	if err != nil {
 		return nil, err
 	}
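The old NextInterval read one fixed 1MB slice per chunk and unmarshalled it in a single shot, which fails as soon as a map response straddles a read boundary or outgrows the buffer. The replacement above wraps the response body once in an io.LimitReader (capped at the new 1GB MAX_MAP_RESPONSE_SIZE) and reuses a json.Decoder, so each Decode pulls exactly one response off the stream. A standalone sketch of that pattern, with a made-up Chunk type standing in for MapResponse:

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// Chunk stands in for influxdb.MapResponse; the field is invented for
// illustration only.
type Chunk struct {
	Values []int `json:"values"`
}

func main() {
	// In the real code this is the HTTP response body from /data/run_mapper.
	body := strings.NewReader(`{"values":[1,2]}{"values":[3]}`)

	const maxResponseSize = 1024 * 1024 * 1024 // cap total bytes read, as MAX_MAP_RESPONSE_SIZE does
	dec := json.NewDecoder(io.LimitReader(body, maxResponseSize))

	// Each Decode call pulls exactly one JSON object off the stream,
	// mirroring what NextInterval now does with MapResponse.
	for {
		var c Chunk
		if err := dec.Decode(&c); err == io.EOF {
			break
		} else if err != nil {
			fmt.Println("decode error:", err)
			return
		}
		fmt.Println(c.Values)
	}
}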
diff --git a/server.go b/server.go
index c1b32a0d161..1b1b6a2287b 100644
--- a/server.go
+++ b/server.go
@@ -3524,8 +3524,53 @@ type MessagingConn interface {
 
 // DataNode represents a data node in the cluster.
 type DataNode struct {
+	mu  sync.RWMutex
 	ID  uint64
 	URL *url.URL
+
+	// downCount is the number of times the DataNode has been marked as down
+	downCount uint
+
+	// OfflineUntil is the time when the DataNode will no longer be considered down
+	OfflineUntil time.Time
+}
+
+// Down marks the DataNode as offline for a period of time. Each successive
+// call to Down exponentially extends the offline time, up to a maximum
+// of 5 minutes.
+func (d *DataNode) Down() {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	// Clamp the timeout to 5 mins max
+	t := 2 << d.downCount
+	if t > 300 {
+		t = 300
+	}
+	d.OfflineUntil = time.Now().Add(time.Duration(t) * time.Second)
+	d.downCount += 1
+
+	log.Printf("data node %s marked offline for %ds", d.URL.String(), t)
+}
+
+// Up marks this DataNode as online if it is currently marked down
+func (d *DataNode) Up() {
+	d.mu.RLock()
+	if d.downCount != 0 {
+		// Upgrade to a write lock
+		d.mu.RUnlock()
+		d.mu.Lock()
+
+		// Reset state to online
+		d.downCount = 0
+		d.OfflineUntil = time.Now()
+
+		d.mu.Unlock()
+
+		log.Printf("data node %s marked online", d.URL.String())
+		return
+	}
+	d.mu.RUnlock()
 }
 
 // newDataNode returns an instance of DataNode.
diff --git a/tx.go b/tx.go
index 65a752e7915..e0fb85d331c 100644
--- a/tx.go
+++ b/tx.go
@@ -155,8 +155,10 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
 					return nil, ErrShardNotFound
 				}
 
+				balancer := NewDataNodeBalancer(nodes)
+
 				mapper = &RemoteMapper{
-					dataNodes:       nodes,
+					dataNodes:       balancer,
 					Database:        mm.Database,
 					MeasurementName: m.Name,
 					TMin:            tmin.UnixNano(),
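One last worked example (illustrative only): the offline window in (*DataNode).Down grows as 2<<downCount seconds and is clamped at 300, so repeated failures back a node off for 2s, 4s, 8s, ... 256s, then a flat 5 minutes.

package main

import "fmt"

func main() {
	// Mirrors the clamped exponential backoff used by (*DataNode).Down.
	for downCount := uint(0); downCount < 10; downCount++ {
		t := 2 << downCount
		if t > 300 {
			t = 300
		}
		fmt.Printf("failure #%d -> offline for %ds\n", downCount+1, t)
	}
}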