Commit e7cf167: Merge branch 'feature' into add-stats

samansmink committed Aug 30, 2024
2 parents 24dec54 + bdd8f26

Showing 16 changed files with 256 additions and 83 deletions.
12 changes: 9 additions & 3 deletions .github/workflows/CloudTesting.yml
@@ -41,6 +41,12 @@ jobs:
with:
vcpkgGitCommitId: a1a1cbc975abf909a6c8985a6a2b8fe20bbd9bd6

- name: Configure OpenSSL for Rust
run: |
echo "OPENSSL_ROOT_DIR=`pwd`/build/release/vcpkg_installed/x64-linux" >> $GITHUB_ENV
echo "OPENSSL_DIR=`pwd`/build/release/vcpkg_installed/x64-linux" >> $GITHUB_ENV
echo "OPENSSL_USE_STATIC_LIBS=true" >> $GITHUB_ENV
- name: Setup Rust
uses: dtolnay/rust-toolchain@stable

@@ -57,15 +63,15 @@ jobs:
AZURE_TENANT_ID: ${{secrets.AZURE_TENANT_ID}}
AZURE_STORAGE_ACCOUNT: ${{secrets.AZURE_STORAGE_ACCOUNT}}
run: |
python3 duckdb/scripts/run_tests_one_by_one.py ./build/release/test/unittest "*test/sql/cloud/*"
python3 duckdb/scripts/run_tests_one_by_one.py ./build/release/test/unittest `pwd`/test/sql/cloud/*
- name: Test with SPN logged in in azure-cli
env:
AZURE_STORAGE_ACCOUNT: ${{secrets.AZURE_STORAGE_ACCOUNT}}
DUCKDB_AZ_CLI_LOGGED_IN: 1
run: |
az login --service-principal -u ${{secrets.AZURE_CLIENT_ID}} -p ${{secrets.AZURE_CLIENT_SECRET}} --tenant ${{secrets.AZURE_TENANT_ID}}
python3 duckdb/scripts/run_tests_one_by_one.py ./build/release/test/unittest "*test/sql/cloud/*"
python3 duckdb/scripts/run_tests_one_by_one.py ./build/release/test/unittest `pwd`/test/sql/cloud/*
- name: Log out azure-cli
if: always()
@@ -77,4 +83,4 @@ jobs:
AZURE_STORAGE_ACCOUNT: ${{secrets.AZURE_STORAGE_ACCOUNT}}
DUCKDB_AZURE_PUBLIC_CONTAINER_AVAILABLE: 1
run: |
python3 duckdb/scripts/run_tests_one_by_one.py ./build/release/test/unittest "*test/sql/cloud/*"
python3 duckdb/scripts/run_tests_one_by_one.py ./build/release/test/unittest `pwd`/test/sql/cloud/*
12 changes: 12 additions & 0 deletions .github/workflows/LocalTesting.yml
@@ -117,6 +117,12 @@ jobs:
with:
vcpkgGitCommitId: a1a1cbc975abf909a6c8985a6a2b8fe20bbd9bd6

- name: Configure OpenSSL for Rust
run: |
echo "OPENSSL_ROOT_DIR=`pwd`/build/release/vcpkg_installed/x64-linux" >> $GITHUB_ENV
echo "OPENSSL_DIR=`pwd`/build/release/vcpkg_installed/x64-linux" >> $GITHUB_ENV
echo "OPENSSL_USE_STATIC_LIBS=true" >> $GITHUB_ENV
- name: Build
shell: bash
run: make
@@ -194,6 +200,12 @@ jobs:
with:
vcpkgGitCommitId: a1a1cbc975abf909a6c8985a6a2b8fe20bbd9bd6

- name: Configure OpenSSL for Rust
run: |
echo "OPENSSL_ROOT_DIR=`pwd`/build/release/vcpkg_installed/x64-linux" >> $GITHUB_ENV
echo "OPENSSL_DIR=`pwd`/build/release/vcpkg_installed/x64-linux" >> $GITHUB_ENV
echo "OPENSSL_USE_STATIC_LIBS=true" >> $GITHUB_ENV
- name: Build
shell: bash
run: make generate-data
4 changes: 2 additions & 2 deletions .gitmodules
@@ -4,5 +4,5 @@
branch = main
[submodule "extension-ci-tools"]
path = extension-ci-tools
url = git@github.com:duckdb/extension-ci-tools.git
branch = main
url = https://github.com/duckdb/extension-ci-tools.git
branch = main
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -24,7 +24,7 @@ if(APPLE)
elseif(UNIX)
set(PLATFORM_LIBS m c resolv)
elseif(WIN32)
set(PLATFORM_LIBS ntdll ncrypt secur32 ws2_32 userenv bcrypt msvcrt advapi32)
set(PLATFORM_LIBS ntdll ncrypt secur32 ws2_32 userenv bcrypt msvcrt advapi32 RuntimeObject)
else()
message(STATUS "UNKNOWN OS")
endif()
3 changes: 3 additions & 0 deletions Makefile
@@ -11,6 +11,9 @@ test_release: export DAT_PATH=./build/release/rust/src/delta_kernel/acceptance/t
test_debug: export DELTA_KERNEL_TESTS_PATH=./build/debug/rust/src/delta_kernel/kernel/tests/data
test_debug: export DAT_PATH=./build/debug/rust/src/delta_kernel/acceptance/tests/dat

# Core extensions that we need for testing
CORE_EXTENSIONS='tpcds;tpch;aws;azure;httpfs'

# Set this flag during building to enable the benchmark runner
ifeq (${BUILD_BENCHMARK}, 1)
TOOLCHAIN_FLAGS:=${TOOLCHAIN_FLAGS} -DBUILD_BENCHMARKS=1
56 changes: 45 additions & 11 deletions README.md
@@ -1,40 +1,68 @@
# DuckDB Delta Extension

This is the experimental DuckDB extension for [Delta](https://delta.io/). It is built using the (also experimental)
[Delta Kernel](https://github.com/delta-incubator/delta-kernel-rs). The extension currently offers **read** support for Delta
tables, both local and remote.

# Supported platforms
## Supported platforms

The supported platforms are:

- `linux_amd64`, `linux_amd64_gcc4`, and `linux_arm64`
- `osx_amd64` and `osx_arm64`
- `windows_amd64`

Support for the [other](https://duckdb.org/docs/extensions/working_with_extensions#platforms) DuckDB platforms is
work in progress.

# How to use
**NOTE: this extension requires the DuckDB v0.10.3 or higher**
## How to use

> [!NOTE]
> This extension requires DuckDB v0.10.3 or higher.
This extension is distributed as a binary extension. To use it, simply call one of its functions from DuckDB; the extension will be autoloaded:

```SQL
FROM delta_scan('s3://some/delta/table');
```

Note that using DuckDB [Secrets](https://duckdb.org/docs/configuration/secrets_manager.html) for S3 authentication is supported:
To scan a local table, use the full path prefixed with `file://`:

```SQL
FROM delta_scan('file:///some/path/on/local/machine');
```

## Cloud Storage authentication

Note that using DuckDB [Secrets](https://duckdb.org/docs/configuration/secrets_manager.html) for Cloud authentication is supported.

### S3 Example

```SQL
CREATE SECRET (TYPE S3, provider credential_chain);
CREATE SECRET (
TYPE S3,
PROVIDER CREDENTIAL_CHAIN
);
FROM delta_scan('s3://some/delta/table/with/auth');
```

To scan a local table, use the full path prefixes with `file://`
### Azure Example

```SQL
FROM delta_scan('file:///some/path/on/local/machine');
CREATE SECRET (
TYPE AZURE,
PROVIDER CREDENTIAL_CHAIN,
CHAIN 'cli',
ACCOUNT_NAME 'mystorageaccount'
);
FROM delta_scan('abfss://some/delta/table/with/auth');
```

# Features
## Features

While still experimental, many (scanning) features/optimizations are already supported in this extension as it reuses most of DuckDB's
regular parquet scanning logic:

- multithreaded scans and parquet metadata reading
- data skipping/filter pushdown
- skipping row-groups in file (based on parquet metadata)
@@ -43,24 +71,30 @@ regular parquet scanning logic:
- scanning tables with deletion vectors
- all primitive types
- structs
- S3 support with secrets
- Cloud storage (AWS, Azure, GCP) support with secrets

More features coming soon!
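
A quick way to see the pushdown behavior is to inspect a query plan from the DuckDB Python client. This is a minimal sketch, not part of the extension itself; the table path and column names are made up:

```python
import duckdb

con = duckdb.connect()

# If filter pushdown applies, the predicate on `part` should appear
# attached to the delta scan operator in the plan, rather than as a
# separate Filter node above it. The path './my_delta_table' is
# hypothetical.
con.sql("""
    EXPLAIN SELECT i
    FROM delta_scan('./my_delta_table')
    WHERE part = 1
""").show()
```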

# Building
## Building

See the [Extension Template](https://github.com/duckdb/extension-template) for generic build instructions.

# Running tests
## Running tests

There are various tests available for the delta extension:

1. Delta Acceptance Test (DAT) based tests in `/test/sql/dat`
2. delta-kernel-rs based tests in `/test/sql/delta_kernel_rs`
3. Generated data based tests in `tests/sql/generated` (generated using [delta-rs](https://delta-io.github.io/delta-rs/), [PySpark](https://spark.apache.org/docs/latest/api/python/index.html), and DuckDB; a minimal sketch of the generation pattern follows the commands below)

To run the first 2 sets of tests:

```shell
make test_debug
```

or in release mode

```shell
make test
```
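
For the generated-data tests, `scripts/generate_test_data.py` writes each table twice: once as a Delta table via delta-rs, and once as DuckDB-written Parquet that serves as the golden reference. A minimal sketch of that pattern, assuming the `duckdb`, `pandas`, and `deltalake` Python packages are installed (the output directory `./generated/simple` is made up):

```python
import duckdb
from deltalake import write_deltalake

con = duckdb.connect()
df = con.sql("SELECT i, i % 2 AS part FROM range(0, 10) tbl(i)").df()

# Delta table for delta_scan to read
write_deltalake("./generated/simple/delta_lake", df, partition_by=["part"])

# DuckDB-written Parquet as the golden reference to compare against
con.sql(
    "COPY (SELECT * FROM df) "
    "TO './generated/simple/duckdb' (FORMAT parquet, PARTITION_BY part)"
)
```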
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 1846 files
23 changes: 1 addition & 22 deletions extension_config.cmake
@@ -4,25 +4,4 @@
duckdb_extension_load(delta
SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}
LOAD_TESTS
)

# Build the httpfs extension to test with s3/http
duckdb_extension_load(httpfs)

## Build the azure extension to test with azure
#duckdb_extension_load(azure
# LOAD_TESTS
# GIT_URL https://github.com/duckdb/duckdb_azure
# GIT_TAG d92b3b87ff06e6694883b1a6dbf684eeefedd609
#)
#
## Build the aws extension to test with credential providers
#duckdb_extension_load(aws
# LOAD_TESTS
# GIT_URL https://github.com/duckdb/duckdb_aws
# GIT_TAG 42c78d3f99e1a188a2b178ea59e3c17907af4fb2
#)

# Build the tpch and tpcds extension for testing/benchmarking
duckdb_extension_load(tpch)
duckdb_extension_load(tpcds)
)
70 changes: 60 additions & 10 deletions scripts/generate_test_data.py
@@ -13,7 +13,45 @@
def delete_old_files():
if (os.path.isdir(BASE_PATH)):
shutil.rmtree(BASE_PATH)
def generate_test_data_delta_rs(path, query, part_column=False):

def generate_test_data_delta_rs_multi(path, init, tables):
"""
generate_test_data_delta_rs_multi generates some test data using delta-rs and duckdb
:param path: the test data path (prefixed with BASE_PATH)
:param init: a duckdb query that initializes the duckdb tables that will be written
:param tables: list of dicts containing the fields: name, query, (optionally) part_column
:return: None
"""
generated_path = f"{BASE_PATH}/{path}"

if (os.path.isdir(generated_path)):
return

con = duckdb.connect()

con.sql(init)

for table in tables:
# Write delta table data
test_table_df = con.sql(table['query']).df()
table_name = table['name']

os.makedirs(f"{generated_path}/{table_name}/delta_lake", exist_ok=True)
os.makedirs(f"{generated_path}/{table_name}/duckdb", exist_ok=True)

if 'part_column' in table:
write_deltalake(f"{generated_path}/{table_name}/delta_lake", test_table_df, partition_by=[table['part_column']])
else:
write_deltalake(f"{generated_path}/{table_name}/delta_lake", test_table_df)

# Write DuckDB's reference data
if 'part_column' in table:
con.sql(f"COPY ({table['query']}) to '{generated_path}/{table_name}/duckdb' (FORMAT parquet, PARTITION_BY {table['part_column']})")
else:
con.sql(f"COPY ({table['query']}) to '{generated_path}/{table_name}/duckdb/data.parquet' (FORMAT parquet)")

def generate_test_data_delta_rs(path, query, part_column=False, add_golden_table=True):
"""
generate_test_data_delta_rs generates some test data using delta-rs and duckdb
@@ -38,12 +76,13 @@ def generate_test_data_delta_rs(path, query, part_column=False):
else:
write_deltalake(f"{generated_path}/delta_lake", test_table_df)

# Write DuckDB's reference data
os.mkdir(f'{generated_path}/duckdb')
if (part_column):
con.sql(f"COPY test_table to '{generated_path}/duckdb' (FORMAT parquet, PARTITION_BY {part_column})")
else:
con.sql(f"COPY test_table to '{generated_path}/duckdb/data.parquet' (FORMAT parquet)")
if add_golden_table:
# Write DuckDB's reference data
os.mkdir(f'{generated_path}/duckdb')
if (part_column):
con.sql(f"COPY test_table to '{generated_path}/duckdb' (FORMAT parquet, PARTITION_BY {part_column})")
else:
con.sql(f"COPY test_table to '{generated_path}/duckdb/data.parquet' (FORMAT parquet)")

def generate_test_data_pyspark(name, current_path, input_path, delete_predicate = False):
"""
@@ -86,13 +125,20 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
if delete_predicate:
deltaTable.delete(delete_predicate)

## Writing the
## WRITING THE PARQUET FILES
df = spark.table(f'test_table_{name}')
df.write.parquet(parquet_reference_path, mode='overwrite')

# TO CLEAN, uncomment
# delete_old_files()

### TPCH SF1
init = "call dbgen(sf=1);"
tables = ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]
queries = [f"from {x}" for x in tables]
tables = [{'name': x[0], 'query':x[1]} for x in zip(tables,queries)]
generate_test_data_delta_rs_multi("delta_rs_tpch_sf1", init, tables)

### Simple partitioned table
query = "CREATE table test_table AS SELECT i, i%2 as part from range(0,10) tbl(i);"
generate_test_data_delta_rs("simple_partitioned", query, "part")
Expand All @@ -112,14 +158,18 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
query += "CREATE table test_table AS SELECT *, l_orderkey%10 as part from lineitem;"
generate_test_data_delta_rs("lineitem_sf1_10part", query, "part")

## Simple table with a blob as a value
query = "create table test_table as SELECT encode('ABCDE') as blob, encode('ABCDE') as blob_part, 'ABCDE' as string UNION ALL SELECT encode('😈') as blob, encode('😈') as blob_part, '😈' as string"
generate_test_data_delta_rs("simple_blob_table", query, "blob_part", add_golden_table=False)

## Simple partitioned table with structs
query = "CREATE table test_table AS SELECT {'i':i, 'j':i+1} as value, i%2 as part from range(0,10) tbl(i);"
generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part");
generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part")

## Partitioned table with all types we can file skip on
for type in ["bool", "int", "tinyint", "smallint", "bigint", "float", "double", "varchar"]:
query = f"CREATE table test_table as select i::{type} as value, i::{type} as part from range(0,2) tbl(i)"
generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part");
generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part")

## Simple table with deletion vector
con = duckdb.connect()
6 changes: 3 additions & 3 deletions src/delta_utils.cpp
@@ -25,10 +25,10 @@ unique_ptr<SchemaVisitor::FieldList> SchemaVisitor::VisitSnapshotSchema(ffi::Sha
visitor.visit_float = VisitSimpleType<LogicalType::FLOAT>();
visitor.visit_double = VisitSimpleType<LogicalType::DOUBLE>();
visitor.visit_boolean = VisitSimpleType<LogicalType::BOOLEAN>();
visitor.visit_binary = VisitSimpleType<LogicalType::VARCHAR>();
visitor.visit_binary = VisitSimpleType<LogicalType::BLOB>();
visitor.visit_date = VisitSimpleType<LogicalType::DATE>();
visitor.visit_timestamp = VisitSimpleType<LogicalType::TIMESTAMP>();
visitor.visit_timestamp_ntz = VisitSimpleType<LogicalType::TIMESTAMP_TZ>();
visitor.visit_timestamp = VisitSimpleType<LogicalType::TIMESTAMP_TZ>();
visitor.visit_timestamp_ntz = VisitSimpleType<LogicalType::TIMESTAMP>();

uintptr_t result = visit_schema(snapshot, &visitor);
return state.TakeFieldList(result);
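
The remapping above makes Delta `binary` columns surface as `BLOB` instead of `VARCHAR`, Delta `timestamp` (which is UTC-adjusted) as `TIMESTAMP_TZ`, and `timestamp_ntz` as plain `TIMESTAMP`. A sketch for sanity-checking the mapping from Python; the table path and column names are made up, and the `duckdb`, `pandas`, and `deltalake` packages are assumed:

```python
import duckdb
import pandas as pd
from deltalake import write_deltalake

# Throwaway table with a binary column and a UTC timestamp column.
df = pd.DataFrame({
    "blob_col": [b"ABCDE"],
    "ts_col": [pd.Timestamp("2024-08-30", tz="UTC")],
})
write_deltalake("./check_types/delta_lake", df)

con = duckdb.connect()
# DESCRIBE reports the types produced by the schema visitor: blob_col
# should now read as BLOB and ts_col as TIMESTAMP WITH TIME ZONE.
con.sql(
    "DESCRIBE SELECT * FROM delta_scan('./check_types/delta_lake')"
).show()
```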