[SPARK-40407][SQL] Fix the potential data skew caused by df.repartition #37855

Closed
wants to merge 8 commits into from

Contributor

@wbo4958 wbo4958 commented Sep 12, 2022

What changes were proposed in this pull request?

val df = spark.range(0, 100, 1, 50).repartition(4)
// Count the number of records in each output partition.
val v = df.rdd.mapPartitions { iter =>
  Iterator.single(iter.length)
}.collect()
println(v.mkString(","))

The simple code above outputs 50,0,0,50, which means partitions 1 and 2 contain no data.

RoundRobin seems to ensure that the records of a single input partition are distributed evenly, but it does not guarantee an even distribution across input partitions.

Below is the code that generates the key:

      case RoundRobinPartitioning(numPartitions) =>
        // Distributes elements evenly across output partitions, starting from a random partition.
        var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
        (row: InternalRow) => {
          // The HashPartitioner will handle the `mod` by the number of partitions
          position += 1
          position
        }

In this case there are 50 input partitions, and each one holds only 2 elements. The issue for RoundRobin here is that every partition starts the round robin at position=2.

See the output of Random:

scala> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))  // the position is always 2.
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 

Similarly, each of the Random calls below prints one and the same value for every partition id:

(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(2) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(8) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(16) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " "))
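
The claim above can be checked without a Spark session. The following is a minimal sketch that uses only `java.util.Random`; the object name `SeedCheck` and the helper `distinctPositions` are made up for this illustration and are not part of Spark.

```scala
import java.util.Random

object SeedCheck {
  // Collects the distinct first values of nextInt(bound) over the
  // sequential seeds 1..maxSeed, mirroring the REPL lines above.
  def distinctPositions(bound: Int, maxSeed: Int = 200): Set[Int] =
    (1 to maxSeed).map(seed => new Random(seed).nextInt(bound)).toSet

  def main(args: Array[String]): Unit =
    Seq(2, 4, 8, 16, 32).foreach { bound =>
      println(s"nextInt($bound) -> ${distinctPositions(bound).mkString(",")}")
    }
}
```

A set with a single element means every partition id in the range gets the same starting position for that bound.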

Consider partition 0, whose elements are [0, 1]. On shuffle write, the key for element 0 is position + 1 = 2 + 1 = 3, and 3 % 4 = 3; the key for element 1 is position + 1 = 3 + 1 = 4, and 4 % 4 = 0.
Consider partition 1, whose elements are [2, 3]. On shuffle write, the key for element 2 is position + 1 = 2 + 1 = 3, and 3 % 4 = 3; the key for element 3 is position + 1 = 3 + 1 = 4, and 4 % 4 = 0.

The same calculation applies to all the remaining partitions, since the starting position is always 2 in this case.

So, as you can see, every input partition writes its elements only to output partitions 0 and 3, which leaves output partitions 1 and 2 without any data.
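
The walk-through above can be replayed outside Spark. This is only a sketch of the key calculation, not the actual shuffle code: `RoundRobinSkewDemo` and `counts` are hypothetical names, and the explicit `% numOutput` stands in for the mod that the partitioner applies.

```scala
import java.util.Random

object RoundRobinSkewDemo {
  // Counts how many records land in each output partition when every input
  // partition starts at new Random(partitionId).nextInt(numOutput).
  def counts(numInput: Int, rowsPerInput: Int, numOutput: Int): Seq[Int] = {
    val out = Array.fill(numOutput)(0)
    for (partitionId <- 0 until numInput) {
      var position = new Random(partitionId).nextInt(numOutput)
      for (_ <- 0 until rowsPerInput) {
        position += 1
        out(position % numOutput) += 1 // stand-in for the partitioner's mod
      }
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit =
    // 50 input partitions with 2 rows each, repartitioned to 4.
    println(counts(50, 2, 4).mkString(",")) // 50,0,0,50
}
```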

This PR changes the starting position of RoundRobin. The default position calculated by new Random(partitionId).nextInt(numPartitions) may be the same for every partition, which means each partition writes its data to the same keys on shuffle write, and in some special cases some keys receive no data at all.

Why are the changes needed?

The PR can fix the data skew issue for the special cases.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Will add some tests and watch CI pass

@github-actions github-actions bot added the SQL label Sep 12, 2022
@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon HyukjinKwon marked this pull request as draft September 13, 2022 06:18
@@ -298,8 +297,7 @@ object ShuffleExchangeExec {
     }
     def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
       case RoundRobinPartitioning(numPartitions) =>
         // Distributes elements evenly across output partitions, starting from a random partition.
Member

I think removing this can actually cause a regression such as skewed data, since the starting partition would always be the same?

Contributor Author

@HyukjinKwon Thx for reviewing.

I think the original comment "starting from a random partition" (please correct me if I am wrong) means "which reducer partition to start from when doing the shuffle write in a RoundRobin manner". Basically, a large amount of data in one shuffle partition gets distributed evenly. The issue here is that a shuffle partition does not contain much data: it holds fewer records than the total number of reducer partitions. So if the starting position is the same for all shuffle partitions, every shuffle partition sends its data to the same reducer partitions, and some reducer partitions receive no data at all.

This PR just makes the partitionId the default starting position for the RoundRobin, which means each shuffle partition has a different starting position.

I tested the code below:

      val df = spark.range(0, 100, 1, 50).repartition(4)
      val v = df.rdd.mapPartitions { iter => {
        Iterator.single(iter.length)
      }
      }.collect()
      println(v.mkString(","))

With my PR, it outputs 24,25,26,25; without my PR, it outputs 50,0,0,50.

Similarly, if I change it to repartition(8):

With my PR, it outputs 12,13,14,13,12,12,12,12; without my PR, it outputs 0,0,0,0,0,0,50,50.
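
For comparison, here is a Spark-free sketch of the round-robin walk when each input partition starts at its own partitionId, as this revision of the PR proposes. `PartitionIdStartDemo` and `counts` are hypothetical names, and the explicit `% numOutput` again stands in for the mod done by the partitioner.

```scala
object PartitionIdStartDemo {
  // Same walk as the existing key extractor, except that the starting
  // position of each input partition is its partitionId.
  def counts(numInput: Int, rowsPerInput: Int, numOutput: Int): Seq[Int] = {
    val out = Array.fill(numOutput)(0)
    for (partitionId <- 0 until numInput) {
      var position = partitionId
      for (_ <- 0 until rowsPerInput) {
        position += 1
        out(position % numOutput) += 1 // stand-in for the partitioner's mod
      }
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(counts(50, 2, 4).mkString(",")) // 24,25,26,25
    println(counts(50, 2, 8).mkString(",")) // 12,13,14,13,12,12,12,12
  }
}
```

The counts agree with the numbers reported above for the patched build.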

Contributor Author

I changed the Random to XORShiftRandom.

@wbo4958
Contributor Author

wbo4958 commented Sep 13, 2022

@HyukjinKwon, this PR just uses the partitionId as the default starting position for the RoundRobin. There is also a simple alternative: replacing Random with XORShiftRandom, which also avoids producing the same position for different partition ids. See the tests:

scala> (1 to 50).foreach(numPartitions => { println(s"\n numPartitions is $numPartitions"); (0 to 200).foreach(partitionId => print(new XORShiftRandom(partitionId).nextInt(numPartitions) + " "))})

 numPartitions is 1
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 
 numPartitions is 2
0 0 1 1 1 1 1 0 1 1 0 1 1 1 0 1 1 0 1 1 0 1 1 0 0 0 0 0 0 1 1 0 1 0 0 0 1 0 1 0 1 1 0 0 0 0 1 0 1 0 0 1 0 0 1 0 0 0 0 0 1 1 0 1 1 1 0 0 0 1 0 0 0 1 0 1 0 0 1 0 1 1 0 1 0 1 1 0 0 0 0 0 1 1 1 1 1 0 0 1 0 1 0 1 0 1 0 1 0 0 0 0 1 0 0 1 0 1 0 0 1 0 1 0 1 0 0 0 1 1 0 1 0 1 0 0 1 1 1 0 1 1 1 0 1 1 1 0 0 0 1 0 1 0 1 0 1 0 0 1 1 0 1 0 0 0 0 0 0 1 0 0 0 0 0 1 0 1 0 0 1 1 1 0 1 1 0 1 1 1 1 0 0 0 0 0 1 0 0 0 1 
 numPartitions is 3
2 0 1 2 1 1 2 1 2 2 2 2 0 2 1 1 0 2 1 0 0 0 1 0 2 1 0 0 0 2 2 1 1 1 2 2 1 1 2 0 0 1 2 0 0 0 0 1 0 0 0 2 1 0 1 1 1 0 2 0 2 2 1 0 0 0 2 2 2 2 0 2 0 0 0 2 0 1 1 0 0 1 2 2 2 0 1 1 0 1 2 1 2 0 2 0 1 1 1 2 1 2 2 1 0 1 1 1 1 2 2 1 0 2 0 2 0 1 0 2 2 0 0 2 1 0 0 2 0 0 1 1 1 1 2 1 1 2 0 0 0 2 1 2 2 2 0 1 0 2 0 1 1 0 0 0 1 0 2 0 0 2 1 2 1 1 1 0 2 1 0 0 0 1 2 2 1 2 1 2 0 2 1 2 2 1 2 0 0 1 1 1 0 2 1 1 2 2 1 2 0 
 numPartitions is 4
0 0 3 2 2 3 3 1 2 2 0 2 2 2 1 2 2 0 3 2 0 3 3 1 1 1 1 0 0 2 3 1 3 0 1 1 2 0 2 1 3 3 0 0 1 1 3 0 2 1 1 2 0 1 3 0 0 0 1 0 3 2 1 3 3 3 0 1 0 3 0 0 0 3 0 2 1 0 2 0 2 2 0 3 0 2 3 0 1 1 1 1 3 3 3 3 3 0 1 3 1 3 0 2 1 2 1 3 0 1 1 0 2 0 0 3 0 2 1 1 2 0 3 1 3 1 0 0 2 3 1 2 1 3 0 1 2 2 2 1 3 3 3 0 2 2 3 0 1 0 3 0 2 1 2 1 2 1 1 3 3 0 2 0 0 1 1 1 1 2 0 0 1 0 1 3 0 3 1 1 3 2 3 1 2 3 0 3 3 2 3 0 0 1 1 0 2 1 0 0 2 
 numPartitions is 5
3 3 3 2 3 0 0 0 0 3 4 2 0 4 1 3 4 0 3 4 3 4 4 3 1 1 3 4 3 3 0 2 0 3 0 3 4 0 4 2 0 0 4 2 3 2 3 3 3 0 2 0 3 1 2 1 4 2 1 4 1 3 2 3 0 0 1 2 0 4 0 4 2 1 4 3 2 1 3 2 1 4 4 3 3 2 4 4 4 0 0 2 4 0 3 2 0 3 3 0 0 1 1 0 0 4 4 4 2 4 2 2 4 3 2 3 3 3 3 2 3 3 3 1 0 4 3 3 3 0 1 0 3 0 2 1 3 3 3 3 0 1 2 3 1 2 3 0 0 3 2 3 4 0 2 4 3 3 0 0 0 4 0 1 1 4 1 2 0 2 3 1 4 2 0 3 1 3 1 4 4 0 2 1 3 2 1 1 3 0 1 1 1 0 0 4 4 1 1 3 3 
 numPartitions is 6
2 3 4 5 1 1 2 4 5 2 5 2 3 2 1 4 3 2 1 3 3 3 1 0 5 4 3 3 3 2 2 1 1 4 2 5 1 1 2 3 3 4 5 3 3 3 0 1 0 0 3 2 4 3 4 4 4 3 5 0 5 2 4 3 3 3 2 5 5 2 3 5 0 3 0 2 3 1 1 3 0 4 2 2 5 0 1 1 0 1 5 4 5 3 5 3 4 1 4 5 1 5 5 4 3 1 4 1 4 2 2 4 3 5 3 5 3 4 0 5 2 0 3 2 1 3 3 2 3 0 1 4 4 4 5 4 4 5 0 3 3 2 1 5 2 5 3 1 3 5 3 1 4 0 0 0 4 0 2 3 0 2 4 2 4 1 1 3 5 1 3 3 0 4 5 5 4 2 4 2 0 5 4 2 5 4 2 0 3 4 4 1 0 5 1 4 2 2 1 5 3 
 numPartitions is 7
2 5 1 6 1 1 3 1 1 4 4 2 4 3 6 4 4 3 5 3 3 1 5 6 6 6 2 6 5 1 4 6 0 6 1 3 4 0 5 1 5 6 6 6 2 4 4 6 6 2 2 2 5 5 3 2 5 4 0 5 6 0 6 0 0 5 2 0 2 4 5 0 4 1 1 2 4 4 0 2 1 4 4 4 2 3 4 5 3 1 4 4 0 6 6 6 4 0 1 2 4 2 6 1 0 2 5 5 4 2 3 6 5 2 5 3 5 6 3 4 5 3 0 4 0 3 1 2 1 4 3 2 3 6 1 2 1 1 5 5 4 0 0 6 2 0 3 2 1 2 3 5 3 3 5 2 4 4 4 5 5 6 2 2 3 1 1 3 5 1 6 4 2 3 1 5 5 1 0 3 0 3 3 0 5 1 6 4 6 3 6 5 1 4 1 3 0 4 2 6 6 
 numPartitions is 8
1 1 7 5 4 7 6 3 4 5 1 5 4 4 3 4 5 0 7 5 1 7 6 3 2 2 2 0 1 5 7 3 6 1 3 2 5 0 5 3 6 7 1 0 3 3 7 1 5 3 2 5 0 2 7 0 0 1 3 1 6 4 3 6 7 6 0 2 1 7 0 0 0 7 0 4 2 0 5 1 4 4 1 6 0 4 7 0 2 2 3 3 6 6 7 6 6 1 3 7 2 6 0 5 3 5 2 6 1 2 2 0 5 1 0 7 0 5 2 3 5 1 6 2 7 2 0 1 4 6 2 4 2 7 1 2 4 4 5 2 7 6 7 0 4 5 6 1 3 1 6 1 4 2 4 3 5 3 3 6 6 1 5 1 1 3 2 2 2 5 1 0 2 0 3 6 1 7 3 3 6 4 7 3 4 6 1 6 7 4 6 0 0 2 3 1 4 3 0 0 4 
 numPartitions is 9
5 0 4 2 7 1 8 4 5 5 5 2 0 2 1 7 6 2 7 3 6 0 1 6 8 7 0 6 6 2 5 4 4 7 2 8 7 7 8 3 3 4 2 3 6 6 3 4 0 3 0 8 4 3 4 7 1 0 2 3 5 5 1 6 3 3 2 8 5 8 3 2 3 0 3 5 6 1 1 6 0 1 5 2 5 0 1 7 6 7 2 7 2 6 8 0 1 7 7 2 7 2 2 1 3 1 1 4 1 5 5 1 6 2 6 2 6 4 6 8 8 6 3 2 1 3 6 5 0 0 7 1 1 7 2 4 4 8 0 3 3 8 4 5 8 5 6 1 3 8 0 7 4 0 0 6 1 6 5 6 6 5 4 5 1 1 7 0 2 4 6 6 0 7 2 5 1 8 7 2 3 2 4 2 8 7 5 6 0 7 7 1 3 5 4 1 5 2 7 5 0 
 numPartitions is 10
8 3 8 7 3 5 0 0 5 8 9 2 5 4 1 8 9 0 3 9 3 9 9 8 1 6 3 9 3 8 0 7 5 8 0 3 9 5 4 7 5 0 9 7 3 7 8 3 8 0 7 0 8 1 2 6 4 7 1 4 1 8 2 3 5 5 6 7 5 4 5 9 2 1 4 8 7 1 3 7 6 4 4 8 3 2 9 9 4 5 5 2 9 5 3 7 0 3 8 5 5 1 1 0 5 9 4 9 2 4 2 2 9 3 7 3 3 8 8 7 8 8 3 6 5 9 3 8 3 0 1 0 8 0 7 6 8 3 8 3 5 6 7 3 6 7 3 5 5 3 7 3 4 0 2 4 8 8 0 5 0 4 0 6 6 9 1 7 5 7 3 1 4 2 5 3 6 8 6 4 4 5 2 6 3 2 6 6 3 0 6 1 6 5 5 4 4 6 1 3 3 
 numPartitions is 11
3 10 10 10 4 5 4 2 5 6 8 6 9 6 4 1 3 8 5 9 10 5 3 5 1 0 0 8 8 2 4 3 3 0 6 2 4 8 1 7 8 6 0 5 9 4 5 5 0 1 0 7 3 7 3 2 2 6 2 7 5 0 2 1 10 9 7 8 0 10 1 8 7 10 2 7 5 8 8 6 0 8 6 10 7 8 9 4 7 8 8 10 0 8 2 2 2 0 10 4 4 3 5 2 8 10 7 6 6 5 7 10 5 1 7 3 5 1 6 10 9 2 2 3 5 8 9 1 8 1 10 0 10 0 10 10 2 9 1 10 8 10 9 8 1 1 4 6 6 1 7 3 7 10 7 10 3 5 1 1 2 8 6 1 8 10 1 4 1 8 5 7 10 7 5 10 5 4 10 2 8 8 0 9 8 0 9 1 5 7 2 5 2 0 0 1 0 0 3 8 2 
 numPartitions is 12
8 3 4 11 1 1 2 4 11 8 5 2 3 2 7 10 9 8 7 9 9 9 1 0 11 4 3 3 9 8 8 1 7 4 8 5 7 1 2 9 3 10 11 9 9 9 0 1 6 6 3 8 4 9 4 4 4 3 5 0 5 8 4 3 9 9 2 11 5 2 9 5 6 3 0 8 3 7 7 9 0 4 8 8 11 0 1 1 6 7 11 10 5 3 5 3 10 7 10 11 1 5 5 10 3 1 4 7 10 8 2 10 3 5 9 5 3 10 0 5 2 0 9 8 7 9 9 8 3 6 7 10 10 4 11 10 10 11 0 3 3 8 1 11 2 11 3 7 9 11 9 1 10 0 6 0 10 0 8 3 6 8 4 2 10 1 1 3 5 1 3 9 6 10 11 5 4 8 10 2 6 5 10 2 5 10 2 6 9 10 10 1 6 5 1 4 2 8 7 5 3 
 numPartitions is 13
10 6 5 9 10 12 3 12 1 1 3 12 3 8 5 0 3 10 2 9 7 8 5 1 10 7 2 10 7 11 5 3 8 0 10 11 1 9 4 8 6 0 10 6 3 12 3 11 5 9 11 11 11 11 11 6 8 1 11 7 4 4 12 2 5 6 5 1 10 1 10 9 0 11 4 2 2 12 7 10 12 3 3 8 11 4 6 3 1 5 2 11 5 5 9 0 2 11 4 4 12 8 8 8 8 2 5 3 9 4 8 8 3 5 6 12 2 10 9 1 12 7 1 6 0 8 4 6 5 2 2 0 12 1 10 2 4 3 2 3 3 12 9 10 9 6 6 10 3 9 12 4 1 5 1 4 5 1 0 0 6 9 11 4 7 4 7 11 1 9 7 10 11 3 5 11 11 5 8 9 8 3 5 8 9 9 0 6 7 1 9 4 8 5 7 10 2 0 4 6 11 
 numPartitions is 14
2 5 8 13 1 1 10 8 1 4 11 2 11 10 13 4 11 10 5 3 3 1 5 6 13 6 9 13 5 8 4 13 7 6 8 3 11 7 12 1 5 6 13 13 9 11 4 13 6 2 9 2 12 5 10 2 12 11 7 12 13 0 6 7 7 5 2 7 9 4 5 7 4 1 8 2 11 11 7 9 8 4 4 4 9 10 11 5 10 1 11 4 7 13 13 13 4 7 8 9 11 9 13 8 7 9 12 5 4 2 10 6 5 9 5 3 5 6 10 11 12 10 7 4 7 3 1 2 1 4 3 2 10 6 1 2 8 1 12 5 11 0 7 13 2 7 3 9 1 9 3 5 10 10 12 2 4 4 4 5 12 6 2 2 10 1 1 3 5 1 13 11 2 10 1 5 12 8 0 10 0 3 10 0 5 8 6 4 13 10 6 5 8 11 1 10 0 4 9 13 13 
 numPartitions is 15
8 3 13 2 13 10 5 10 5 8 14 2 0 14 1 13 9 5 13 9 3 9 4 3 11 1 3 9 3 8 5 7 10 13 5 8 4 10 14 12 0 10 14 12 3 12 3 13 3 0 12 5 13 6 7 1 4 12 11 9 11 8 7 3 0 0 11 2 5 14 0 14 12 6 9 8 12 1 13 12 6 4 14 8 8 12 4 4 9 10 5 7 14 0 8 12 10 13 13 5 10 11 11 10 0 4 4 4 7 14 2 7 9 8 12 8 3 13 3 2 8 3 3 11 10 9 3 8 3 0 1 10 13 10 2 1 13 8 3 3 0 11 7 8 11 2 3 10 0 8 12 13 4 0 12 9 13 3 5 0 0 14 10 11 1 4 1 12 5 7 3 6 9 7 5 8 1 8 1 14 9 5 7 11 8 7 11 6 3 10 1 1 6 5 10 4 14 11 1 8 3 
 numPartitions is 16
3 3 15 11 9 14 12 6 9 11 3 11 9 9 7 9 11 1 15 11 2 14 12 6 4 4 5 1 3 10 15 6 13 3 6 4 10 0 10 6 13 15 3 1 7 7 15 3 11 6 4 10 1 4 14 0 0 3 6 3 13 8 6 13 14 12 1 4 2 15 0 1 1 14 1 9 5 0 11 2 9 9 3 12 0 9 15 1 5 5 7 7 12 12 15 12 13 3 6 14 4 12 1 10 7 10 4 13 2 5 5 0 10 2 1 14 1 10 4 7 11 2 13 5 14 5 1 3 9 13 4 8 5 14 3 5 9 9 10 4 14 13 15 1 8 10 13 2 6 3 12 3 9 5 9 6 10 6 7 12 12 2 11 2 3 6 5 4 5 10 3 0 5 0 7 12 2 15 6 6 13 8 14 7 9 13 3 12 14 8 12 0 1 4 7 3 8 6 1 0 9 
 numPartitions is 17
2 14 12 4 16 8 14 13 0 13 6 10 6 12 9 16 5 8 2 15 1 11 5 12 10 5 12 5 14 4 15 3 8 7 5 12 2 5 12 2 13 5 2 12 7 11 12 15 7 8 16 1 11 6 13 2 13 1 13 3 8 15 1 12 5 15 10 16 8 4 15 13 16 8 2 2 12 0 0 14 15 14 8 15 0 13 11 5 10 3 0 16 13 7 1 15 12 11 4 10 10 16 7 10 11 11 16 14 12 7 14 6 6 2 0 11 9 13 6 2 6 15 6 15 15 13 11 9 11 8 13 2 11 5 9 2 11 4 1 4 9 8 5 12 2 0 16 15 9 13 6 3 2 8 2 1 6 12 6 2 15 12 1 3 8 13 4 4 15 9 14 16 16 12 0 10 5 8 8 1 13 16 14 8 15 5 11 14 11 1 13 6 7 7 16 1 13 13 12 0 15 
 numPartitions is 18
14 9 4 11 7 1 8 4 5 14 5 2 9 2 1 16 15 2 7 3 15 9 1 6 17 16 9 15 15 2 14 13 13 16 2 17 7 7 8 3 3 4 11 3 15 15 12 13 0 12 9 8 4 3 4 16 10 9 11 12 5 14 10 15 3 3 2 17 5 8 3 11 12 9 12 14 15 1 1 15 0 10 14 2 5 0 1 7 6 7 11 16 11 15 17 9 10 7 16 11 7 11 11 10 3 1 10 13 10 14 14 10 15 11 15 11 15 4 6 17 8 6 3 2 1 3 15 14 9 0 7 10 10 16 11 4 4 17 0 3 3 8 13 5 8 5 15 1 3 17 9 7 4 0 0 6 10 6 14 15 6 14 4 14 10 1 7 9 11 13 15 15 0 16 11 5 10 8 16 2 12 11 4 2 17 16 14 6 9 16 16 1 12 5 13 10 14 2 7 5 9 
 numPartitions is 19
10 0 14 15 4 4 11 8 15 11 9 11 9 13 6 18 14 3 6 11 10 8 13 13 10 6 4 17 16 11 4 16 3 8 0 8 5 4 18 3 0 18 16 16 11 18 8 12 3 0 12 6 0 9 14 2 3 8 18 6 14 12 0 3 5 17 9 4 9 16 13 9 11 13 8 7 2 12 14 16 1 17 11 7 2 18 2 9 15 17 3 5 1 13 6 7 5 12 4 12 18 9 8 5 5 8 8 14 0 5 2 2 7 10 9 0 5 15 2 14 10 13 13 12 8 4 7 16 2 7 12 5 1 11 6 15 12 0 18 12 11 4 17 16 3 0 8 13 13 11 8 0 2 10 14 6 16 4 15 4 11 1 11 12 16 6 3 11 5 4 6 18 3 18 15 10 3 1 13 13 18 1 1 3 10 18 5 6 11 16 10 10 7 0 1 13 7 11 7 7 14 
 numPartitions is 20
8 3 8 7 13 5 10 0 15 8 9 2 15 14 11 18 9 0 3 9 13 9 9 8 11 16 3 19 13 8 0 17 15 8 0 13 19 5 14 17 15 10 19 17 13 17 8 13 18 10 7 0 8 1 12 16 4 7 1 4 1 8 12 3 5 5 6 7 5 14 5 9 2 11 4 8 7 11 3 17 16 4 4 8 3 12 9 9 14 15 15 2 9 15 13 7 10 3 18 15 5 1 1 10 15 9 4 19 2 4 2 2 19 13 17 13 3 18 8 17 18 8 13 16 15 9 13 8 3 10 11 10 18 0 7 6 18 3 8 3 15 16 17 3 6 7 3 15 5 3 17 13 14 0 2 4 18 8 0 15 10 4 0 6 6 9 1 7 5 17 3 1 14 2 15 13 16 8 6 14 14 5 2 6 13 2 6 6 13 10 6 1 6 5 5 4 14 16 11 13 3 
 numPartitions is 21
2 12 1 20 1 1 17 1 8 11 11 2 18 17 13 4 18 17 19 3 3 15 19 6 20 13 9 6 12 8 11 13 7 13 8 17 4 7 5 15 12 13 20 6 9 18 18 13 6 9 9 2 19 12 10 16 19 18 14 12 20 14 13 0 0 12 2 14 2 11 12 14 18 15 15 2 18 4 7 9 15 4 11 11 2 3 4 19 3 1 11 4 14 6 20 6 4 7 1 2 4 2 20 1 0 16 19 19 4 2 17 13 12 2 12 17 12 13 3 11 5 3 0 11 7 3 15 2 15 18 10 16 10 13 8 16 1 8 12 12 18 14 7 20 2 14 3 16 15 2 3 19 10 3 12 9 4 18 11 12 12 20 16 2 10 1 1 3 5 1 6 18 9 10 8 5 19 8 7 17 0 17 10 14 5 1 20 18 6 10 13 19 15 11 1 10 14 11 16 20 6 
 numPartitions is 22
14 21 10 21 15 5 4 2 5 6 19 6 9 6 15 12 3 8 5 9 21 5 3 16 1 0 11 19 19 2 4 3 3 0 6 13 15 19 12 7 19 6 11 5 9 15 16 5 0 12 11 18 14 7 14 2 2 17 13 18 5 0 2 1 21 9 18 19 11 10 1 19 18 21 2 18 5 19 19 17 0 8 6 10 7 8 9 15 18 19 19 10 11 19 13 13 2 11 10 15 15 3 5 2 19 21 18 17 6 16 18 10 5 1 7 3 5 12 6 21 20 2 13 14 5 19 9 12 19 12 21 0 10 0 21 10 2 9 12 21 19 10 9 19 12 1 15 17 17 1 7 3 18 10 18 10 14 16 12 1 2 8 6 12 8 21 1 15 1 19 5 7 10 18 5 21 16 4 10 2 8 19 0 20 19 0 20 12 5 18 2 5 2 11 11 12 0 0 3 19 13 
 numPartitions is 23
12 22 12 12 18 8 9 11 9 6 22 6 10 12 18 16 10 0 20 15 14 15 1 2 10 22 18 4 9 2 19 2 22 8 0 6 8 14 19 9 18 14 11 7 17 3 5 1 20 9 10 19 0 13 15 3 17 7 14 5 8 21 21 5 13 7 1 12 20 8 10 19 9 5 20 2 10 0 8 1 18 21 8 2 4 3 16 3 19 22 1 6 7 21 8 9 17 11 16 14 14 7 3 0 0 4 20 14 6 14 19 20 19 6 13 15 19 22 7 5 20 17 8 14 1 15 5 6 16 15 4 4 14 0 20 2 10 18 2 9 11 9 7 16 2 14 7 14 18 17 20 10 3 6 13 0 22 15 11 18 5 1 11 12 8 4 6 18 7 0 4 17 2 18 14 0 6 12 17 10 5 12 6 18 3 16 10 4 14 20 22 4 15 8 7 10 5 8 14 22 20 
 numPartitions is 24
20 15 4 23 13 1 2 16 11 20 17 14 3 14 19 10 9 20 7 9 21 9 13 12 11 4 15 3 21 8 20 1 19 16 8 5 19 1 2 21 3 22 23 21 9 9 12 1 18 6 15 8 16 9 4 16 4 3 5 0 17 20 16 3 9 9 14 23 17 2 9 5 6 3 0 8 15 7 19 21 12 16 20 20 23 12 1 1 6 7 11 22 17 3 17 3 10 7 22 23 13 5 5 10 15 1 4 19 22 20 2 10 15 17 21 17 3 22 12 17 14 12 9 8 7 21 9 8 3 6 19 22 10 16 11 22 10 23 12 15 3 20 13 11 2 23 15 19 9 11 21 1 22 12 6 0 22 12 20 15 6 8 16 2 10 13 1 15 17 13 15 21 6 22 11 17 4 8 10 14 6 17 10 2 17 22 14 6 21 22 10 13 6 17 1 16 14 20 19 5 3 
 numPartitions is 25
3 3 3 22 3 0 15 10 0 13 14 2 20 19 21 3 4 5 8 19 3 19 19 18 1 1 3 14 23 8 0 17 15 23 10 3 14 10 9 12 0 20 4 2 8 7 13 3 13 0 7 5 13 6 22 1 9 12 16 4 21 8 22 3 10 15 1 22 15 4 5 9 7 1 9 8 2 21 3 7 11 14 14 13 18 7 14 19 19 0 20 7 9 10 3 22 20 13 23 15 5 21 6 15 0 19 4 9 12 24 7 17 19 18 12 3 23 13 3 17 13 23 23 11 0 9 8 13 3 5 1 20 8 20 12 6 18 23 18 13 20 11 12 18 11 22 13 20 15 3 7 8 9 15 17 19 13 23 10 15 0 24 10 16 11 4 6 17 10 2 18 1 24 12 10 23 6 18 16 19 14 0 17 21 13 12 11 6 13 0 1 21 16 0 20 9 19 21 21 23 3 
 numPartitions is 26
10 19 18 9 23 25 16 12 1 14 3 12 3 8 5 0 3 10 15 9 7 21 5 14 23 20 15 23 7 24 18 3 21 0 10 11 1 9 4 21 19 0 23 19 3 25 16 11 18 22 11 24 24 11 24 6 8 1 11 20 17 4 12 15 5 19 18 1 23 14 23 9 0 11 4 2 15 25 7 23 12 16 16 8 11 4 19 3 14 5 15 24 5 5 9 13 2 11 4 17 25 21 21 8 21 15 18 3 22 4 8 8 3 5 19 25 15 10 22 1 12 20 1 6 13 21 17 6 5 2 15 0 12 14 23 2 4 3 2 3 3 12 9 23 22 19 19 23 3 9 25 17 14 18 14 4 18 14 0 13 6 22 24 4 20 17 7 11 1 9 7 23 24 16 5 11 24 18 8 22 8 3 18 8 9 22 0 6 7 14 22 17 8 5 7 10 2 0 17 19 11 
 numPartitions is 27
23 9 4 2 7 1 17 22 23 14 5 20 9 20 1 7 24 11 7 3 6 0 10 24 8 7 0 24 15 11 23 22 4 16 20 26 25 25 26 3 12 4 11 12 24 24 12 13 9 21 0 26 13 3 13 25 19 18 11 3 5 5 10 6 21 21 20 26 23 8 12 20 12 9 21 5 24 19 19 15 18 1 14 2 5 18 1 7 6 16 20 7 2 15 8 0 1 7 7 2 7 11 11 10 3 1 1 13 10 14 23 10 24 2 15 2 15 4 15 17 17 24 3 2 1 21 24 23 9 0 25 10 10 25 11 22 13 26 9 21 12 17 22 14 26 5 24 19 3 8 0 25 13 0 9 24 1 6 14 15 15 23 4 5 19 1 16 9 11 22 6 24 18 16 20 5 1 17 16 11 12 2 4 20 26 25 5 24 18 16 16 10 3 23 13 1 23 2 16 14 0 
 numPartitions is 28
16 19 8 27 1 1 10 8 15 4 25 2 11 10 27 18 25 24 19 17 17 1 5 20 27 20 23 27 5 8 4 13 7 20 8 17 11 21 26 1 19 6 27 13 9 25 4 13 6 2 23 16 12 5 24 16 12 11 21 12 13 0 20 7 21 5 2 7 9 18 5 21 18 15 8 16 11 11 7 9 8 4 4 4 23 24 25 5 10 15 11 18 21 27 13 27 18 7 22 23 25 9 13 22 7 9 12 19 18 16 10 6 19 9 5 17 19 6 24 25 26 24 21 4 7 17 1 16 15 18 3 2 10 20 15 2 22 15 12 19 11 0 21 27 2 7 3 23 1 23 17 5 10 24 26 16 18 4 4 19 26 20 16 2 10 1 1 3 5 1 27 25 2 10 15 5 12 8 14 10 14 17 10 14 5 22 6 18 13 10 6 5 22 25 1 24 14 4 23 13 27 
 numPartitions is 29
20 17 16 14 3 2 28 24 7 9 24 27 27 1 13 23 2 28 9 8 27 14 27 9 9 23 10 2 0 24 2 20 28 8 19 5 0 5 21 15 27 6 26 20 19 6 0 16 15 13 14 28 14 1 2 23 27 21 0 16 27 20 15 26 7 14 25 11 19 17 25 8 0 23 1 0 14 21 21 24 14 21 7 14 28 13 7 6 0 16 12 6 27 13 20 18 17 14 0 14 18 16 0 14 15 2 2 14 26 3 17 22 12 17 14 5 5 6 25 16 10 3 26 20 15 7 0 15 23 1 23 3 5 27 27 28 15 22 12 6 3 20 5 18 7 14 11 12 3 25 23 25 18 15 19 22 11 16 7 11 9 7 6 9 8 12 1 6 16 15 12 5 22 19 5 17 10 24 26 1 0 6 1 14 3 10 5 10 26 21 17 24 8 23 13 24 3 18 4 17 8 
 numPartitions is 30
8 3 28 17 13 25 20 10 5 8 29 2 15 14 1 28 9 20 13 9 3 9 19 18 11 16 3 9 3 8 20 7 25 28 20 23 19 25 14 27 15 10 29 27 3 27 18 13 18 0 27 20 28 21 22 16 4 27 11 24 11 8 22 3 15 15 26 17 5 14 15 29 12 21 24 8 27 1 13 27 6 4 14 8 23 12 19 19 24 25 5 22 29 15 23 27 10 13 28 5 25 11 11 10 15 19 4 19 22 14 2 22 9 23 27 23 3 28 18 17 8 18 3 26 25 9 3 8 3 0 1 10 28 10 17 16 28 23 18 3 15 26 7 23 26 17 3 25 15 23 27 13 4 0 12 24 28 18 20 15 0 14 10 26 16 19 1 27 5 7 3 21 24 22 5 23 16 8 16 14 24 5 22 26 23 22 26 6 3 10 16 1 6 5 25 4 14 26 1 23 3 
 numPartitions is 31
12 15 5 10 5 22 2 27 18 17 23 24 12 6 5 15 15 15 1 1 11 24 9 18 8 9 12 0 11 23 18 29 9 8 5 30 20 11 30 7 6 20 25 6 26 28 15 24 30 19 9 19 2 18 11 29 26 3 4 24 20 26 18 24 14 5 9 12 30 27 19 10 29 28 27 2 29 15 27 14 26 9 20 23 21 20 8 24 29 6 10 6 14 12 20 23 3 6 29 4 8 12 20 3 21 7 16 23 0 14 1 23 20 18 24 21 29 22 0 29 25 1 1 7 9 15 30 3 25 6 29 5 6 29 3 30 13 12 13 23 20 18 4 27 20 11 0 10 4 27 12 14 15 17 14 30 16 3 15 5 9 11 5 0 13 0 18 16 22 25 21 10 15 4 20 9 19 28 19 17 11 10 2 26 4 19 11 11 22 19 2 22 22 7 15 14 19 19 3 25 18 
 numPartitions is 32
7 7 30 23 18 29 24 13 19 22 7 22 19 19 14 19 23 2 31 23 5 28 25 12 8 8 11 2 6 21 30 12 27 7 13 9 21 1 20 13 27 31 6 2 15 15 30 7 23 12 8 21 3 8 28 1 0 6 13 6 27 17 12 26 28 25 2 8 5 30 0 2 2 28 3 18 10 0 22 4 18 18 7 24 1 18 31 3 10 11 15 14 24 24 30 24 26 6 13 28 8 25 3 21 15 20 8 26 5 11 11 1 20 5 2 29 3 21 9 14 22 4 27 11 28 10 2 6 18 27 9 17 10 29 6 10 18 19 20 9 29 27 30 2 16 21 26 4 13 7 25 6 19 11 19 12 20 12 14 25 24 5 22 5 6 13 11 9 10 21 6 0 11 1 14 24 4 30 13 13 26 17 29 15 18 26 6 25 29 16 24 0 2 9 14 7 16 12 3 1 19 
 numPartitions is 33
14 21 10 32 4 16 26 13 5 17 8 17 9 17 4 1 3 8 16 9 21 27 25 27 23 22 0 30 30 2 26 25 25 22 17 2 4 19 23 18 30 28 11 27 9 15 27 16 0 12 0 29 25 18 25 13 13 6 2 18 5 11 13 12 21 9 29 8 11 32 12 8 18 21 24 29 27 19 19 6 0 19 17 32 29 30 31 4 18 19 8 10 11 30 2 24 13 22 10 26 4 14 5 13 30 10 7 28 28 5 29 10 27 23 18 14 27 1 6 32 20 24 24 14 16 30 9 23 30 12 10 22 10 22 32 10 13 20 12 21 30 32 31 8 23 23 15 28 6 23 18 25 7 21 18 21 25 27 23 12 24 8 28 23 19 10 1 15 23 19 27 18 21 7 5 32 16 26 10 2 30 8 22 20 8 22 20 12 27 7 13 16 24 11 22 1 11 11 25 8 24 
 numPartitions is 34
2 31 12 21 33 25 14 30 17 30 23 10 23 12 9 16 5 8 19 15 1 11 5 12 27 22 29 5 31 4 32 3 25 24 22 29 19 5 12 19 13 22 19 29 7 11 12 15 24 8 33 18 28 23 30 2 30 1 13 20 25 32 18 29 5 15 10 33 25 4 15 13 16 25 2 2 29 17 17 31 32 14 8 32 17 30 11 5 10 3 17 16 13 7 1 15 12 11 4 27 27 33 7 10 11 11 16 31 12 24 14 6 23 19 17 11 9 30 6 19 6 32 23 32 15 13 11 26 11 8 13 2 28 22 9 2 28 21 18 21 9 8 5 29 2 17 33 15 9 13 23 3 2 8 2 18 6 12 6 19 32 12 18 20 8 13 21 21 15 9 31 33 16 12 17 27 22 8 8 18 30 33 14 8 15 22 28 14 11 18 30 23 24 7 33 18 30 30 29 17 15 
 numPartitions is 35
23 33 8 27 8 15 10 15 15 18 4 2 25 24 6 18 4 10 33 24 3 29 19 13 6 6 23 34 33 8 25 27 0 13 15 3 4 0 19 22 5 20 34 27 23 32 18 13 13 30 2 30 33 26 17 16 19 32 21 19 6 28 27 28 0 5 16 7 30 4 5 14 32 1 29 23 32 11 28 2 1 4 4 18 23 17 4 19 24 15 25 32 14 20 13 27 25 28 8 30 25 16 6 15 0 9 19 19 32 9 17 27 19 23 12 3 33 13 3 32 33 3 28 11 0 24 8 23 8 25 31 30 3 20 22 16 8 8 33 33 25 21 7 13 16 7 3 30 15 23 17 33 24 10 12 9 18 18 25 5 5 34 30 16 31 29 1 17 5 22 13 11 9 17 15 33 26 8 21 24 14 10 17 21 33 22 6 11 13 10 6 26 1 25 15 24 14 11 16 13 13 
 numPartitions is 36
32 27 4 11 25 1 26 4 23 32 5 2 27 2 19 34 33 20 7 21 33 9 1 24 35 16 27 15 33 20 32 13 31 16 20 17 7 25 26 21 3 22 11 21 33 33 12 13 18 30 27 8 4 21 4 16 28 27 29 12 5 32 28 15 21 21 2 35 5 26 21 29 30 27 12 32 15 19 19 33 0 28 32 20 23 0 1 25 6 7 11 34 29 15 17 27 10 7 34 11 25 29 29 10 3 1 28 31 10 32 14 10 15 29 33 29 15 22 24 17 26 24 21 20 19 21 33 32 27 18 7 10 10 16 11 22 22 35 0 3 3 8 13 23 26 23 15 19 21 35 9 25 22 0 18 24 10 24 32 15 6 32 4 14 10 1 25 27 29 13 15 33 18 34 11 5 28 8 34 2 30 29 22 2 17 34 14 6 9 34 34 1 30 5 13 28 14 20 7 5 27 
 numPartitions is 37
14 18 10 8 28 25 7 36 28 33 5 0 16 33 6 6 35 24 33 17 2 13 19 16 30 6 33 18 14 8 18 26 19 16 22 2 26 32 7 3 0 20 4 21 27 16 30 26 8 1 4 14 36 6 8 32 21 27 25 28 9 4 19 12 35 20 8 24 23 14 26 1 34 11 1 35 23 14 17 30 23 4 34 8 27 0 23 9 29 31 21 26 25 25 27 26 33 30 5 36 23 4 14 15 31 21 22 0 30 18 11 7 6 34 19 0 13 36 22 0 33 30 23 9 35 8 21 1 32 15 1 17 27 17 11 19 31 19 24 36 27 34 15 2 23 14 28 10 22 21 34 6 26 17 4 20 2 26 22 1 25 21 16 33 16 15 24 20 25 23 28 32 33 2 2 34 27 9 9 11 29 26 35 29 15 35 26 19 36 23 16 27 22 32 32 22 34 22 14 20 2 
 numPartitions is 38
10 19 14 15 23 23 30 8 15 30 9 30 9 32 25 18 33 22 25 11 29 27 13 32 29 6 23 17 35 30 4 35 3 8 0 27 5 23 18 3 19 18 35 35 11 37 8 31 22 0 31 6 0 9 14 2 22 27 37 6 33 12 0 3 5 17 28 23 9 16 13 9 30 13 8 26 21 31 33 35 20 36 30 26 21 18 21 9 34 17 3 24 1 13 25 7 24 31 4 31 37 9 27 24 5 27 8 33 0 24 2 2 7 29 9 19 5 34 2 33 10 32 13 12 27 23 7 16 21 26 31 24 20 30 25 34 12 19 18 31 11 4 17 35 22 19 27 13 13 11 27 19 2 10 14 6 16 4 34 23 30 20 30 12 16 25 3 11 5 23 25 37 22 18 15 29 22 20 32 32 18 1 20 22 29 18 24 6 11 16 10 29 26 19 1 32 26 30 7 7 33 
 numPartitions is 39
23 6 31 35 10 25 29 25 14 14 29 38 3 8 31 13 3 23 28 9 33 21 31 27 23 7 15 36 33 11 5 16 34 13 23 11 1 22 17 21 6 13 23 6 3 12 3 37 18 9 24 11 37 24 37 19 34 27 11 33 17 17 25 15 18 6 5 14 23 14 36 35 0 24 30 2 15 25 7 36 12 16 29 8 11 30 19 16 27 31 2 37 5 18 35 0 28 37 4 17 25 8 8 34 21 28 31 16 22 17 8 34 3 5 6 38 15 10 9 14 38 33 27 32 13 21 30 32 18 15 28 13 25 1 23 28 4 29 15 3 3 38 22 23 35 32 6 10 3 35 12 4 1 18 27 30 31 27 26 0 6 35 37 17 7 4 7 24 14 22 33 36 24 16 5 11 37 5 34 35 21 29 31 8 35 22 26 6 33 1 22 4 21 5 7 10 2 26 4 32 24 
 numPartitions is 40
28 23 28 7 13 25 10 0 35 28 9 22 35 14 11 18 9 20 23 9 13 9 29 28 11 36 23 19 13 8 20 17 35 8 0 13 19 25 34 37 35 30 39 37 33 17 28 33 18 30 7 0 8 1 12 16 4 27 21 24 1 28 32 3 25 25 6 7 25 34 25 29 22 11 24 8 7 31 3 37 36 24 4 28 23 12 9 9 14 15 35 22 9 35 33 27 10 23 38 15 5 21 21 10 15 9 4 19 22 4 2 2 39 33 37 33 3 38 28 17 38 28 33 16 15 29 33 8 3 30 11 30 18 0 27 6 18 23 28 23 35 36 37 3 26 7 23 35 25 3 37 33 14 20 22 24 38 28 20 15 30 24 0 26 26 29 1 7 25 37 23 21 14 22 35 33 36 8 26 14 14 25 2 26 33 22 6 6 13 30 26 21 6 25 25 24 14 36 11 13 3 
 numPartitions is 41
38 27 2 9 36 9 25 37 5 30 28 6 16 17 20 3 32 13 2 25 21 4 3 13 6 18 39 17 19 34 17 6 0 17 38 29 31 3 28 14 4 24 20 3 9 10 16 23 29 4 26 29 27 33 35 7 5 13 5 30 37 19 23 21 2 15 24 16 28 29 40 34 26 15 18 26 1 26 4 13 27 11 40 34 4 14 11 15 34 17 19 3 22 22 4 10 16 21 24 27 15 14 26 23 13 34 33 40 37 9 26 32 32 1 27 34 37 30 34 26 39 6 16 22 18 13 39 10 31 11 38 25 29 15 29 3 7 26 35 33 20 26 32 37 31 36 38 16 11 21 2 39 22 6 34 11 16 5 25 40 11 28 19 14 28 32 18 26 8 25 13 4 30 24 0 16 21 10 36 16 23 19 11 38 23 11 33 33 0 25 6 28 27 7 22 36 0 17 16 11 1 
 numPartitions is 42
2 33 22 41 1 1 38 22 29 32 11 2 39 38 13 4 39 38 19 3 3 15 19 6 41 34 9 27 33 8 32 13 7 34 8 17 25 7 26 15 33 34 41 27 9 39 18 13 6 30 9 2 40 33 10 16 40 39 35 12 41 14 34 21 21 33 2 35 23 32 33 35 18 15 36 2 39 25 7 9 36 4 32 32 23 24 25 19 24 1 11 4 35 27 41 27 4 7 22 23 25 23 41 22 21 37 40 19 4 2 38 34 33 23 33 17 33 34 24 11 26 24 21 32 7 3 15 2 15 18 31 16 10 34 29 16 22 29 12 33 39 14 7 41 2 35 3 37 15 23 3 19 10 24 12 30 4 18 32 33 12 20 16 2 10 1 1 3 5 1 27 39 30 10 29 5 40 8 28 38 0 17 10 14 5 22 20 18 27 10 34 19 36 11 1 10 14 32 37 41 27 
 numPartitions is 43
2 35 22 29 6 14 36 10 36 35 4 33 35 3 5 37 20 23 4 25 32 23 14 8 25 35 33 15 32 18 25 28 26 30 36 42 35 2 28 33 29 38 25 32 2 39 41 8 23 2 17 12 31 36 32 18 0 18 23 1 24 25 40 7 11 2 28 36 7 35 16 35 32 5 10 33 31 23 31 10 24 22 39 10 4 11 35 33 19 35 0 27 8 6 29 13 36 26 19 36 31 18 26 24 9 13 42 22 21 3 14 22 21 34 34 33 11 39 3 30 9 9 34 8 26 24 6 29 14 26 35 32 32 7 28 19 24 15 15 37 22 35 25 13 15 2 11 8 38 41 16 7 18 39 40 39 19 41 17 27 35 11 4 6 22 35 29 11 11 20 40 35 4 30 15 42 34 23 26 37 17 17 14 12 4 13 31 15 4 30 27 22 20 28 22 5 8 3 9 0 25 
 numPartitions is 44
36 43 32 43 37 5 26 24 27 28 41 6 31 6 15 34 25 8 27 9 21 5 25 16 23 0 11 19 41 24 4 25 3 0 28 13 15 41 34 29 19 6 11 5 9 37 16 5 22 34 11 40 36 29 36 24 24 39 13 40 5 0 24 23 21 9 18 19 33 10 1 41 18 43 24 40 27 19 19 17 0 8 28 32 7 8 9 37 18 19 19 10 33 19 13 35 2 11 10 15 37 25 5 2 19 21 40 39 6 16 18 10 27 1 29 25 27 34 28 21 42 24 13 36 27 41 9 12 19 34 43 22 10 0 43 10 2 31 12 43 19 32 9 19 34 23 15 39 17 23 29 25 18 32 18 32 14 16 12 23 2 8 28 34 30 21 1 15 1 41 27 29 10 18 27 21 16 4 10 2 30 41 22 42 41 22 42 34 5 18 2 5 2 33 33 12 22 0 3 41 35 
 numPartitions is 45
23 18 13 2 43 10 35 40 5 23 14 2 0 29 1 43 24 20 43 39 33 9 19 33 26 16 18 24 33 38 5 22 40 43 20 8 34 25 44 12 30 40 29 12 33 42 3 13 18 30 27 35 13 21 22 16 19 27 11 39 41 23 37 33 30 30 11 17 5 44 30 29 12 36 39 23 42 1 28 42 36 19 14 38 23 27 19 34 24 25 20 7 29 15 8 27 10 43 43 20 25 11 11 10 30 19 19 4 37 14 32 37 24 38 42 38 33 13 33 17 8 33 3 11 10 39 33 23 18 0 16 10 28 25 2 31 13 8 18 3 30 26 22 23 26 32 33 10 30 8 27 43 4 0 27 24 28 33 5 15 15 14 40 41 1 19 16 27 20 22 33 6 9 7 20 23 1 8 16 29 39 20 22 11 8 7 41 6 18 25 16 1 21 5 40 19 14 11 16 23 18 
 numPartitions is 46
12 45 12 35 41 31 32 34 9 6 45 6 33 12 41 16 33 0 43 15 37 15 1 2 33 22 41 27 9 2 42 25 45 8 0 29 31 37 42 9 41 14 11 7 17 3 28 1 20 32 33 42 0 13 38 26 40 7 37 28 31 44 44 5 13 7 24 35 43 8 33 19 32 5 20 2 33 23 31 1 18 44 8 2 27 26 39 3 42 45 1 6 7 21 31 9 40 11 16 37 37 7 3 0 23 27 20 37 6 14 42 20 19 29 13 15 19 22 30 5 20 40 31 14 1 15 5 6 39 38 27 4 14 0 43 2 10 41 2 9 11 32 7 39 2 37 7 37 41 17 43 33 26 6 36 0 22 38 34 41 28 24 34 12 8 27 29 41 7 23 27 17 2 18 37 23 6 12 40 10 28 35 6 18 3 16 10 4 37 20 22 27 38 31 7 10 28 8 37 45 43 
 numPartitions is 47
5 20 40 24 42 21 12 37 10 44 20 39 15 26 29 34 3 24 28 34 46 29 38 12 17 0 40 38 44 39 32 9 2 39 37 22 19 25 30 31 33 5 20 11 4 16 18 35 14 1 16 28 39 28 32 4 25 29 44 18 23 28 24 11 35 23 1 43 45 11 3 39 27 26 11 37 42 12 39 1 10 24 13 6 2 8 35 11 14 22 30 13 28 18 26 40 24 24 16 20 0 6 30 6 25 35 2 15 19 7 32 18 25 38 24 14 9 39 30 10 4 0 40 22 40 42 11 21 2 38 19 19 9 13 4 32 13 3 20 11 2 20 36 43 43 15 24 0 34 27 19 30 39 37 40 42 29 12 19 44 27 10 32 46 19 41 13 28 2 7 15 33 34 37 26 11 33 11 26 34 38 9 7 0 46 3 43 22 0 15 22 44 36 24 34 18 30 15 8 30 25 
 numPartitions is 48
44 15 28 47 37 1 2 40 11 20 17 14 3 14 43 10 33 44 31 9 21 33 13 36 35 4 39 3 45 8 44 25 19 40 32 5 19 25 2 21 27 22 23 45 33 33 12 1 42 30 39 8 16 9 4 16 28 27 5 24 17 44 16 27 9 33 14 23 41 2 9 5 30 3 0 32 39 31 43 21 12 40 20 44 23 12 25 25 6 7 35 46 17 3 17 27 10 7 22 23 13 5 5 10 15 25 28 19 22 44 2 10 15 41 45 17 27 22 12 41 38 36 33 32 7 21 9 32 3 30 19 22 10 40 35 22 34 47 12 39 27 44 13 35 2 23 15 43 9 35 45 1 46 12 6 24 22 12 44 15 6 32 16 2 34 13 25 39 17 37 15 21 30 46 11 41 28 32 34 14 30 41 34 2 17 22 38 30 45 22 10 13 6 41 25 40 14 20 43 29 27 
 numPartitions is 49
9 47 43 27 36 8 3 8 22 46 46 37 25 45 6 46 4 3 19 3 38 36 26 48 6 41 44 27 5 1 11 48 42 41 1 17 4 28 26 15 33 20 13 20 9 11 25 20 27 16 9 9 40 47 38 37 47 39 35 26 41 21 41 0 21 40 16 21 37 25 19 42 4 8 29 9 46 39 21 16 29 39 46 32 16 10 25 12 38 8 46 11 28 48 6 34 4 14 36 16 32 44 6 8 0 9 33 5 46 2 3 41 33 44 5 24 26 34 38 11 26 24 42 18 21 45 8 44 1 11 38 16 45 34 1 9 43 8 26 19 46 0 42 27 44 0 3 37 1 23 31 26 24 3 19 23 25 39 18 47 26 48 23 9 31 22 1 31 47 29 13 32 16 17 22 26 5 22 14 24 21 45 10 28 33 1 20 32 27 45 41 12 43 11 29 31 14 39 9 34 20 
 numPartitions is 50
28 3 28 47 3 25 40 10 25 38 39 2 45 44 21 28 29 30 33 19 3 19 19 18 1 26 3 39 23 8 0 17 15 48 10 3 39 35 34 37 25 20 29 27 33 7 38 3 38 0 7 30 38 31 22 26 34 37 41 4 21 8 22 3 35 15 26 47 15 4 5 9 32 1 34 8 27 21 3 7 36 14 14 38 43 32 39 19 44 25 45 32 9 35 3 47 20 13 48 15 5 21 31 40 25 19 4 9 12 24 32 42 19 43 37 3 23 38 28 17 38 48 23 36 25 9 33 38 3 30 1 20 8 20 37 6 18 23 18 13 45 36 37 43 36 47 13 45 15 3 7 33 34 40 42 44 38 48 10 15 0 24 10 16 36 29 31 17 35 27 43 1 24 12 35 23 6 18 16 44 14 25 42 46 13 12 36 6 13 0 26 21 16 25 45 34 44 46 21 23 3 

@wbo4958 wbo4958 requested a review from HyukjinKwon September 13, 2022 12:47
@wbo4958 wbo4958 changed the title [WIP][SPARK-40407][SQL] Fix the potential data skew caused by df.repartition [SPARK-40407][SQL] Fix the potential data skew caused by df.repartition Sep 16, 2022
@wbo4958 wbo4958 marked this pull request as ready for review September 16, 2022 23:10
@wbo4958
Contributor Author

wbo4958 commented Sep 17, 2022

@WeichenXu123 @HyukjinKwon can you help to review it?

@mridulm
Contributor

mridulm commented Sep 18, 2022

Or, we can use what RDD.coalesce does currently.

@wbo4958
Contributor Author

wbo4958 commented Sep 19, 2022

> Or, we can use what RDD.coalesce does currently.

Good suggestion. Done. Thx. @mridulm

@WeichenXu123
Contributor

@wbo4958

Issue: the XGBoost code uses RDD barrier mode, but barrier mode does not work with the coalesce operator.

@wbo4958
Contributor Author

wbo4958 commented Sep 19, 2022

> @wbo4958
>
> Issue: The xgboost code uses rdd barrier mode, but barrier mode does not work with coalesce operator.

@mridulm only suggested borrowing the way RDD.coalesce derives its random starting position; it does not introduce any coalesce into repartition.

@wbo4958
Contributor Author

wbo4958 commented Sep 20, 2022

@cloud-fan could you help to review it?

test("SPARK-40407: repartition should not result in severe data skew") {
val df = spark.range(0, 100, 1, 50).repartition(4)
val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect()
assert(result.mkString(",") === "25,31,25,19")
Contributor

I'd do assert(result.map(_.getInt(0)).sorted == Seq(19, 25, 25, 31))

Contributor Author

Done. Thx

@@ -299,7 +300,8 @@ object ShuffleExchangeExec {
def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
case RoundRobinPartitioning(numPartitions) =>
// Distributes elements evenly across output partitions, starting from a random partition.
var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
Contributor

Sorry, I may be missing something. Shouldn't the original code already produce different starting positions for different mapper tasks?

Contributor

OK, I tried (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " ")) and the result is very counterintuitive: a small change in the seed does not change the random result.

Can we add some comments to explain why we add hashing.byteswap32?
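
For reference, a minimal standalone sketch of the behaviour under discussion (plain Scala, no Spark needed). It compares start positions seeded directly with the partition ID against positions seeded with `hashing.byteswap32(partitionId)`, the scrambling that RDD.coalesce uses. The names `SeedDemo`, `plainStart`, and `hashedStart` are made up for illustration and are not part of the patch.

```scala
import java.util.Random
import scala.util.hashing

object SeedDemo {
  // Start position seeded directly with the partition ID (the behaviour being questioned).
  def plainStart(partitionId: Int, numPartitions: Int): Int =
    new Random(partitionId).nextInt(numPartitions)

  // Start position seeded with a scrambled partition ID.
  def hashedStart(partitionId: Int, numPartitions: Int): Int =
    new Random(hashing.byteswap32(partitionId)).nextInt(numPartitions)

  def main(args: Array[String]): Unit = {
    // Count how many distinct start positions 200 consecutive partition IDs produce.
    for (numPartitions <- Seq(2, 4, 8, 16, 32)) {
      val plain = (1 to 200).map(plainStart(_, numPartitions)).distinct.size
      val hashed = (1 to 200).map(hashedStart(_, numPartitions)).distinct.size
      println(s"numPartitions=$numPartitions distinct starts: plain=$plain hashed=$hashed")
    }
  }
}
```

With power-of-two bounds, `nextInt` takes only the highest bits of the first generated value, so nearby seeds tend to collapse to the same result unless the seed is scrambled first.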

Contributor

This was fixed in SPARK-21782 for RDD - looks like the sql version did not leverage it.

Contributor Author

@mridulm wow, good find. I didn't realize there was a similar issue.

@wbo4958 wbo4958 force-pushed the roundrobin-data-skew branch from 62c99e8 to 0f1c677 Compare September 21, 2022 12:46
@wbo4958
Contributor Author

wbo4958 commented Sep 21, 2022

@cloud-fan @mridulm could you help to review it again? Thx

@cloud-fan
Contributor

@wbo4958 Can you add comments as I asked in https://github.com/apache/spark/pull/37855/files#r975993118 ?

@wbo4958
Contributor Author

wbo4958 commented Sep 22, 2022

> @wbo4958 Can you add comments as I asked in https://github.com/apache/spark/pull/37855/files#r975993118 ?

I added some comments from https://issues.apache.org/jira/browse/SPARK-21782.

@wbo4958 wbo4958 requested review from cloud-fan and mridulm and removed request for HyukjinKwon and cloud-fan September 22, 2022 12:22
@cloud-fan cloud-fan closed this in f6c4e58 Sep 22, 2022
cloud-fan pushed a commit that referenced this pull request Sep 22, 2022
### What changes were proposed in this pull request?

``` scala
val df = spark.range(0, 100, 1, 50).repartition(4)
val v = df.rdd.mapPartitions { iter =>
  Iterator.single(iter.length)
}.collect()
println(v.mkString(","))
```

The above simple code outputs `50,0,0,50`, which means there is no data in partition 1 and partition 2.

RoundRobin distributes the records of each input partition evenly, but it does not guarantee an even distribution across input partitions.

Below is the code that generates the key:

``` scala
      case RoundRobinPartitioning(numPartitions) =>
        // Distributes elements evenly across output partitions, starting from a random partition.
        var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
        (row: InternalRow) => {
          // The HashPartitioner will handle the `mod` by the number of partitions
          position += 1
          position
        }
```

In this case there are 50 input partitions, and each one computes only 2 elements. The problem is that RoundRobin always starts from position=2, regardless of the partition.

See the output of Random:
``` scala
scala> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))  // the position is always 2.
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
```

Similarly, the Random calls below also output the same value:

``` scala
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(2) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(8) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(16) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " "))
```

Consider partition 0, whose elements are [0, 1]. During the shuffle write, the key for element 0 is (position + 1) = 2 + 1 = 3, and 3 % 4 = 3; the key for element 1 is (position + 1) = 3 + 1 = 4, and 4 % 4 = 0.
Consider partition 1, whose elements are [2, 3]. During the shuffle write, the key for element 2 is (position + 1) = 2 + 1 = 3, and 3 % 4 = 3; the key for element 3 is (position + 1) = 3 + 1 = 4, and 4 % 4 = 0.

The same calculation applies to all remaining partitions, since the starting position is always 2 in this case.

So every input partition writes its elements only to output partitions 0 and 3, which leaves output partitions 1 and 2 without any data.
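
The arithmetic above can be reproduced without a cluster. The sketch below is a plain Scala simulation, not Spark code: `RoundRobinSkewDemo` and `outputCounts` are hypothetical names, and it assumes the key is reduced with a simple `%`, as a hash partitioner would do for a non-negative Int key.

```scala
import java.util.Random

object RoundRobinSkewDemo {
  // Simulates the key generation: each input partition seeds Random with its own ID,
  // then increments the position once per row. Returns rows per output partition.
  def outputCounts(numInputPartitions: Int, rowsPerPartition: Int, numPartitions: Int): Seq[Int] = {
    val counts = Array.fill(numPartitions)(0)
    for (partitionId <- 0 until numInputPartitions) {
      var position = new Random(partitionId).nextInt(numPartitions)
      for (_ <- 0 until rowsPerPartition) {
        position += 1
        counts(position % numPartitions) += 1
      }
    }
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    // 50 input partitions, 2 rows each, repartitioned into 4 output partitions.
    println(outputCounts(50, 2, 4).mkString(","))  // prints 50,0,0,50
  }
}
```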

This PR changes how the starting position of RoundRobin is chosen. The position computed by `new Random(partitionId).nextInt(numPartitions)` can be identical for every input partition, so each partition writes its data to the same keys during the shuffle write, and in some special cases some keys receive no data at all.

### Why are the changes needed?

This PR fixes the data skew in these special cases.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Will add some tests and watch CI pass

Closes #37855 from wbo4958/roundrobin-data-skew.

Authored-by: Bobby Wang <wbo4958@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit f6c4e58)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Contributor

thanks, merging to master/3.3/3.2!

cloud-fan pushed a commit that referenced this pull request Sep 22, 2022
@wbo4958
Contributor Author

wbo4958 commented Sep 22, 2022

Thx.

@wbo4958 wbo4958 deleted the roundrobin-data-skew branch September 22, 2022 13:01
@zhengruifeng
Contributor

Good catch!
It seems we could also simply switch to XORShiftRandom, which always hashes the seed:

scala> (1 to 200).map(partitionId => new Random(partitionId).nextInt(4))
val res3: IndexedSeq[Int] = Vector(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2)

scala> (1 to 200).map(partitionId => new XORShiftRandom(partitionId).nextInt(4))
val res4: IndexedSeq[Int] = Vector(0, 3, 2, 2, 3, 3, 1, 2, 2, 0, 2, 2, 2, 1, 2, 2, 0, 3, 2, 0, 3, 3, 1, 1, 1, 1, 0, 0, 2, 3, 1, 3, 0, 1, 1, 2, 0, 2, 1, 3, 3, 0, 0, 1, 1, 3, 0, 2, 1, 1, 2, 0, 1, 3, 0, 0, 0, 1, 0, 3, 2, 1, 3, 3, 3, 0, 1, 0, 3, 0, 0, 0, 3, 0, 2, 1, 0, 2, 0, 2, 2, 0, 3, 0, 2, 3, 0, 1, 1, 1, 1, 3, 3, 3, 3, 3, 0, 1, 3, 1, 3, 0, 2, 1, 2, 1, 3, 0, 1, 1, 0, 2, 0, 0, 3, 0, 2, 1, 1, 2, 0, 3, 1, 3, 1, 0, 0, 2, 3, 1, 2, 1, 3, 0, 1, 2, 2, 2, 1, 3, 3, 3, 0, 2, 2, 3, 0, 1, 0, 3, 0, 2, 1, 2, 1, 2, 1, 1, 3, 3, 0, 2, 0, 0, 1, 1, 1, 1, 2, 0, 0, 1, 0, 1, 3, 0, 3, 1, 1, 3, 2, 3, 1, 2, 3, 0, 3, 3, 2, 3, 0, 0, 1, 1, 0, 2, 1, 0, 0, 2)

@wbo4958
Contributor Author

wbo4958 commented Sep 28, 2022

> Good catch! seems we can also simply switch to XORShiftRandom which always hash the seeds

Yeah, I tried XORShiftRandom and it works well; see #37855 (comment)
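
A rough way to see the effect of hashing the seed without Spark on the classpath. This is not XORShiftRandom itself: `scrambledSeed` is a hypothetical helper that mixes the partition ID with `MurmurHash3` before handing it to `java.util.Random`, and `outputCounts` simulates the 50-partition, 2-rows-each case from the description.

```scala
import java.nio.ByteBuffer
import java.util.Random
import scala.util.hashing.MurmurHash3

object HashedSeedDemo {
  // Hypothetical seed scrambler (an assumption for this sketch, not Spark's code).
  def scrambledSeed(partitionId: Int): Long = {
    val bytes = ByteBuffer.allocate(8).putLong(partitionId.toLong).array()
    val low = MurmurHash3.bytesHash(bytes, MurmurHash3.arraySeed)
    val high = MurmurHash3.bytesHash(bytes, low)
    (high.toLong << 32) | (low.toLong & 0xFFFFFFFFL)
  }

  // Rows per output partition for 50 input partitions of 2 rows each,
  // with the start position seeded by seedOf(partitionId).
  def outputCounts(seedOf: Int => Long, numPartitions: Int = 4): Seq[Int] = {
    val counts = Array.fill(numPartitions)(0)
    for (partitionId <- 0 until 50) {
      var position = new Random(seedOf(partitionId)).nextInt(numPartitions)
      for (_ <- 0 until 2) {
        position += 1
        counts(position % numPartitions) += 1
      }
    }
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    println("plain seed:     " + outputCounts((id: Int) => id.toLong).mkString(","))
    println("scrambled seed: " + outputCounts(scrambledSeed).mkString(","))
  }
}
```

The plain-seed line reproduces the skewed layout from the description, while the scrambled-seed line should spread rows over all four output partitions; the exact counts depend on the hash used.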

@dongjoon-hyun
Member

Hi, @cloud-fan, @wangyum and all.
I switched SPARK-40407 to Bug according to the context.

sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
8 participants