From 69944622bcc40abbe5125cf4f69b02d15b83737c Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 14 Apr 2015 12:03:55 -0600 Subject: [PATCH 1/6] Don't set data node until after it has joined or initialized MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit By setting it, data node requests can be served by the http handler before the data node is actually ready. Possible fix for: 2015/04/14 11:33:54 http: panic serving 10.0.1.8:62661: runtime error: invalid memory address or nil pointer dereference goroutine 11467 [running]: net/http.funcĀ·011() /usr/local/go/src/net/http/server.go:1130 +0xcc github.com/influxdb/influxdb.(*Server).broadcast(0xc20805cc00, 0xc208220000, 0x5d25e0, 0xc208869e80, 0x0, 0x0, 0x0) /Users/jason/go/src/github.com/influxdb/influxdb/server.go:568 +0x227 github.com/influxdb/influxdb.(*Server).CreateDataNode(0xc20805cc00, 0xc2081c6e70, 0x0, 0x0) /Users/jason/go/src/github.com/influxdb/influxdb/server.go:859 +0xe6 github.com/influxdb/influxdb/httpd.(*Handler).serveCreateDataNode(0xc20842ea00, 0x19378c0, 0xc2082207e0, 0xc2083191e0) --- cmd/influxd/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 2eaaf28a202..e48408597a3 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -279,6 +279,7 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node { //FIXME: Need to also pass in dataURLs to bootstrap a data node s = cmd.openServer(joinURLs) + cmd.node.DataNode = s s.SetAuthenticationEnabled(cmd.config.Authentication.Enabled) log.Printf("authentication enabled: %v\n", cmd.config.Authentication.Enabled) @@ -568,7 +569,6 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server { s.ComputeNoMoreThan = time.Duration(cmd.config.ContinuousQuery.ComputeNoMoreThan) s.Version = version s.CommitHash = commit - cmd.node.DataNode = s // Open server with data directory and broker client. if err := s.Open(cmd.config.Data.Dir, c); err != nil { From fbaa37a5efd37ac4b386402efc8a18d693bf4d4b Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 14 Apr 2015 14:27:17 -0600 Subject: [PATCH 2/6] Close resp body during write tests --- cmd/influxd/server_integration_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index e05f6173780..ea61575ca4d 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -176,6 +176,7 @@ func write(t *testing.T, node *TestNode, data string) { if err != nil { t.Fatalf("Couldn't write data: %s", err) } + defer resp.Body.Close() body, _ := ioutil.ReadAll(resp.Body) fmt.Println("BODY: ", string(body)) if resp.StatusCode != http.StatusOK { From f18dbf4e49b248fc158fcc6392d5158a624a9270 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 14 Apr 2015 14:36:53 -0600 Subject: [PATCH 3/6] Use Limit Reader instead of fixed 1MB/1GB slice for DQ Fixes #2243 --- remote_mapper.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/remote_mapper.go b/remote_mapper.go index cd56a9909f6..a1b8c9dd4b2 100644 --- a/remote_mapper.go +++ b/remote_mapper.go @@ -4,13 +4,14 @@ import ( "bytes" "encoding/json" "errors" + "io" "net/http" "github.com/influxdb/influxdb/influxql" ) const ( - MAX_MAP_RESPONSE_SIZE = 1024 * 1024 + MAX_MAP_RESPONSE_SIZE = 1024 * 1024 * 1024 ) // RemoteMapper implements the influxql.Mapper interface. The engine uses the remote mapper @@ -21,6 +22,7 @@ type RemoteMapper struct { results chan interface{} unmarshal influxql.UnmarshalFunc complete bool + decoder *json.Decoder Call string `json:",omitempty"` Database string `json:",omitempty"` @@ -83,6 +85,8 @@ func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int return err } m.resp = resp + lr := io.LimitReader(m.resp.Body, MAX_MAP_RESPONSE_SIZE) + m.decoder = json.NewDecoder(lr) return nil } @@ -94,19 +98,8 @@ func (m *RemoteMapper) NextInterval() (interface{}, error) { return nil, nil } - // read the chunk - chunk := make([]byte, MAX_MAP_RESPONSE_SIZE, MAX_MAP_RESPONSE_SIZE) - n, err := m.resp.Body.Read(chunk) - if err != nil { - return nil, err - } - if n == 0 { - return nil, nil - } - - // marshal the response mr := &MapResponse{} - err = json.Unmarshal(chunk[:n], mr) + err := m.decoder.Decode(&mr) if err != nil { return nil, err } From c52dfce8974e0a88b9cb6b32c03e3e3aeca11da5 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 15 Apr 2015 14:27:55 -0600 Subject: [PATCH 4/6] Load balance distributed queries across data nodes Adds a Balancer interface to allow RemoteMappers to send data node requests to multiple nodes. It also provides the ability to failed requests to mark the data node as offline using exponential backoff with a 5 min max wait time. Fixes #2242 --- balancer.go | 77 +++++++++++++++++++++++++++++++++ balancer_test.go | 109 +++++++++++++++++++++++++++++++++++++++++++++++ influxdb.go | 3 ++ remote_mapper.go | 13 +++++- server.go | 45 +++++++++++++++++++ tx.go | 4 +- 6 files changed, 248 insertions(+), 3 deletions(-) create mode 100644 balancer.go create mode 100644 balancer_test.go diff --git a/balancer.go b/balancer.go new file mode 100644 index 00000000000..aab20564b60 --- /dev/null +++ b/balancer.go @@ -0,0 +1,77 @@ +package influxdb + +import ( + "math/rand" + "time" +) + +// Balancer represents a load-balancing algorithm for a set of DataNodes +type Balancer interface { + // Next returns the next DataNode according to the balancing method + // or nil if there are no nodes available + Next() *DataNode +} + +type dataNodeBalancer struct { + dataNodes []*DataNode // data nodes to balance between + p int // current node index +} + +// NewDataNodeBalancer create a shuffled, round-robin balancer so that +// multiple instances will return nodes in randomized order and each +// each returned DataNode will be repeated in a cycle +func NewDataNodeBalancer(dataNodes []*DataNode) Balancer { + // make a copy of the dataNode slice so we can randomize it + // without affecting the original instance as well as ensure + // that each Balancer returns nodes in a different order + nodes := make([]*DataNode, len(dataNodes)) + copy(nodes, dataNodes) + + b := &dataNodeBalancer{ + dataNodes: nodes, + } + b.shuffle() + return b +} + +// shuffle randomizes the ordering the balancers available DataNodes +func (b *dataNodeBalancer) shuffle() { + for i := range b.dataNodes { + j := rand.Intn(i + 1) + b.dataNodes[i], b.dataNodes[j] = b.dataNodes[j], b.dataNodes[i] + } +} + +// online returns a slice of the DataNodes that are online +func (b *dataNodeBalancer) online() []*DataNode { + now := time.Now().UTC() + up := []*DataNode{} + for _, n := range b.dataNodes { + if n.OfflineUntil.After(now) { + continue + } + up = append(up, n) + } + return up +} + +// Next returns the next available DataNode +func (b *dataNodeBalancer) Next() *DataNode { + // only use online nodes + up := b.online() + + // no nodes online + if len(up) == 0 { + return nil + } + + // rollover back to the beginning + if b.p >= len(up) { + b.p = 0 + } + + d := up[b.p] + b.p += 1 + + return d +} diff --git a/balancer_test.go b/balancer_test.go new file mode 100644 index 00000000000..b926763171c --- /dev/null +++ b/balancer_test.go @@ -0,0 +1,109 @@ +package influxdb_test + +import ( + "fmt" + "net/url" + "testing" + + "github.com/influxdb/influxdb" +) + +func newDataNodes() []*influxdb.DataNode { + nodes := []*influxdb.DataNode{} + for i := 1; i <= 2; i++ { + u, _ := url.Parse(fmt.Sprintf("http://localhost:999%d", i)) + nodes = append(nodes, &influxdb.DataNode{ID: uint64(i), URL: u}) + } + return nodes +} + +func TestBalancerEmptyNodes(t *testing.T) { + b := influxdb.NewDataNodeBalancer([]*influxdb.DataNode{}) + got := b.Next() + if got != nil { + t.Errorf("expected nil, got %v", got) + } +} + +func TestBalancerUp(t *testing.T) { + nodes := newDataNodes() + b := influxdb.NewDataNodeBalancer(nodes) + + // First node in randomized round-robin order + first := b.Next() + if first == nil { + t.Errorf("expected datanode, got %v", first) + } + + // Second node in randomized round-robin order + second := b.Next() + if second == nil { + t.Errorf("expected datanode, got %v", second) + } + + // Should never get the same node in order twice + if first.ID == second.ID { + t.Errorf("expected first != second. got %v = %v", first.ID, second.ID) + } +} + +func TestBalancerDown(t *testing.T) { + nodes := newDataNodes() + b := influxdb.NewDataNodeBalancer(nodes) + + nodes[0].Down() + + // First node in randomized round-robin order + first := b.Next() + if first == nil { + t.Errorf("expected datanode, got %v", first) + } + + // Second node should rollover to the first up node + second := b.Next() + if second == nil { + t.Errorf("expected datanode, got %v", second) + } + + // Health node should be returned each time + if first.ID != 2 && first.ID != second.ID { + t.Errorf("expected first != second. got %v = %v", first.ID, second.ID) + } +} + +func TestBalancerBackUp(t *testing.T) { + nodes := newDataNodes() + b := influxdb.NewDataNodeBalancer(nodes) + + nodes[0].Down() + + for i := 0; i < 3; i++ { + got := b.Next() + if got == nil { + t.Errorf("expected datanode, got %v", got) + } + + if exp := uint64(2); got.ID != exp { + t.Errorf("wrong node id: exp %v, got %v", exp, got.ID) + } + } + + nodes[0].Up() + + // First node in randomized round-robin order + first := b.Next() + if first == nil { + t.Errorf("expected datanode, got %v", first) + } + + // Second node should rollover to the first up node + second := b.Next() + if second == nil { + t.Errorf("expected datanode, got %v", second) + } + + // Should get both nodes returned + if first.ID == second.ID { + t.Errorf("expected first != second. got %v = %v", first.ID, second.ID) + } +} diff --git a/influxdb.go b/influxdb.go index 982fd6bb49f..5f990c086f8 100644 --- a/influxdb.go +++ b/influxdb.go @@ -33,6 +33,9 @@ var ( // ErrDataNodeURLRequired is returned when creating a data node without a URL. ErrDataNodeURLRequired = errors.New("data node url required") + // ErrNoDataNodeAvailable is returned when there are no data nodes available + ErrNoDataNodeAvailable = errors.New("data node not available") + // ErrDataNodeExists is returned when creating a duplicate data node. ErrDataNodeExists = errors.New("data node exists") diff --git a/remote_mapper.go b/remote_mapper.go index a1b8c9dd4b2..80012f58988 100644 --- a/remote_mapper.go +++ b/remote_mapper.go @@ -17,7 +17,7 @@ const ( // RemoteMapper implements the influxql.Mapper interface. The engine uses the remote mapper // to pull map results from shards that only exist on other servers in the cluster. type RemoteMapper struct { - dataNodes []*DataNode + dataNodes Balancer resp *http.Response results chan interface{} unmarshal influxql.UnmarshalFunc @@ -79,11 +79,20 @@ func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int return err } + node := m.dataNodes.Next() + if node == nil { + // no data nodes are available to service this query + return ErrNoDataNodeAvailable + } + // request to start streaming results - resp, err := http.Post(m.dataNodes[0].URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b)) + resp, err := http.Post(node.URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b)) if err != nil { + node.Down() return err } + // Mark it as up + node.Up() m.resp = resp lr := io.LimitReader(m.resp.Body, MAX_MAP_RESPONSE_SIZE) m.decoder = json.NewDecoder(lr) diff --git a/server.go b/server.go index c1b32a0d161..b455e378ae3 100644 --- a/server.go +++ b/server.go @@ -3524,8 +3524,53 @@ type MessagingConn interface { // DataNode represents a data node in the cluster. type DataNode struct { + mu sync.RWMutex ID uint64 URL *url.URL + + // downCount is the number of times the DataNode has been marked as down + downCount uint + + // OfflineUntil is the time when the DataNode will no longer be consider down + OfflineUntil time.Time +} + +// Down marks the DataNode as offline for a period of time. Each successive +// call to Down will exponentially extend the offline time with a maximum +// offline time of 5 minutes. +func (d *DataNode) Down() { + d.mu.Lock() + defer d.mu.Unlock() + + // Clamp the timeout to 5 mins max + t := 2 << d.downCount + if t > 300 { + t = 300 + } + d.OfflineUntil = time.Now().Add(time.Duration(t) * time.Second) + d.downCount += 1 + + log.Printf("data node %s is offline for %ds", d.URL.String(), t) +} + +// Up marks this DataNode as online if was currently down +func (d *DataNode) Up() { + d.mu.RLock() + if d.downCount != 0 { + // Upgrade to a write lock + d.mu.RUnlock() + d.mu.Lock() + + // Reset state to online + d.downCount = 0 + d.OfflineUntil = time.Now() + + d.mu.Unlock() + + log.Printf("data node %s is online", d.URL.String()) + return + } + d.mu.RUnlock() } // newDataNode returns an instance of DataNode. diff --git a/tx.go b/tx.go index 65a752e7915..e0fb85d331c 100644 --- a/tx.go +++ b/tx.go @@ -155,8 +155,10 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri return nil, ErrShardNotFound } + balancer := NewDataNodeBalancer(nodes) + mapper = &RemoteMapper{ - dataNodes: nodes, + dataNodes: balancer, Database: mm.Database, MeasurementName: m.Name, TMin: tmin.UnixNano(), From 8aa0d32b6f5c50512ce2405ec3c7d3a2f9f2bad5 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 15 Apr 2015 15:26:44 -0600 Subject: [PATCH 5/6] Add failover to other data nodes for distributed queries Fixes #2190 --- cmd/influxd/server_integration_test.go | 21 +++++++++++++++++++ remote_mapper.go | 29 +++++++++++++++----------- server.go | 6 +++--- 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index ea61575ca4d..3ef19ccc355 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1400,7 +1400,28 @@ func Test3NodeServer(t *testing.T) { runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes)) runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)) +} + +func Test3NodeServerFailover(t *testing.T) { + testName := "3-node server integration" + + if testing.Short() { + t.Skip(fmt.Sprintf("skipping '%s'", testName)) + } + dir := tempfile() + defer func() { + os.RemoveAll(dir) + }() + + nodes := createCombinedNodeCluster(t, testName, dir, 3, nil) + + // kill the last node, cluster should expect it to be there + nodes[2].node.Close() + nodes = nodes[:len(nodes)-1] + runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes)) + runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)) + nodes.Close() } // ensure that all queries work if there are more nodes in a cluster than the replication factor diff --git a/remote_mapper.go b/remote_mapper.go index 80012f58988..54e76bfd785 100644 --- a/remote_mapper.go +++ b/remote_mapper.go @@ -79,20 +79,25 @@ func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int return err } - node := m.dataNodes.Next() - if node == nil { - // no data nodes are available to service this query - return ErrNoDataNodeAvailable - } + var resp *http.Response + for { + node := m.dataNodes.Next() + if node == nil { + // no data nodes are available to service this query + return ErrNoDataNodeAvailable + } - // request to start streaming results - resp, err := http.Post(node.URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b)) - if err != nil { - node.Down() - return err + // request to start streaming results + resp, err = http.Post(node.URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b)) + if err != nil { + node.Down() + continue + } + // Mark the node as up + node.Up() + break } - // Mark it as up - node.Up() + m.resp = resp lr := io.LimitReader(m.resp.Body, MAX_MAP_RESPONSE_SIZE) m.decoder = json.NewDecoder(lr) diff --git a/server.go b/server.go index b455e378ae3..1b1b6a2287b 100644 --- a/server.go +++ b/server.go @@ -3550,10 +3550,10 @@ func (d *DataNode) Down() { d.OfflineUntil = time.Now().Add(time.Duration(t) * time.Second) d.downCount += 1 - log.Printf("data node %s is offline for %ds", d.URL.String(), t) + log.Printf("data node %s marked offline for %ds", d.URL.String(), t) } -// Up marks this DataNode as online if was currently down +// Up marks this DataNode as online if it was currently down func (d *DataNode) Up() { d.mu.RLock() if d.downCount != 0 { @@ -3567,7 +3567,7 @@ func (d *DataNode) Up() { d.mu.Unlock() - log.Printf("data node %s is online", d.URL.String()) + log.Printf("data node %s marked online", d.URL.String()) return } d.mu.RUnlock() From 5098774fc3d12879f53d9c0b5365a151bf15f9bf Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 17 Apr 2015 11:53:16 -0600 Subject: [PATCH 6/6] Update change log Add #2301 #2242 #2243 #2190 --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b86a923a4ed..722df4c8513 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,15 @@ ## v0.9.0-rc26 [unreleased] +### Features +- [#2301](https://github.com/influxdb/influxdb/pull/2301): Distributed query load balancing and failover + ### Bugfixes - [#2297](https://github.com/influxdb/influxdb/pull/2297): create /var/run during startup. Thanks @neonstalwart. - [#2312](https://github.com/influxdb/influxdb/pull/2312): Re-use httpclient for continuous queries - [#2318](https://github.com/influxdb/influxdb/pull/2318): Remove pointless use of 'done' channel for collectd. +- [#2242](https://github.com/influxdb/influxdb/pull/2242): Distributed Query should balance requests +- [#2243](https://github.com/influxdb/influxdb/pull/2243): Use Limit Reader instead of fixed 1MB/1GB slice for DQ +- [#2190](https://github.com/influxdb/influxdb/pull/2190): Implement failover to other data nodes for distributed queries ## v0.9.0-rc25 [2015-04-15]