Merge remote-tracking branch 'upstream/main' into spark-4.0-widening
kazuyukitanimura committed Jul 10, 2024
2 parents 7329648 + 15e7baa commit a65b38b
Showing 561 changed files with 28,897 additions and 7,487 deletions.
2 changes: 1 addition & 1 deletion .github/actions/java-test/action.yaml
@@ -33,7 +33,7 @@ runs:
- name: Run Cargo build
shell: bash
run: |
-cd core
+cd native
cargo build
- name: Cache Maven dependencies
14 changes: 10 additions & 4 deletions .github/actions/rust-test/action.yaml
@@ -22,21 +22,27 @@ runs:
- name: Check Cargo fmt
shell: bash
run: |
-cd core
+cd native
cargo fmt --all -- --check --color=never
- name: Check Cargo clippy
shell: bash
run: |
-cd core
+cd native
cargo clippy --color=never -- -D warnings
- name: Check compilation
shell: bash
run: |
-cd core
+cd native
cargo check --benches
+- name: Check unused dependencies
+shell: bash
+run: |
+cd native
+cargo install cargo-machete && cargo machete
- name: Cache Maven dependencies
uses: actions/cache@v4
with:
@@ -56,5 +62,5 @@ runs:
- name: Run Cargo test
shell: bash
run: |
-cd core
+cd native
RUST_BACKTRACE=1 cargo test
6 changes: 3 additions & 3 deletions .github/workflows/benchmark-tpch.yml
@@ -76,8 +76,8 @@ jobs:
with:
name: libcomet-${{ github.run_id }}
path: |
-core/target/release/libcomet.so
-core/target/release/libcomet.dylib
+native/target/release/libcomet.so
+native/target/release/libcomet.dylib
retention-days: 1 # remove the artifact after 1 day, only valid for this workflow
overwrite: true
- name: Generate TPC-H (SF=1) table data
@@ -119,7 +119,7 @@ jobs:
uses: actions/download-artifact@v4
with:
name: libcomet-${{ github.run_id }}
-path: core/target/release
+path: native/target/release
- name: Run TPC-H queries
run: |
SPARK_HOME=`pwd` SPARK_TPCH_DATA=`pwd`/tpch/sf1_parquet ./mvnw -B -Prelease -Dsuites=org.apache.spark.sql.CometTPCHQuerySuite test
6 changes: 3 additions & 3 deletions .github/workflows/benchmark.yml
@@ -83,8 +83,8 @@ jobs:
with:
name: libcomet-${{ github.run_id }}
path: |
-core/target/release/libcomet.so
-core/target/release/libcomet.dylib
+native/target/release/libcomet.so
+native/target/release/libcomet.dylib
retention-days: 1 # remove the artifact after 1 day, only valid for this workflow
overwrite: true
- name: Build tpcds-kit
@@ -134,7 +134,7 @@ jobs:
uses: actions/download-artifact@v4
with:
name: libcomet-${{ github.run_id }}
-path: core/target/release
+path: native/target/release
- name: Run TPC-DS queries (Sort merge join)
if: matrix.join == 'sort_merge'
run: |
3 changes: 2 additions & 1 deletion .github/workflows/spark_sql_test.yml
@@ -45,7 +45,7 @@ jobs:
matrix:
os: [ubuntu-latest]
java-version: [11]
-spark-version: [{short: '3.4', full: '3.4.3'}]
+spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.1'}]
module:
- {name: "catalyst", args1: "catalyst/test", args2: ""}
- {name: "sql/core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest}
@@ -75,6 +75,7 @@ jobs:
- name: Run Spark tests
run: |
cd apache-spark
+rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
env:
LC_ALL: "C.UTF-8"
2 changes: 1 addition & 1 deletion .gitignore
@@ -8,7 +8,7 @@ derby.log
metastore_db/
spark-warehouse/
dependency-reduced-pom.xml
-core/src/execution/generated
+native/core/src/execution/generated
prebuild
.flattened-pom.xml
rat.txt
36 changes: 18 additions & 18 deletions Makefile
@@ -20,65 +20,65 @@
all: core jvm

core:
-cd core && cargo build
+cd native && cargo build
test-rust:
# We need to compile CometException so that the cargo test can pass
./mvnw compile -pl common -DskipTests $(PROFILES)
-cd core && cargo build && \
+cd native && cargo build && \
RUST_BACKTRACE=1 cargo test
jvm:
./mvnw clean package -DskipTests $(PROFILES)
test-jvm: core
SPARK_HOME=`pwd` COMET_CONF_DIR=$(shell pwd)/conf RUST_BACKTRACE=1 ./mvnw verify $(PROFILES)
test: test-rust test-jvm
clean:
-cd core && cargo clean
+cd native && cargo clean
./mvnw clean $(PROFILES)
rm -rf .dist
bench:
-cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS))
+cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS))
format:
-cd core && cargo fmt
+cd native && cargo fmt
./mvnw compile test-compile scalafix:scalafix -Psemanticdb $(PROFILES)
./mvnw spotless:apply $(PROFILES)

core-amd64:
rustup target add x86_64-apple-darwin
-cd core && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
+cd native && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
mkdir -p common/target/classes/org/apache/comet/darwin/x86_64
-cp core/target/x86_64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/x86_64
-cd core && RUSTFLAGS="-Ctarget-cpu=haswell -Ctarget-feature=-prefer-256-bit" cargo build --release
+cp native/target/x86_64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/x86_64
+cd native && RUSTFLAGS="-Ctarget-cpu=haswell -Ctarget-feature=-prefer-256-bit" cargo build --release
mkdir -p common/target/classes/org/apache/comet/linux/amd64
-cp core/target/release/libcomet.so common/target/classes/org/apache/comet/linux/amd64
+cp native/target/release/libcomet.so common/target/classes/org/apache/comet/linux/amd64
jar -cf common/target/comet-native-x86_64.jar \
-C common/target/classes/org/apache/comet darwin \
-C common/target/classes/org/apache/comet linux
./dev/deploy-file common/target/comet-native-x86_64.jar comet-native-x86_64${COMET_CLASSIFIER} jar

core-arm64:
rustup target add aarch64-apple-darwin
-cd core && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
+cd native && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
mkdir -p common/target/classes/org/apache/comet/darwin/aarch64
-cp core/target/aarch64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/aarch64
-cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
+cp native/target/aarch64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/aarch64
+cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
mkdir -p common/target/classes/org/apache/comet/linux/aarch64
-cp core/target/release/libcomet.so common/target/classes/org/apache/comet/linux/aarch64
+cp native/target/release/libcomet.so common/target/classes/org/apache/comet/linux/aarch64
jar -cf common/target/comet-native-aarch64.jar \
-C common/target/classes/org/apache/comet darwin \
-C common/target/classes/org/apache/comet linux
./dev/deploy-file common/target/comet-native-aarch64.jar comet-native-aarch64${COMET_CLASSIFIER} jar

release-linux: clean
rustup target add aarch64-apple-darwin x86_64-apple-darwin
-cd core && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
-cd core && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
-cd core && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release
+cd native && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
+cd native && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
+cd native && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release
./mvnw install -Prelease -DskipTests $(PROFILES)
release:
-cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
+cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
./mvnw install -Prelease -DskipTests $(PROFILES)
release-nogit:
-cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo build --features nightly --release
+cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --features nightly --release
./mvnw install -Prelease -DskipTests $(PROFILES) -Dmaven.gitcommitid.skip=true
benchmark-%: clean release
cd spark && COMET_CONF_DIR=$(shell pwd)/conf MAVEN_OPTS='-Xmx20g' ../mvnw exec:java -Dexec.mainClass="$*" -Dexec.classpathScope="test" -Dexec.cleanupDaemonThreads="false" -Dexec.args="$(filter-out $@,$(MAKECMDGOALS))" $(PROFILES)
5 changes: 3 additions & 2 deletions README.md
@@ -34,13 +34,14 @@ The following chart shows the time it takes to run the 22 TPC-H queries against
using a single executor with 8 cores. See the [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html)
for details of the environment used for these benchmarks.

-When using Comet, the overall run time is reduced from 649 seconds to 440 seconds, a 1.5x speedup.
+When using Comet, the overall run time is reduced from 649 seconds to 433 seconds, a 1.5x speedup, with some queries
+showing a 2x-3x speedup.

Running the same queries with DataFusion standalone (without Spark) using the same number of cores results in a 3.9x
speedup compared to Spark.

Comet is not yet achieving full DataFusion speeds in all cases, but with future work we aim to provide a 2x-4x speedup
-for many use cases.
+for a broader set of queries.

![](docs/source/_static/images/tpch_allqueries.png)

4 changes: 2 additions & 2 deletions common/pom.xml
@@ -193,14 +193,14 @@ under the License.
<directory>${project.basedir}/src/main/resources</directory>
</resource>
<resource>
-<directory>${project.basedir}/../core/target/x86_64-apple-darwin/release</directory>
+<directory>${project.basedir}/../native/target/x86_64-apple-darwin/release</directory>
<includes>
<include>libcomet.dylib</include>
</includes>
<targetPath>org/apache/comet/darwin/x86_64</targetPath>
</resource>
<resource>
-<directory>${project.basedir}/../core/target/aarch64-apple-darwin/release</directory>
+<directory>${project.basedir}/../native/target/aarch64-apple-darwin/release</directory>
<includes>
<include>libcomet.dylib</include>
</includes>
3 changes: 2 additions & 1 deletion common/src/main/java/org/apache/arrow/c/ArrowImporter.java
@@ -55,7 +55,8 @@ public FieldVector importVector(
ArrowArray array, ArrowSchema schema, CDataDictionaryProvider provider) {
Field field = importField(schema, provider);
FieldVector vector = field.createVector(allocator);
-Data.importIntoVector(allocator, array, vector, provider);
+CometArrayImporter importer = new CometArrayImporter(allocator, vector, provider);
+importer.importArray(array);
return vector;
}
}
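
Editor's note (not part of the commit): the call shape of importVector is unchanged by this patch; only the buffer-loading delegate moves from Data.importIntoVector to CometArrayImporter. A minimal caller sketch follows, assuming ArrowImporter takes a BufferAllocator in its constructor and that the ArrowArray/ArrowSchema struct addresses arrive from the native (Rust) side; both details are assumptions, not shown in this diff.

package org.apache.arrow.c;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;

// Illustrative sketch only: shows how importVector would typically be invoked.
class ImportSketch {
  // Hypothetical helper; arrayAddress/schemaAddress would come from native code.
  static FieldVector importFromNative(
      BufferAllocator allocator, long arrayAddress, long schemaAddress) {
    CDataDictionaryProvider provider = new CDataDictionaryProvider();
    ArrowImporter importer = new ArrowImporter(allocator); // constructor shape assumed
    return importer.importVector(
        ArrowArray.wrap(arrayAddress), ArrowSchema.wrap(schemaAddress), provider);
  }
}
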
152 changes: 152 additions & 0 deletions common/src/main/java/org/apache/arrow/c/CometArrayImporter.java
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.arrow.c;

import java.util.Collections;
import java.util.List;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;

import static org.apache.arrow.c.NativeUtil.NULL;
import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt;
import static org.apache.arrow.util.Preconditions.checkNotNull;
import static org.apache.arrow.util.Preconditions.checkState;

/**
* Importer for {@link ArrowArray}. We copy it from Arrow `ArrayImporter` because we need to use
* `CometBufferImportTypeVisitor` instead of Arrow `BufferImportTypeVisitor`.
*/
final class CometArrayImporter {
private static final int MAX_IMPORT_RECURSION_LEVEL = 64;

private final BufferAllocator allocator;
private final FieldVector vector;
private final DictionaryProvider dictionaryProvider;

private ReferenceCountedArrowArray underlyingAllocation;
private int recursionLevel;

CometArrayImporter(
BufferAllocator allocator, FieldVector vector, DictionaryProvider dictionaryProvider) {
this.allocator = Preconditions.checkNotNull(allocator);
this.vector = Preconditions.checkNotNull(vector);
this.dictionaryProvider = dictionaryProvider;
}

void importArray(ArrowArray src) {
ArrowArray.Snapshot snapshot = src.snapshot();
checkState(snapshot.release != NULL, "Cannot import released ArrowArray");

// Move imported array
ArrowArray ownedArray = ArrowArray.allocateNew(allocator);
ownedArray.save(snapshot);
src.markReleased();
src.close();

recursionLevel = 0;

// This keeps the array alive as long as there are any buffers that need it
underlyingAllocation = new ReferenceCountedArrowArray(ownedArray);
try {
doImport(snapshot);
} finally {
underlyingAllocation.release();
}
}

private void importChild(CometArrayImporter parent, ArrowArray src) {
ArrowArray.Snapshot snapshot = src.snapshot();
checkState(snapshot.release != NULL, "Cannot import released ArrowArray");
recursionLevel = parent.recursionLevel + 1;
checkState(
recursionLevel <= MAX_IMPORT_RECURSION_LEVEL,
"Recursion level in ArrowArray struct exceeded");
// Child buffers will keep the entire parent import alive.
underlyingAllocation = parent.underlyingAllocation;
doImport(snapshot);
}

private void doImport(ArrowArray.Snapshot snapshot) {
// First import children (required for reconstituting parent array data)
long[] children =
NativeUtil.toJavaArray(snapshot.children, checkedCastToInt(snapshot.n_children));
if (children != null && children.length > 0) {
List<FieldVector> childVectors = vector.getChildrenFromFields();
checkState(
children.length == childVectors.size(),
"ArrowArray struct has %s children (expected %s)",
children.length,
childVectors.size());
for (int i = 0; i < children.length; i++) {
checkState(children[i] != NULL, "ArrowArray struct has NULL child at position %s", i);
CometArrayImporter childImporter =
new CometArrayImporter(allocator, childVectors.get(i), dictionaryProvider);
childImporter.importChild(this, ArrowArray.wrap(children[i]));
}
}

// Handle import of a dictionary encoded vector
if (snapshot.dictionary != NULL) {
DictionaryEncoding encoding = vector.getField().getDictionary();
checkNotNull(encoding, "Missing encoding on import of ArrowArray with dictionary");

Dictionary dictionary = dictionaryProvider.lookup(encoding.getId());
checkNotNull(dictionary, "Dictionary lookup failed on import of ArrowArray with dictionary");

// reset the dictionary vector to the initial state
dictionary.getVector().clear();

CometArrayImporter dictionaryImporter =
new CometArrayImporter(allocator, dictionary.getVector(), dictionaryProvider);
dictionaryImporter.importChild(this, ArrowArray.wrap(snapshot.dictionary));
}

// Import main data
ArrowFieldNode fieldNode = new ArrowFieldNode(snapshot.length, snapshot.null_count);
long[] bufferPointers =
NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers));

try (final CometBufferImportTypeVisitor visitor =
new CometBufferImportTypeVisitor(
allocator, underlyingAllocation, fieldNode, snapshot, bufferPointers)) {
final List<ArrowBuf> buffers;
if (bufferPointers == null || bufferPointers.length == 0) {
buffers = Collections.emptyList();
} else {
buffers = vector.getField().getType().accept(visitor);
}
vector.loadFieldBuffers(fieldNode, buffers);
} catch (Exception e) {
throw new IllegalArgumentException(
"Could not load buffers for field "
+ vector.getField()
+ ". error message: "
+ e.getMessage(),
e);
}
}
}
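
Editor's note (not part of the commit): to make the new importer concrete, here is a minimal round-trip sketch that exports an IntVector through the Arrow C Data Interface with Data.exportVector and re-imports it via CometArrayImporter, mirroring the flow in ArrowImporter.importVector above. The test class, its values, and its placement in the org.apache.arrow.c package (so the package-private importer is accessible) are assumptions for illustration.

package org.apache.arrow.c;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.types.pojo.Field;

// Illustrative sketch only: round-trips a vector through the C Data Interface structs.
public class CometArrayImporterRoundTrip {
  public static void main(String[] args) {
    try (BufferAllocator allocator = new RootAllocator()) {
      CDataDictionaryProvider provider = new CDataDictionaryProvider();

      try (IntVector source = new IntVector("ints", allocator)) {
        source.allocateNew(3);
        source.set(0, 1);
        source.set(1, 2);
        source.set(2, 3);
        source.setValueCount(3);

        // The struct wrappers are consumed by the import calls below
        // (importField releases the schema; importArray releases the array),
        // so they are not closed again here.
        ArrowArray array = ArrowArray.allocateNew(allocator);
        ArrowSchema schema = ArrowSchema.allocateNew(allocator);
        Data.exportVector(allocator, source, provider, array, schema);

        // Rebuild the field and an empty target vector, then load the buffers.
        Field field = Data.importField(allocator, schema, provider);
        try (FieldVector imported = field.createVector(allocator)) {
          new CometArrayImporter(allocator, imported, provider).importArray(array);
          System.out.println("imported " + imported.getValueCount() + " values");
        }
      }
    }
  }
}
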