chore: Enable Comet shuffle with AQE coalesce partitions #651

Status: Closed · wants to merge 31 commits
.github/workflows/spark_sql_test.yml (2 changes: 1 addition & 1 deletion)

@@ -43,7 +43,7 @@ jobs:
   spark-sql-catalyst:
     strategy:
       matrix:
-        os: [ubuntu-latest]
+        os: [ubuntu-24.04]
         java-version: [11]
         spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.1'}]
         module:
.github/workflows/spark_sql_test_ansi.yml (2 changes: 1 addition & 1 deletion)

@@ -41,7 +41,7 @@ jobs:
   spark-sql-catalyst:
     strategy:
       matrix:
-        os: [ubuntu-latest]
+        os: [ubuntu-24.04]
         java-version: [17]
         spark-version: [{short: '4.0', full: '4.0.0-preview1'}]
         module:
common/src/main/java/org/apache/comet/parquet/BatchReader.java (11 changes: 6 additions & 5 deletions)

@@ -43,7 +43,6 @@
 import org.apache.arrow.c.CometSchemaImporter;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -69,9 +68,11 @@
 import org.apache.spark.sql.execution.metric.SQLMetric;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.CometColumnarBatch;
 import org.apache.spark.util.AccumulatorV2;
 
 import org.apache.comet.CometConf;
+import org.apache.comet.package$;
 import org.apache.comet.shims.ShimBatchReader;
 import org.apache.comet.shims.ShimFileFormat;
 import org.apache.comet.vector.CometVector;
@@ -96,7 +97,7 @@
  */
 public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
-  protected static final BufferAllocator ALLOCATOR = new RootAllocator();
+  protected static final BufferAllocator ALLOCATOR = package$.MODULE$.CometArrowAllocator();
 
   private Configuration conf;
   private int capacity;
@@ -114,7 +115,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
   private CometVector[] vectors;
   private AbstractColumnReader[] columnReaders;
   private CometSchemaImporter importer;
-  private ColumnarBatch currentBatch;
+  private CometColumnarBatch currentBatch;
   private Future<Option<Throwable>> prefetchTask;
   private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
   private FileReader fileReader;
@@ -195,7 +196,7 @@ public BatchReader(AbstractColumnReader[] columnReaders) {
     int numColumns = columnReaders.length;
     this.columnReaders = new AbstractColumnReader[numColumns];
     vectors = new CometVector[numColumns];
-    currentBatch = new ColumnarBatch(vectors);
+    currentBatch = new CometColumnarBatch(vectors);
     // This constructor is used by Iceberg only. The columnReaders are
     // initialized in Iceberg, so no need to call the init()
     isInitialized = true;
@@ -340,7 +341,7 @@ public void init() throws URISyntaxException, IOException {
     }
 
     vectors = new CometVector[numColumns];
-    currentBatch = new ColumnarBatch(vectors);
+    currentBatch = new CometColumnarBatch(vectors);
     fileReader.setRequestedSchema(requestedSchema.getColumns());
 
     // For test purpose only
common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java

@@ -23,17 +23,17 @@
 import org.apache.arrow.c.ArrowSchema;
 import org.apache.arrow.c.Data;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.spark.sql.types.DataType;
 
+import org.apache.comet.package$;
 import org.apache.comet.vector.CometPlainVector;
 import org.apache.comet.vector.CometVector;
 
 /** A metadata column reader that can be extended by {@link RowIndexColumnReader} etc. */
 public class MetadataColumnReader extends AbstractColumnReader {
-  private final BufferAllocator allocator = new RootAllocator();
+  private final BufferAllocator allocator = package$.MODULE$.CometArrowAllocator();
   private CometVector vector;
 
   public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, boolean useDecimal128) {
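Both readers previously constructed their own `RootAllocator`, so every reader instance owned an unbounded Arrow memory pool with no common accounting. The PR points them at the shared `CometArrowAllocator`, which lives in the `org.apache.comet` package object (hence the `package$.MODULE$.CometArrowAllocator()` accessor from Java). A minimal sketch of the pattern, using a hypothetical `SharedArrowAllocator` holder rather than Comet's actual definition:

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

// Hypothetical holder illustrating the pattern behind CometArrowAllocator:
// one RootAllocator per JVM, so all Arrow buffers are charged to one root.
public final class SharedArrowAllocator {
  public static final BufferAllocator ROOT = new RootAllocator(Long.MAX_VALUE);

  private SharedArrowAllocator() {}

  // Child allocators give per-component limits and leak detection while
  // still accounting against the shared root.
  public static BufferAllocator child(String name, long maxBytes) {
    return ROOT.newChildAllocator(name, 0, maxBytes);
  }
}
```

With one root allocator per JVM, Arrow memory accounting reflects total usage across all readers, and a reader's lifecycle no longer creates and abandons allocator instances.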
common/src/main/java/org/apache/spark/sql/vectorized/CometColumnarBatch.java (new file, @@ -0,0 +1,45 @@)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.spark.sql.vectorized;

/**
 * This special `ColumnarBatch` is only used by Comet `BatchReader` which reuses the same
 * `ColumnVector`s for each batch if possible.
 */
public class CometColumnarBatch extends ColumnarBatch {
  @Override
  public void close() {
    // no-op
  }

  public CometColumnarBatch(ColumnVector[] columns) {
    this(columns, 0);
  }

  /**
   * Create a new batch from existing column vectors.
   *
   * @param columns The columns of this batch
   * @param numRows The number of rows in this batch
   */
  public CometColumnarBatch(ColumnVector[] columns, int numRows) {
    super(columns, numRows);
  }
}
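The no-op `close()` is the point of the subclass: `BatchReader` returns the same batch object, backed by reused `CometVector`s, on every call, so a consumer that dutifully closes each batch must not release the vectors that will carry the next one. A hedged sketch of the consumption loop; the `nextBatch()`/`currentBatch()` names mirror `BatchReader`'s read loop, and `consume` is a hypothetical callback:

```java
import java.util.function.Consumer;

import org.apache.comet.parquet.BatchReader;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public final class BatchDrainExample {
  // Sketch only: drains a reader whose batches reuse the same column vectors.
  static void drain(BatchReader reader, Consumer<ColumnarBatch> consume) throws Exception {
    while (reader.nextBatch()) {
      ColumnarBatch batch = reader.currentBatch(); // same CometColumnarBatch each time
      consume.accept(batch);
      batch.close(); // no-op by design: the reused vectors stay valid for the next batch
    }
  }
}
```

The cost of this design is that releasing the vectors for real becomes the reader's responsibility when it is closed, rather than the consumer's.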
common/src/main/scala/org/apache/comet/CometConf.scala (11 changes: 0 additions & 11 deletions)

@@ -204,17 +204,6 @@ object CometConf extends ShimCometConf {
       .checkValues(Set("native", "jvm", "auto"))
       .createWithDefault("jvm")
 
-  val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] =
-    conf("spark.comet.shuffle.enforceMode.enabled")
-      .doc(
-        "Comet shuffle doesn't support Spark AQE coalesce partitions. If AQE coalesce " +
-          "partitions is enabled, Comet shuffle won't be triggered even enabled. This config " +
-          "is used to enforce Comet to trigger shuffle even if AQE coalesce partitions is " +
-          "enabled. This is for testing purpose only.")
-      .internal()
-      .booleanConf
-      .createWithDefault(false)
-
   val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
       .doc(
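Deleting `COMET_SHUFFLE_ENFORCE_MODE_ENABLED` follows directly from the PR title: once Comet shuffle supports AQE partition coalescing, the testing-only enforce switch has nothing left to bypass. A sketch of a session that enables both features together, the combination this PR makes legitimate; the plugin and shuffle-manager values are the usual Comet settings assumed here, not taken from this diff:

```java
import org.apache.spark.sql.SparkSession;

public final class CometAqeCoalesceExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("comet-shuffle-with-aqe-coalesce")
        .config("spark.plugins", "org.apache.spark.CometPlugin")
        .config("spark.shuffle.manager",
            "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
        .config("spark.comet.exec.shuffle.enabled", "true")
        // AQE partition coalescing no longer disables Comet shuffle:
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .getOrCreate();

    // A shuffle-heavy query; AQE may coalesce the post-shuffle partitions.
    spark.range(0, 1_000_000).groupBy("id").count().show(5);
    spark.stop();
  }
}
```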
common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala

@@ -19,7 +19,7 @@
 
 package org.apache.spark.sql.comet.execution.arrow
 
-import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
+import org.apache.arrow.memory.BufferAllocator
 import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
@@ -28,11 +28,11 @@ import org.apache.spark.sql.comet.util.Utils
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
+import org.apache.comet.CometArrowAllocator
 import org.apache.comet.vector.NativeUtil
 
 object CometArrowConverters extends Logging {
-  // TODO: we should reuse the same root allocator in the comet code base?
-  val rootAllocator: BufferAllocator = new RootAllocator(Long.MaxValue)
+  val rootAllocator: BufferAllocator = CometArrowAllocator
 
   // This is similar how Spark converts internal row to Arrow format except that it is transforming
   // the result batch to Comet's ColumnarBatch instead of serialized bytes.