Spark 3.4: Multiple shuffle partitions per file in compaction (#7897)
aokolnychyi authored Jun 27, 2023
1 parent f5bb0c0 commit d98e7a1
Showing 7 changed files with 242 additions and 2 deletions.
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DropTag
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.MergeRows
import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
@@ -47,6 +48,7 @@ import org.apache.spark.sql.catalyst.plans.logical.UpdateRows
import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.execution.OrderAwareCoalesceExec
import org.apache.spark.sql.execution.SparkPlan
import scala.jdk.CollectionConverters._

@@ -111,6 +113,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case NoStatsUnaryNode(child) =>
planLater(child) :: Nil

case OrderAwareCoalesce(numPartitions, coalescer, child) =>
OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil

case _ => Nil
}

@@ -184,6 +184,41 @@ public void testRewriteDataFilesWithSortStrategy() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() {
createTable();
insertData(10 /* file count */);

List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files("
+ " table => '%s', "
+ " strategy => 'sort', "
+ " sort_order => 'c1', "
+ " options => map('shuffle-partitions-per-file', '2'))",
catalogName, tableIdent);

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
row(10, 1),
Arrays.copyOf(output.get(0), 2));

// as there is only one small output file, validate the query ordering (it will not change)
ImmutableList<Object[]> expectedRows =
ImmutableList.of(
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null),
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null));
assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}

@Test
public void testRewriteDataFilesWithZOrder() {
createTable();
@@ -225,6 +260,42 @@ public void testRewriteDataFilesWithZOrder() {
assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}

@Test
public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
createTable();
insertData(10 /* file count */);

List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files("
+ " table => '%s', "
+ "strategy => 'sort', "
+ " sort_order => 'zorder(c1, c2)', "
+ " options => map('shuffle-partitions-per-file', '2'))",
catalogName, tableIdent);

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
row(10, 1),
Arrays.copyOf(output.get(0), 2));

// due to z-ordering, the data will be written in the below order
// as there is only one small output file, validate the query ordering (it will not change)
ImmutableList<Object[]> expectedRows =
ImmutableList.of(
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null));
assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}

@Test
public void testRewriteDataFilesWithFilter() {
createTable();
@@ -36,6 +36,8 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce;
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalescer;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.distributions.OrderedDistribution;
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {

public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;

/**
* The number of shuffle partitions to use for each output file. By default, this file rewriter
* assumes each shuffle partition would become a separate output file. Attempting to generate
* large output files of 512 MB or higher may strain the memory resources of the cluster as such
* rewrites would require lots of Spark memory. This parameter can be used to further divide up
* the data which will end up in a single file. For example, if the target file size is 2 GB, but
* the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
* use a custom coalesce operation to stitch these sorted partitions back together into a single
* sorted file.
*
* <p>Note using this parameter requires enabling Iceberg Spark session extensions.
*/
public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";

public static final int SHUFFLE_PARTITIONS_PER_FILE_DEFAULT = 1;

private double compressionFactor;
private int numShufflePartitionsPerFile;

protected SparkShufflingDataRewriter(SparkSession spark, Table table) {
super(spark, table);
Expand All @@ -75,13 +94,15 @@ public Set<String> validOptions() {
return ImmutableSet.<String>builder()
.addAll(super.validOptions())
.add(COMPRESSION_FACTOR)
.add(SHUFFLE_PARTITIONS_PER_FILE)
.build();
}

@Override
public void init(Map<String, String> options) {
super.init(options);
this.compressionFactor = compressionFactor(options);
this.numShufflePartitionsPerFile = numShufflePartitionsPerFile(options);
}

@Override
@@ -114,7 +135,16 @@ private Function<Dataset<Row>, Dataset<Row>> sortFunction(List<FileScanTask> gro
private LogicalPlan sortPlan(LogicalPlan plan, SortOrder[] ordering, int numShufflePartitions) {
SparkFunctionCatalog catalog = SparkFunctionCatalog.get();
OrderedWrite write = new OrderedWrite(ordering, numShufflePartitions);
return DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));
LogicalPlan sortPlan =
DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));

if (numShufflePartitionsPerFile == 1) {
return sortPlan;
} else {
OrderAwareCoalescer coalescer = new OrderAwareCoalescer(numShufflePartitionsPerFile);
int numOutputPartitions = numShufflePartitions / numShufflePartitionsPerFile;
return new OrderAwareCoalesce(numOutputPartitions, coalescer, sortPlan);
}
}

private Dataset<Row> transformPlan(Dataset<Row> df, Function<LogicalPlan, LogicalPlan> func) {
@@ -134,7 +164,7 @@ private org.apache.iceberg.SortOrder outputSortOrder(List<FileScanTask> group) {

private int numShufflePartitions(List<FileScanTask> group) {
int numOutputFiles = (int) numOutputFiles((long) (inputSize(group) * compressionFactor));
return Math.max(1, numOutputFiles);
return Math.max(1, numOutputFiles * numShufflePartitionsPerFile);
}

private double compressionFactor(Map<String, String> options) {
@@ -145,6 +175,19 @@ private double compressionFactor(Map<String, String> options) {
return value;
}

private int numShufflePartitionsPerFile(Map<String, String> options) {
int value =
PropertyUtil.propertyAsInt(
options, SHUFFLE_PARTITIONS_PER_FILE, SHUFFLE_PARTITIONS_PER_FILE_DEFAULT);
Preconditions.checkArgument(
value > 0, "'%s' is set to %s but must be > 0", SHUFFLE_PARTITIONS_PER_FILE, value);
Preconditions.checkArgument(
value == 1 || Spark3Util.extensionsEnabled(spark()),
"Using '%s' requires enabling Iceberg Spark session extensions",
SHUFFLE_PARTITIONS_PER_FILE);
return value;
}

private static class OrderedWrite implements RequiresDistributionAndOrdering {
private final OrderedDistribution distribution;
private final SortOrder[] ordering;
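For orientation, here is a minimal usage sketch of the new option through the RewriteDataFiles action API. It is not part of this commit: the wrapper method, column name, and the 2 GB / 512 MB sizing are illustrative, and "target-file-size-bytes" is the existing size-based rewriter option key. With these numbers each output file is stitched from 2 GB / 512 MB = 4 sorted shuffle partitions, which requires the Iceberg Spark session extensions to be enabled.

import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;

// Illustrative sketch: compact to ~2 GB sorted files while keeping each shuffle
// partition around 512 MB, so 4 shuffle partitions are coalesced into every file.
// Assumes an active Spark session with Iceberg extensions enabled and a loaded table.
static RewriteDataFiles.Result compactWithSubPartitionedShuffle(Table table) {
  return SparkActions.get()
      .rewriteDataFiles(table)
      .sort(SortOrder.builderFor(table.schema()).asc("c1").build())
      .option("target-file-size-bytes", String.valueOf(2L * 1024 * 1024 * 1024))
      .option("shuffle-partitions-per-file", "4")
      .execute();
}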
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.rdd.PartitionCoalescer
import org.apache.spark.rdd.PartitionGroup
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Attribute

// this node doesn't extend RepartitionOperation on purpose to keep this logic isolated
// and ignore it in optimizer rules such as CollapseRepartition
case class OrderAwareCoalesce(
numPartitions: Int,
coalescer: PartitionCoalescer,
child: LogicalPlan) extends OrderPreservingUnaryNode {

override def output: Seq[Attribute] = child.output

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
copy(child = newChild)
}
}

class OrderAwareCoalescer(val groupSize: Int) extends PartitionCoalescer with Serializable {

override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
val partitionBins = parent.partitions.grouped(groupSize)
partitionBins.map { partitions =>
val group = new PartitionGroup()
group.partitions ++= partitions
group
}.toArray
}
}
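To make the stitching concrete, the following plain-Java model (not part of the commit; the method name is hypothetical) mirrors what OrderAwareCoalescer does with its group size: consecutive, already-sorted shuffle partitions are binned into fixed-size groups. This matches the arithmetic in sortPlan above, where the rewriter asks for numOutputFiles * shuffle-partitions-per-file shuffle partitions and then coalesces back down to numShufflePartitions / shuffle-partitions-per-file output partitions.

import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the binning: partition i of the sorted shuffle output goes
// into group i / groupSize, so each group holds adjacent sorted partitions and the
// concatenated output of a group is itself sorted.
static List<List<Integer>> binSortedPartitions(int numShufflePartitions, int groupSize) {
  List<List<Integer>> groups = new ArrayList<>();
  for (int start = 0; start < numShufflePartitions; start += groupSize) {
    List<Integer> group = new ArrayList<>();
    for (int p = start; p < Math.min(start + groupSize, numShufflePartitions); p++) {
      group.add(p);
    }
    groups.add(group);
  }
  return groups;
}

// binSortedPartitions(12, 4) -> [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]:
// 12 shuffle partitions become 3 output files, each written from 4 adjacent partitions.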
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.rdd.PartitionCoalescer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning

case class OrderAwareCoalesceExec(
numPartitions: Int,
coalescer: PartitionCoalescer,
child: SparkPlan) extends UnaryExecNode {

override def output: Seq[Attribute] = child.output

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def outputPartitioning: Partitioning = {
if (numPartitions == 1) SinglePartition else UnknownPartitioning(numPartitions)
}

protected override def doExecute(): RDD[InternalRow] = {
val result = child.execute()
if (numPartitions == 1 && result.getNumPartitions < 1) {
// make sure we don't output an RDD with 0 partitions,
// when claiming that we have a `SinglePartition`
// see CoalesceExec in Spark
new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)
} else {
result.coalesce(numPartitions, shuffle = false, Some(coalescer))
}
}

override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
copy(child = newChild)
}
}
@@ -894,6 +894,15 @@ public void testInvalidOptions() {
() -> basicRewrite(table).option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo").execute())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid rewrite job order name: foo");

Assertions.assertThatThrownBy(
() ->
basicRewrite(table)
.sort(SortOrder.builderFor(table.schema()).asc("c2").build())
.option(SparkShufflingDataRewriter.SHUFFLE_PARTITIONS_PER_FILE, "5")
.execute())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("requires enabling Iceberg Spark session extensions");
}

@Test
@@ -261,6 +261,7 @@ public void testSortDataValidOptions() {
Assert.assertEquals(
"Rewriter must report all supported options",
ImmutableSet.of(
SparkSortDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
SparkSortDataRewriter.TARGET_FILE_SIZE_BYTES,
SparkSortDataRewriter.MIN_FILE_SIZE_BYTES,
SparkSortDataRewriter.MAX_FILE_SIZE_BYTES,
@@ -281,6 +282,7 @@ public void testZOrderDataValidOptions() {
Assert.assertEquals(
"Rewriter must report all supported options",
ImmutableSet.of(
SparkZOrderDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
SparkZOrderDataRewriter.TARGET_FILE_SIZE_BYTES,
SparkZOrderDataRewriter.MIN_FILE_SIZE_BYTES,
SparkZOrderDataRewriter.MAX_FILE_SIZE_BYTES,
