Skip to content

Commit

Permalink
Merge pull request #2301 from influxdb/races
Browse files Browse the repository at this point in the history
Distributed query load balancing and failover
  • Loading branch information
jwilder committed Apr 17, 2015
2 parents 033ab7a + 5098774 commit 8ee8218
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 20 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
## v0.9.0-rc26 [unreleased]

### Features
- [#2301](https://github.com/influxdb/influxdb/pull/2301): Distributed query load balancing and failover

### Bugfixes
- [#2297](https://github.com/influxdb/influxdb/pull/2297): create /var/run during startup. Thanks @neonstalwart.
- [#2312](https://github.com/influxdb/influxdb/pull/2312): Re-use httpclient for continuous queries
- [#2318](https://github.com/influxdb/influxdb/pull/2318): Remove pointless use of 'done' channel for collectd.
- [#2242](https://github.com/influxdb/influxdb/pull/2242): Distributed Query should balance requests
- [#2243](https://github.com/influxdb/influxdb/pull/2243): Use Limit Reader instead of fixed 1MB/1GB slice for DQ
- [#2190](https://github.com/influxdb/influxdb/pull/2190): Implement failover to other data nodes for distributed queries

## v0.9.0-rc25 [2015-04-15]

Expand Down
77 changes: 77 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package influxdb

import (
"math/rand"
"time"
)

// Balancer represents a load-balancing algorithm for a set of DataNodes.
type Balancer interface {
	// Next returns the next DataNode according to the balancing method,
	// or nil if there are no nodes available.
	Next() *DataNode
}

// dataNodeBalancer is a shuffled round-robin Balancer over a private
// copy of the cluster's data nodes.
type dataNodeBalancer struct {
	dataNodes []*DataNode // data nodes to balance between
	p         int         // current node index
}

// NewDataNodeBalancer creates a shuffled, round-robin Balancer so that
// multiple instances return nodes in randomized order and each returned
// DataNode is repeated in a cycle.
func NewDataNodeBalancer(dataNodes []*DataNode) Balancer {
	// Work on a private copy so shuffling never reorders the caller's
	// slice and each Balancer gets an independent ordering.
	nodes := append([]*DataNode(nil), dataNodes...)

	b := &dataNodeBalancer{dataNodes: nodes}
	b.shuffle()
	return b
}

// shuffle randomizes the order of the balancer's DataNodes in place
// (forward-variant Fisher–Yates: each element is swapped with a random
// earlier-or-equal position).
func (b *dataNodeBalancer) shuffle() {
	nodes := b.dataNodes
	for i := 0; i < len(nodes); i++ {
		j := rand.Intn(i + 1)
		nodes[j], nodes[i] = nodes[i], nodes[j]
	}
}

// online returns the subset of the balancer's DataNodes that are
// currently up, i.e. whose OfflineUntil deadline has passed.
func (b *dataNodeBalancer) online() []*DataNode {
	now := time.Now().UTC()
	// Pre-size for the common case where every node is online, avoiding
	// repeated append growth.
	up := make([]*DataNode, 0, len(b.dataNodes))
	for _, n := range b.dataNodes {
		// Skip nodes still inside their offline back-off window.
		if n.OfflineUntil.After(now) {
			continue
		}
		up = append(up, n)
	}
	return up
}

// Next returns the next online DataNode in the round-robin rotation,
// or nil if no nodes are currently online.
func (b *dataNodeBalancer) Next() *DataNode {
	// only use online nodes
	up := b.online()

	// no nodes online
	if len(up) == 0 {
		return nil
	}

	// Roll the cursor back to the beginning; the online set may have
	// shrunk since the last call.
	if b.p >= len(up) {
		b.p = 0
	}

	d := up[b.p]
	b.p++ // advance the round-robin cursor (idiomatic ++ over += 1)

	return d
}
109 changes: 109 additions & 0 deletions balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package influxdb_test

import (
"fmt"
"net/url"
"testing"

"github.com/influxdb/influxdb"
)

// newDataNodes returns two DataNodes with IDs 1 and 2 on distinct
// localhost URLs, for use as balancer test fixtures.
func newDataNodes() []*influxdb.DataNode {
	var nodes []*influxdb.DataNode
	for id := uint64(1); id <= 2; id++ {
		u, _ := url.Parse(fmt.Sprintf("http://localhost:999%d", id))
		nodes = append(nodes, &influxdb.DataNode{ID: id, URL: u})
	}
	return nodes
}

// TestBalancerEmptyNodes verifies that a balancer built from an empty
// node list returns nil from Next.
func TestBalancerEmptyNodes(t *testing.T) {
	b := influxdb.NewDataNodeBalancer([]*influxdb.DataNode{})
	if node := b.Next(); node != nil {
		t.Errorf("expected nil, got %v", node)
	}
}

func TestBalancerUp(t *testing.T) {
nodes := newDataNodes()
b := influxdb.NewDataNodeBalancer(nodes)

// First node in randomized round-robin order
first := b.Next()
if first == nil {
t.Errorf("expected datanode, got %v", first)
}

// Second node in randomized round-robin order
second := b.Next()
if second == nil {
t.Errorf("expected datanode, got %v", second)
}

// Should never get the same node in order twice
if first.ID == second.ID {
t.Errorf("expected first != second. got %v = %v", first.ID, second.ID)
}
}

// TestBalancerDown verifies that when node 1 is marked down, Next
// returns the remaining healthy node (ID 2) on every call.
func TestBalancerDown(t *testing.T) {
	nodes := newDataNodes()
	b := influxdb.NewDataNodeBalancer(nodes)

	nodes[0].Down()

	// First node in randomized round-robin order
	first := b.Next()
	if first == nil {
		t.Errorf("expected datanode, got %v", first)
	}

	// Second node should rollover to the first up node
	second := b.Next()
	if second == nil {
		t.Errorf("expected datanode, got %v", second)
	}

	// Healthy node (ID 2) should be returned each time. The original
	// check (first.ID != 2 && first.ID != second.ID) passed whenever
	// both calls returned the same node — even the downed node 1.
	if first.ID != 2 || second.ID != 2 {
		t.Errorf("expected healthy node 2 both times. got %v, %v", first.ID, second.ID)
	}
}

// TestBalancerBackUp verifies that a downed node rejoins the rotation
// once it is marked up again.
func TestBalancerBackUp(t *testing.T) {
	nodes := newDataNodes()
	b := influxdb.NewDataNodeBalancer(nodes)

	nodes[0].Down()

	// While node 1 is down, only node 2 should ever be returned.
	for i := 3; i > 0; i-- {
		node := b.Next()
		if node == nil {
			t.Errorf("expected datanode, got %v", node)
		}
		if exp := uint64(2); node.ID != exp {
			t.Errorf("wrong node id: exp %v, got %v", exp, node.ID)
		}
	}

	nodes[0].Up()

	// First node in randomized round-robin order.
	first := b.Next()
	if first == nil {
		t.Errorf("expected datanode, got %v", first)
	}

	// Second call should roll over to the other (restored) node.
	second := b.Next()
	if second == nil {
		t.Errorf("expected datanode, got %v", second)
	}

	// With both nodes online again the rotation alternates.
	if first.ID == second.ID {
		t.Errorf("expected first != second. got %v = %v", first.ID, second.ID)
	}
}
2 changes: 1 addition & 1 deletion cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node {

//FIXME: Need to also pass in dataURLs to bootstrap a data node
s = cmd.openServer(joinURLs)
cmd.node.DataNode = s
s.SetAuthenticationEnabled(cmd.config.Authentication.Enabled)
log.Printf("authentication enabled: %v\n", cmd.config.Authentication.Enabled)

Expand Down Expand Up @@ -568,7 +569,6 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server {
s.ComputeNoMoreThan = time.Duration(cmd.config.ContinuousQuery.ComputeNoMoreThan)
s.Version = version
s.CommitHash = commit
cmd.node.DataNode = s

// Open server with data directory and broker client.
if err := s.Open(cmd.config.Data.Dir, c); err != nil {
Expand Down
22 changes: 22 additions & 0 deletions cmd/influxd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func write(t *testing.T, node *TestNode, data string) {
if err != nil {
t.Fatalf("Couldn't write data: %s", err)
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
fmt.Println("BODY: ", string(body))
if resp.StatusCode != http.StatusOK {
Expand Down Expand Up @@ -1399,7 +1400,28 @@ func Test3NodeServer(t *testing.T) {

runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))
}

func Test3NodeServerFailover(t *testing.T) {
testName := "3-node server integration"

if testing.Short() {
t.Skip(fmt.Sprintf("skipping '%s'", testName))
}
dir := tempfile()
defer func() {
os.RemoveAll(dir)
}()

nodes := createCombinedNodeCluster(t, testName, dir, 3, nil)

// kill the last node, cluster should expect it to be there
nodes[2].node.Close()
nodes = nodes[:len(nodes)-1]

runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))
nodes.Close()
}

// ensure that all queries work if there are more nodes in a cluster than the replication factor
Expand Down
3 changes: 3 additions & 0 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ var (
// ErrDataNodeURLRequired is returned when creating a data node without a URL.
ErrDataNodeURLRequired = errors.New("data node url required")

// ErrNoDataNodeAvailable is returned when there are no data nodes available
ErrNoDataNodeAvailable = errors.New("data node not available")

// ErrDataNodeExists is returned when creating a duplicate data node.
ErrDataNodeExists = errors.New("data node exists")

Expand Down
43 changes: 25 additions & 18 deletions remote_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@ import (
"bytes"
"encoding/json"
"errors"
"io"
"net/http"

"github.com/influxdb/influxdb/influxql"
)

const (
MAX_MAP_RESPONSE_SIZE = 1024 * 1024
MAX_MAP_RESPONSE_SIZE = 1024 * 1024 * 1024
)

// RemoteMapper implements the influxql.Mapper interface. The engine uses the remote mapper
// to pull map results from shards that only exist on other servers in the cluster.
type RemoteMapper struct {
dataNodes []*DataNode
dataNodes Balancer
resp *http.Response
results chan interface{}
unmarshal influxql.UnmarshalFunc
complete bool
decoder *json.Decoder

Call string `json:",omitempty"`
Database string `json:",omitempty"`
Expand Down Expand Up @@ -77,12 +79,28 @@ func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int
return err
}

// request to start streaming results
resp, err := http.Post(m.dataNodes[0].URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b))
if err != nil {
return err
var resp *http.Response
for {
node := m.dataNodes.Next()
if node == nil {
// no data nodes are available to service this query
return ErrNoDataNodeAvailable
}

// request to start streaming results
resp, err = http.Post(node.URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b))
if err != nil {
node.Down()
continue
}
// Mark the node as up
node.Up()
break
}

m.resp = resp
lr := io.LimitReader(m.resp.Body, MAX_MAP_RESPONSE_SIZE)
m.decoder = json.NewDecoder(lr)

return nil
}
Expand All @@ -94,19 +112,8 @@ func (m *RemoteMapper) NextInterval() (interface{}, error) {
return nil, nil
}

// read the chunk
chunk := make([]byte, MAX_MAP_RESPONSE_SIZE, MAX_MAP_RESPONSE_SIZE)
n, err := m.resp.Body.Read(chunk)
if err != nil {
return nil, err
}
if n == 0 {
return nil, nil
}

// marshal the response
mr := &MapResponse{}
err = json.Unmarshal(chunk[:n], mr)
err := m.decoder.Decode(&mr)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 8ee8218

Please sign in to comment.