Handle distributed queries when shards != data nodes #2336
Conversation
👍

@jwilder -- one thing you should be aware of is that …
```diff
@@ -105,14 +105,14 @@ func (s *Node) Close() error {
 		}
 	}

-	if s.Broker != nil {
-		if err := s.Broker.Close(); err != nil {
+	if s.raftLog != nil {
```
Good catch.
ec35428 to 9612ea7
```diff
-	shard := sg.Shards[0]
+	// pick a shard to query
+	shard := sg.Shards[rand.Intn(len(sg.Shards))]
```
I might be missing something, but I don't see how this can work. Shard groups are created here:
https://github.com/influxdb/influxdb/blob/master/server.go#L1090
If there are 3 nodes, and a replication factor of 1, then 3 shards are created on the cluster, each with different data (there is no replication, sharding takes place purely for write throughput). Therefore when a shard group is selected for query, then selecting only 1 of the shards at random means that 2/3 of the data in that shard group is not queried. It seems that this code assumes that all shards in a shard group contain the same data, which is not always the case.
Am I missing something? Furthermore I'm pretty sure this whole thing needs to be more complex than this. Say I have to query series IDs 3 & 4, and a certain shard group is the one I want (determined by time). It may be possible that I don't need to query 1 of the shards in the shard group, because I know that data for Series IDs 3 & 4 doesn't exist in that 1 shard, only the other two. I can determine this by reversing the shard routing that takes place at write-time.
If my reasoning is correct, our testing needs work, since it should have caught this but did not. If it's not correct, can you explain what I am missing?
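The "reversing the shard routing" idea above can be sketched in Go. This is a minimal illustration that assumes simple modulo routing of series IDs to shards; the names `shardIndexFor` and `shardsToQuery` are hypothetical and not InfluxDB's actual API:

```go
package main

import "fmt"

// shardIndexFor reverses a hypothetical write-time routing rule
// (series ID modulo shard count). Illustrative assumption only;
// not necessarily InfluxDB's real routing function.
func shardIndexFor(seriesID uint64, numShards int) int {
	return int(seriesID % uint64(numShards))
}

// shardsToQuery returns the set of shard indexes that could hold any
// of the requested series IDs, so the query can skip the other shards.
func shardsToQuery(seriesIDs []uint64, numShards int) map[int]bool {
	needed := make(map[int]bool)
	for _, sid := range seriesIDs {
		needed[shardIndexFor(sid, numShards)] = true
	}
	return needed
}

func main() {
	// Series IDs 3 and 4 in a 3-shard group land on shards 0 and 1,
	// so shard 2 never needs to be queried.
	fmt.Println(shardsToQuery([]uint64{3, 4}, 3))
}
```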
I am also curious how it will behave with 3 nodes and replicaN = 2.
https://github.com/influxdb/influxdb/blob/master/server.go#L1112
Fixes #2272. There was previously an explicit panic in the query engine to prevent queries where the number of shards was not equal to the number of data nodes in the cluster. It was waiting for the distributed queries branch to land but was not removed when that landed.
Closing the broker before the raft log can trigger this panic, since the raft log depends on the broker via the FSM:

```
panic: apply: broker apply: broker already closed

goroutine 29164 [running]:
github.com/influxdb/influxdb/raft.(*Log).applier(0xc20833b040, 0xc20802bd40)
	/Users/jason/go/src/github.com/influxdb/influxdb/raft/log.go:1386 +0x278
created by github.com/influxdb/influxdb/raft.func·002
	/Users/jason/go/src/github.com/influxdb/influxdb/raft/log.go:389 +0x764
```
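The shutdown-ordering bug can be illustrated with a small sketch. The `broker` and `raftLog` types below are hypothetical stand-ins for the real components; the point is only that a component must close before the dependency it still calls into:

```go
package main

import "fmt"

// broker is a stand-in for the real Broker type.
type broker struct{ closed bool }

func (b *broker) Close() error { b.closed = true; return nil }

// raftLog is a stand-in for the raft log, which applies entries
// through the broker via the FSM and so must close first.
type raftLog struct{ b *broker }

func (l *raftLog) Close() error {
	if l.b.closed {
		return fmt.Errorf("apply: broker apply: broker already closed")
	}
	return nil
}

func main() {
	b := &broker{}
	l := &raftLog{b: b}
	// Correct ordering: raft log first, then the broker it depends on.
	if err := l.Close(); err != nil {
		panic(err)
	}
	if err := b.Close(); err != nil {
		panic(err)
	}
	fmt.Println("clean shutdown")
}
```

Reversing the two `Close` calls reproduces the "broker already closed" failure mode described above.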
```go
shards := map[*Shard][]uint64{}
for _, sid := range t.SeriesIDs {
	shard := sg.ShardBySeriesID(sid)
	shards[shard] = append(shards[shard], sid)
}
```
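The grouping pattern above can be exercised standalone. This sketch fakes `ShardBySeriesID` with an assumed modulo routing rule (the real method lives on the shard group and may route differently); it shows each shard ending up with only the series IDs it actually stores:

```go
package main

import "fmt"

// shard is a minimal stand-in for the real Shard type.
type shard struct{ id int }

// shardBySeriesID mimics reversing write-time routing with an assumed
// modulo rule; not InfluxDB's actual implementation.
func shardBySeriesID(shards []*shard, sid uint64) *shard {
	return shards[sid%uint64(len(shards))]
}

func main() {
	shards := []*shard{{0}, {1}, {2}}
	seriesIDs := []uint64{3, 4, 6, 7}

	// Group series IDs by owning shard, as in the diff above.
	groups := map[*shard][]uint64{}
	for _, sid := range seriesIDs {
		s := shardBySeriesID(shards, sid)
		groups[s] = append(groups[s], sid)
	}
	for s, ids := range groups {
		fmt.Printf("shard %d -> %v\n", s.id, ids)
	}
}
```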
OK, very good. You're using "map" as a set.
To make it even clearer, you might just like to store `struct{}` in there as the value, since we don't care about the value. Right now `sid` will be overwritten with newer values so it's no use.
Looks good -- I think you've got it. All makes sense to me. +1

Don't forget the changelog.
@jwilder merged. I did the changelog on master.
This PR has 3 changes:

- There was previously an explicit panic in the query engine to prevent queries where the number of shards was not equal to the number of data nodes in the cluster. It was waiting for the distributed queries branch to land but was not removed when that landed. This PR removes that panic.
- … a `LocalMapper` or a `RemoteMapper` …

Fixes #2272