From d5bd804993d0156fbf18e041bb9a57fa58221428 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 17 Apr 2015 23:45:49 -0600 Subject: [PATCH 1/4] Handle distributed queries when shards != data nodes Fixes #2272 There was previously an explicit panic put in the query engine to prevent queries where the number of shards was not equal to the number of data nodes in the cluster. This was waiting for the distributed queries branch to land but was not removed when that landed. --- tx.go | 100 ++++++++++++++++++++++++++++++---------------------------- 1 file changed, 51 insertions(+), 49 deletions(-) diff --git a/tx.go b/tx.go index 8cabd168bc8..d6db6316162 100644 --- a/tx.go +++ b/tx.go @@ -142,60 +142,62 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri // create mappers for each shard we need to hit for _, sg := range shardGroups { - if len(sg.Shards) != 1 { // we'll only have more than 1 shard in a group when RF < # servers in cluster - // TODO: implement distributed queries.
- panic("distributed queries not implemented yet and there are too many shards in this group") - } - - shard := sg.Shards[0] - var mapper influxql.Mapper + shards := map[*Shard][]uint64{} + for _, sid := range t.SeriesIDs { + shard := sg.ShardBySeriesID(sid) + shards[shard] = append(shards[shard], sid) + } - // create either a remote or local mapper for this shard - if shard.store == nil { - nodes := tx.server.DataNodesByID(shard.DataNodeIDs) - if len(nodes) == 0 { - return nil, ErrShardNotFound + for shard, _ := range shards { + var mapper influxql.Mapper + + // create either a remote or local mapper for this shard + if shard.store == nil { + nodes := tx.server.DataNodesByID(shard.DataNodeIDs) + if len(nodes) == 0 { + return nil, ErrShardNotFound + } + + balancer := NewDataNodeBalancer(nodes) + + mapper = &RemoteMapper{ + dataNodes: balancer, + Database: mm.Database, + MeasurementName: m.Name, + TMin: tmin.UnixNano(), + TMax: tmax.UnixNano(), + SeriesIDs: t.SeriesIDs, + ShardID: shard.ID, + WhereFields: whereFields, + SelectFields: selectFields, + SelectTags: selectTags, + Limit: stmt.Limit, + Offset: stmt.Offset, + Interval: interval, + } + mapper.(*RemoteMapper).SetFilters(t.Filters) + } else { + mapper = &LocalMapper{ + seriesIDs: t.SeriesIDs, + db: shard.store, + job: job, + decoder: NewFieldCodec(m), + filters: t.Filters, + whereFields: whereFields, + selectFields: selectFields, + selectTags: selectTags, + tmax: tmax.UnixNano(), + interval: interval, + // multiple mappers may need to be merged together to get the results + // for a raw query. 
So each mapper will have to read at least the + // limit plus the offset in data points to ensure we've hit our mark + limit: uint64(stmt.Limit) + uint64(stmt.Offset), + } } - balancer := NewDataNodeBalancer(nodes) - - mapper = &RemoteMapper{ - dataNodes: balancer, - Database: mm.Database, - MeasurementName: m.Name, - TMin: tmin.UnixNano(), - TMax: tmax.UnixNano(), - SeriesIDs: t.SeriesIDs, - ShardID: shard.ID, - WhereFields: whereFields, - SelectFields: selectFields, - SelectTags: selectTags, - Limit: stmt.Limit, - Offset: stmt.Offset, - Interval: interval, - } - mapper.(*RemoteMapper).SetFilters(t.Filters) - } else { - mapper = &LocalMapper{ - seriesIDs: t.SeriesIDs, - db: shard.store, - job: job, - decoder: NewFieldCodec(m), - filters: t.Filters, - whereFields: whereFields, - selectFields: selectFields, - selectTags: selectTags, - tmax: tmax.UnixNano(), - interval: interval, - // multiple mappers may need to be merged together to get the results - // for a raw query. So each mapper will have to read at least the - // limit plus the offset in data points to ensure we've hit our mark - limit: uint64(stmt.Limit) + uint64(stmt.Offset), - } + mappers = append(mappers, mapper) } - - mappers = append(mappers, mapper) } job.Mappers = mappers From d8fba0a50dc6adc39af86e67ca73bb29cc22ee1c Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Sat, 18 Apr 2015 00:12:07 -0600 Subject: [PATCH 2/4] Close raft log before broker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closing the broker before the raft log can trigger this panic since the raft log depends on the broker via the FSM. 
panic: apply: broker apply: broker already closed goroutine 29164 [running]: github.com/influxdb/influxdb/raft.(*Log).applier(0xc20833b040, 0xc20802bd40) /Users/jason/go/src/github.com/influxdb/influxdb/raft/log.go:1386 +0x278 created by github.com/influxdb/influxdb/raft.func·002 /Users/jason/go/src/github.com/influxdb/influxdb/raft/log.go:389 +0x764 --- cmd/influxd/run.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index f5aaaf5b2c7..51cda41095e 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -105,14 +105,14 @@ func (s *Node) Close() error { } } - if s.Broker != nil { - if err := s.Broker.Close(); err != nil { + if s.raftLog != nil { + if err := s.raftLog.Close(); err != nil { return err } } - if s.raftLog != nil { - if err := s.raftLog.Close(); err != nil { + if s.Broker != nil { + if err := s.Broker.Close(); err != nil { return err } } From fd4a69855deb7732cfd3b64616b57028d915fa42 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Sat, 18 Apr 2015 23:15:53 -0600 Subject: [PATCH 3/4] Re-enable Test3NodeClusterPartiallyReplicated --- cmd/influxd/server_integration_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index a996c9e98e3..4067ee784fa 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1458,7 +1458,6 @@ func Test3NodeServerFailover(t *testing.T) { // ensure that all queries work if there are more nodes in a cluster than the replication factor func Test3NodeClusterPartiallyReplicated(t *testing.T) { - t.Skip("Skipping due to instability") t.Parallel() testName := "3-node server integration partial replication" if testing.Short() { From 94f50ac056df58f122d7ad46d2a6c05edf533019 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Sun, 19 Apr 2015 16:46:05 -0600 Subject: [PATCH 4/4] Make 3 node failover test parallel ---
cmd/influxd/server_integration_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 4067ee784fa..7ab6d3a07cd 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1435,7 +1435,8 @@ func Test3NodeServer(t *testing.T) { } func Test3NodeServerFailover(t *testing.T) { - testName := "3-node server integration" + t.Parallel() + testName := "3-node server failover integration" if testing.Short() { t.Skip(fmt.Sprintf("skipping '%s'", testName))