Merge pull request #2336 from influxdb/2272-fix
Handle distributed queries when shards != data nodes
toddboom committed Apr 20, 2015
2 parents 3332f09 + 94f50ac commit 30b56ce
Showing 3 changed files with 57 additions and 55 deletions.
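
The substance of the change is in tx.go below: instead of assuming each shard group contains exactly one shard and panicking otherwise, the planner now groups the matching series IDs by the shard that owns them and creates one mapper per shard. A minimal, self-contained sketch of that grouping step, using simplified stand-in types rather than the real Shard/ShardGroup structs (the modulo routing in ShardBySeriesID is an illustrative assumption, not the actual assignment logic):

package main

import "fmt"

// Stand-ins for the real types; the actual structs in tx.go carry far more state.
type Shard struct{ ID uint64 }

type ShardGroup struct{ Shards []*Shard }

// ShardBySeriesID mimics the lookup used in the new code: each series ID maps to
// exactly one shard in its group (here via a simple modulo, purely for illustration).
func (sg *ShardGroup) ShardBySeriesID(seriesID uint64) *Shard {
	return sg.Shards[seriesID%uint64(len(sg.Shards))]
}

func main() {
	sg := &ShardGroup{Shards: []*Shard{{ID: 1}, {ID: 2}, {ID: 3}}}
	seriesIDs := []uint64{10, 11, 12, 13, 14}

	// Group series IDs by owning shard, as the patched tx.go does, so that one
	// mapper can be created per shard instead of one per shard group.
	shards := map[*Shard][]uint64{}
	for _, sid := range seriesIDs {
		shard := sg.ShardBySeriesID(sid)
		shards[shard] = append(shards[shard], sid)
	}

	for shard, sids := range shards {
		fmt.Printf("shard %d -> series %v (one mapper per shard)\n", shard.ID, sids)
	}
}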
8 changes: 4 additions & 4 deletions cmd/influxd/run.go
@@ -105,14 +105,14 @@ func (s *Node) Close() error {
 		}
 	}

-	if s.Broker != nil {
-		if err := s.Broker.Close(); err != nil {
+	if s.raftLog != nil {
+		if err := s.raftLog.Close(); err != nil {
 			return err
 		}
 	}

-	if s.raftLog != nil {
-		if err := s.raftLog.Close(); err != nil {
+	if s.Broker != nil {
+		if err := s.Broker.Close(); err != nil {
 			return err
 		}
 	}
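
For reference, the run.go hunk above only swaps the shutdown order inside Node.Close so the raft log is closed before the broker. A small sketch of the resulting ordering, with stand-in types rather than the real influxd structs:

package main

import "fmt"

// Stand-ins for the influxd components; only the Close ordering is illustrated.
type raftLog struct{}

func (l *raftLog) Close() error { fmt.Println("raft log closed"); return nil }

type broker struct{}

func (b *broker) Close() error { fmt.Println("broker closed"); return nil }

type node struct {
	raftLog *raftLog
	Broker  *broker
}

// Close mirrors the post-patch order: raft log first, then broker, returning
// the first error encountered.
func (n *node) Close() error {
	if n.raftLog != nil {
		if err := n.raftLog.Close(); err != nil {
			return err
		}
	}
	if n.Broker != nil {
		if err := n.Broker.Close(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	n := &node{raftLog: &raftLog{}, Broker: &broker{}}
	if err := n.Close(); err != nil {
		fmt.Println("close error:", err)
	}
}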
4 changes: 2 additions & 2 deletions cmd/influxd/server_integration_test.go
@@ -1435,7 +1435,8 @@ func Test3NodeServer(t *testing.T) {
 }

 func Test3NodeServerFailover(t *testing.T) {
-	testName := "3-node server integration"
+	t.Parallel()
+	testName := "3-node server failover integration"

 	if testing.Short() {
 		t.Skip(fmt.Sprintf("skipping '%s'", testName))
@@ -1458,7 +1459,6 @@ func Test3NodeServerFailover(t *testing.T) {

 // ensure that all queries work if there are more nodes in a cluster than the replication factor
 func Test3NodeClusterPartiallyReplicated(t *testing.T) {
-	t.Skip("Skipping due to instability")
 	t.Parallel()
 	testName := "3-node server integration partial replication"
 	if testing.Short() {
100 changes: 51 additions & 49 deletions tx.go
@@ -142,60 +142,62 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []string

 		// create mappers for each shard we need to hit
 		for _, sg := range shardGroups {
-			if len(sg.Shards) != 1 { // we'll only have more than 1 shard in a group when RF < # servers in cluster
-				// TODO: implement distributed queries.
-				panic("distributed queries not implemented yet and there are too many shards in this group")
-			}
-
-			shard := sg.Shards[0]
-
-			var mapper influxql.Mapper
-
-			// create either a remote or local mapper for this shard
-			if shard.store == nil {
-				nodes := tx.server.DataNodesByID(shard.DataNodeIDs)
-				if len(nodes) == 0 {
-					return nil, ErrShardNotFound
-				}
-
-				balancer := NewDataNodeBalancer(nodes)
-
-				mapper = &RemoteMapper{
-					dataNodes:       balancer,
-					Database:        mm.Database,
-					MeasurementName: m.Name,
-					TMin:            tmin.UnixNano(),
-					TMax:            tmax.UnixNano(),
-					SeriesIDs:       t.SeriesIDs,
-					ShardID:         shard.ID,
-					WhereFields:     whereFields,
-					SelectFields:    selectFields,
-					SelectTags:      selectTags,
-					Limit:           stmt.Limit,
-					Offset:          stmt.Offset,
-					Interval:        interval,
-				}
-				mapper.(*RemoteMapper).SetFilters(t.Filters)
-			} else {
-				mapper = &LocalMapper{
-					seriesIDs:    t.SeriesIDs,
-					db:           shard.store,
-					job:          job,
-					decoder:      NewFieldCodec(m),
-					filters:      t.Filters,
-					whereFields:  whereFields,
-					selectFields: selectFields,
-					selectTags:   selectTags,
-					tmax:         tmax.UnixNano(),
-					interval:     interval,
-					// multiple mappers may need to be merged together to get the results
-					// for a raw query. So each mapper will have to read at least the
-					// limit plus the offset in data points to ensure we've hit our mark
-					limit: uint64(stmt.Limit) + uint64(stmt.Offset),
-				}
-			}
-
-			mappers = append(mappers, mapper)
+			shards := map[*Shard][]uint64{}
+			for _, sid := range t.SeriesIDs {
+				shard := sg.ShardBySeriesID(sid)
+				shards[shard] = append(shards[shard], sid)
+			}
+
+			for shard, _ := range shards {
+				var mapper influxql.Mapper
+
+				// create either a remote or local mapper for this shard
+				if shard.store == nil {
+					nodes := tx.server.DataNodesByID(shard.DataNodeIDs)
+					if len(nodes) == 0 {
+						return nil, ErrShardNotFound
+					}
+
+					balancer := NewDataNodeBalancer(nodes)
+
+					mapper = &RemoteMapper{
+						dataNodes:       balancer,
+						Database:        mm.Database,
+						MeasurementName: m.Name,
+						TMin:            tmin.UnixNano(),
+						TMax:            tmax.UnixNano(),
+						SeriesIDs:       t.SeriesIDs,
+						ShardID:         shard.ID,
+						WhereFields:     whereFields,
+						SelectFields:    selectFields,
+						SelectTags:      selectTags,
+						Limit:           stmt.Limit,
+						Offset:          stmt.Offset,
+						Interval:        interval,
+					}
+					mapper.(*RemoteMapper).SetFilters(t.Filters)
+				} else {
+					mapper = &LocalMapper{
+						seriesIDs:    t.SeriesIDs,
+						db:           shard.store,
+						job:          job,
+						decoder:      NewFieldCodec(m),
+						filters:      t.Filters,
+						whereFields:  whereFields,
+						selectFields: selectFields,
+						selectTags:   selectTags,
+						tmax:         tmax.UnixNano(),
+						interval:     interval,
+						// multiple mappers may need to be merged together to get the results
+						// for a raw query. So each mapper will have to read at least the
+						// limit plus the offset in data points to ensure we've hit our mark
+						limit: uint64(stmt.Limit) + uint64(stmt.Offset),
+					}
+				}
+				mappers = append(mappers, mapper)
+			}
 		}

 		job.Mappers = mappers
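
The limit: uint64(stmt.Limit) + uint64(stmt.Offset) line in the LocalMapper carries the key reasoning for raw queries: when several per-shard mappers are merged, each one must be able to supply at least limit+offset points, because in the worst case every point that survives the global offset and fills the limit comes from a single shard. A toy illustration of that merge, independent of the InfluxDB types (sorted integers stand in for timestamp-ordered points):

package main

import (
	"fmt"
	"sort"
)

// mergeLimitOffset merges per-shard result sets (each already sorted) and applies
// OFFSET/LIMIT globally. Each shard contributes at most limit+offset points, which
// is also the minimum each mapper must be able to read: in the worst case every
// surviving point comes from a single shard.
func mergeLimitOffset(perShard [][]int, limit, offset int) []int {
	var merged []int
	for _, points := range perShard {
		n := limit + offset
		if n > len(points) {
			n = len(points)
		}
		merged = append(merged, points[:n]...)
	}
	sort.Ints(merged)

	if offset > len(merged) {
		return nil
	}
	merged = merged[offset:]
	if limit < len(merged) {
		merged = merged[:limit]
	}
	return merged
}

func main() {
	shardA := []int{1, 4, 7, 10}
	shardB := []int{2, 5, 8, 11}
	// Equivalent of OFFSET 2 LIMIT 3 over the merged, ordered stream.
	fmt.Println(mergeLimitOffset([][]int{shardA, shardB}, 3, 2)) // [4 5 7]
}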
