Skip to content

Commit

Permalink
rust bindings for CDF
Browse files Browse the repository at this point in the history
Co-authored-by: Oussama Saoudi <oussama.saoudi@databricks.com>
  • Loading branch information
PatrickJin-db and OussamaSaoudi-db committed Dec 18, 2024
1 parent 49b188b commit 04c7a31
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 47 deletions.
25 changes: 22 additions & 3 deletions .github/workflows/build-kernel-wheels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
os: [ubuntu-latest, ubuntu-20.04, macos-latest, windows-latest]
python-version: [3.8]
arch: [x86_64, arm64]
include:
Expand All @@ -25,6 +25,8 @@ jobs:
arch: arm64
- os: ubuntu-latest
arch: x86_64
- os: ubuntu-20.04
arch: x86_64
- os: windows-latest
arch: x86_64

Expand Down Expand Up @@ -69,8 +71,25 @@ jobs:
maturin build --release
shell: bash

- name: Build wheel (x86_64 Linux Ubuntu 20.04)
if: matrix.os == 'ubuntu-20.04'
run: |
cd python/delta-kernel-rust-sharing-wrapper
maturin build --release
shell: bash

- name: Upload wheels
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: wheels
name: wheel-${{ matrix.os }}-${{ matrix.arch }}
path: python/delta-kernel-rust-sharing-wrapper/target/wheels/*.whl

merge:
runs-on: ubuntu-latest
needs: build
steps:
- name: Merge Artifacts
uses: actions/upload-artifact/merge@v4
with:
name: all-wheels
pattern: wheel-*
1 change: 1 addition & 0 deletions python/delta-kernel-rust-sharing-wrapper/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Cargo.lock
4 changes: 2 additions & 2 deletions python/delta-kernel-rust-sharing-wrapper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "delta-kernel-rust-sharing-wrapper"
edition = "2021"
license = "Apache-2.0"
version = "0.1.0"
version = "0.2.0"

[lib]
name = "delta_kernel_rust_sharing_wrapper"
Expand All @@ -11,7 +11,7 @@ crate-type = ["cdylib"]

[dependencies]
arrow = { version = "53.3.0", features = ["pyarrow"] }
delta_kernel = {version = "0.5", features = ["cloud", "default", "default-engine"]}
delta_kernel = { version = "0.6.0", features = ["cloud", "default-engine"]}
openssl = { version = "0.10", features = ["vendored"] }
url = "2"

Expand Down
155 changes: 113 additions & 42 deletions python/delta-kernel-rust-sharing-wrapper/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,44 @@
use std::sync::Arc;

use arrow::compute::filter_record_batch;
use arrow::datatypes::SchemaRef;
use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use arrow::error::ArrowError;
use arrow::pyarrow::PyArrowType;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use arrow::record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader};

use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::scan::ScanResult;
use delta_kernel::table_changes::scan::{
TableChangesScan as KernelTableChangesScan,
TableChangesScanBuilder as KernelTableChangesScanBuilder,
};
use delta_kernel::Error as KernelError;
use delta_kernel::{engine::arrow_data::ArrowEngineData, schema::StructType};
use delta_kernel::{DeltaResult, Engine};

use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

use url::Url;

use arrow::record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use delta_kernel::Engine;

use std::collections::HashMap;

struct KernelError(delta_kernel::Error);
struct PyKernelError(KernelError);

impl From<KernelError> for PyErr {
fn from(error: KernelError) -> Self {
impl From<PyKernelError> for PyErr {
fn from(error: PyKernelError) -> Self {
PyValueError::new_err(format!("Kernel error: {}", error.0))
}
}

impl From<delta_kernel::Error> for KernelError {
fn from(delta_kernel_error: delta_kernel::Error) -> Self {
impl From<KernelError> for PyKernelError {
fn from(delta_kernel_error: KernelError) -> Self {
Self(delta_kernel_error)
}
}

type DeltaPyResult<T> = std::result::Result<T, KernelError>;
type DeltaPyResult<T> = std::result::Result<T, PyKernelError>;

#[pyclass]
struct Table(delta_kernel::Table);
Expand All @@ -40,7 +47,7 @@ struct Table(delta_kernel::Table);
impl Table {
#[new]
fn new(location: &str) -> DeltaPyResult<Self> {
let location = Url::parse(location).map_err(delta_kernel::Error::InvalidUrl)?;
let location = Url::parse(location).map_err(KernelError::InvalidUrl)?;
let table = delta_kernel::Table::new(location);
Ok(Table(table))
}
Expand Down Expand Up @@ -73,11 +80,43 @@ impl ScanBuilder {
}

fn build(&mut self) -> DeltaPyResult<Scan> {
let scan = self.0.take().unwrap().build()?;
let scan = self
.0
.take()
.ok_or_else(|| KernelError::generic("Can only call build() once on ScanBuilder"))?
.build()?;
Ok(Scan(scan))
}
}

fn try_get_schema(schema: &Arc<StructType>) -> Result<ArrowSchemaRef, KernelError> {
Ok(Arc::new(schema.as_ref().try_into().map_err(|e| {
KernelError::Generic(format!("Could not get result schema: {e}"))
})?))
}

fn try_create_record_batch_iter(
results: impl Iterator<Item = DeltaResult<ScanResult>>,
result_schema: ArrowSchemaRef,
) -> RecordBatchIterator<impl Iterator<Item = Result<RecordBatch, ArrowError>>> {
let record_batches = results.map(|res| {
let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
let (mask, data) = scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string()))?
.into();
if let Some(mask) = mask {
let filtered_batch = filter_record_batch(&record_batch, &mask.into())?;
Ok(filtered_batch)
} else {
Ok(record_batch)
}
});
RecordBatchIterator::new(record_batches, result_schema)
}

#[pyclass]
struct Scan(delta_kernel::scan::Scan);

Expand All @@ -87,50 +126,80 @@ impl Scan {
&self,
engine_interface: &PythonInterface,
) -> DeltaPyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
let result_schema: SchemaRef =
Arc::new(self.0.schema().as_ref().try_into().map_err(|e| {
delta_kernel::Error::Generic(format!("Could not get result schema: {e}"))
})?);
let results = self.0.execute(engine_interface.0.as_ref())?;
let record_batches: Vec<_> = results
.map(|res| {
let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
let (mask, data) =
scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| {
ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string())
})?
.into();
if let Some(mask) = mask {
let filtered_batch = filter_record_batch(&record_batch, &mask.into())?;
Ok(filtered_batch)
} else {
Ok(record_batch)
}
})
.collect();
let record_batch_iter = RecordBatchIterator::new(record_batches, result_schema);
let result_schema: ArrowSchemaRef = try_get_schema(self.0.schema())?;
let results = self.0.execute(engine_interface.0.clone())?;
let record_batch_iter = try_create_record_batch_iter(results, result_schema);
Ok(PyArrowType(Box::new(record_batch_iter)))
}
}

#[pyclass]
struct TableChangesScanBuilder(Option<KernelTableChangesScanBuilder>);

#[pymethods]
impl TableChangesScanBuilder {
#[new]
#[pyo3(signature = (table, engine_interface, start_version, end_version=None))]
fn new(
table: &Table,
engine_interface: &PythonInterface,
start_version: u64,
end_version: Option<u64>,
) -> DeltaPyResult<TableChangesScanBuilder> {
let table_changes = table
.0
.table_changes(engine_interface.0.as_ref(), start_version, end_version)?;
Ok(TableChangesScanBuilder(Some(
table_changes.into_scan_builder(),
)))
}

fn build(&mut self) -> DeltaPyResult<TableChangesScan> {
let scan = self
.0
.take()
.ok_or_else(|| {
KernelError::generic("Can only call build() once on TableChangesScanBuilder")
})?
.build()?;
let schema: ArrowSchemaRef = try_get_schema(scan.schema())?;
Ok(TableChangesScan { scan, schema })
}
}

#[pyclass]
struct TableChangesScan {
scan: KernelTableChangesScan,
schema: ArrowSchemaRef,
}

#[pymethods]
impl TableChangesScan {
fn execute(
&self,
engine_interface: &PythonInterface,
) -> DeltaPyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
let result_schema = self.schema.clone();
let results = self.scan.execute(engine_interface.0.clone())?;
let record_batch_iter = try_create_record_batch_iter(results, result_schema);
Ok(PyArrowType(Box::new(record_batch_iter)))
}
}

#[pyclass]
struct PythonInterface(Box<dyn Engine + Send>);
struct PythonInterface(Arc<dyn Engine + Send>);

#[pymethods]
impl PythonInterface {
#[new]
fn new(location: &str) -> DeltaPyResult<Self> {
let url = Url::parse(location).map_err(delta_kernel::Error::InvalidUrl)?;
let url = Url::parse(location).map_err(KernelError::InvalidUrl)?;
let client = DefaultEngine::try_new(
&url,
HashMap::<String, String>::new(),
Arc::new(TokioBackgroundExecutor::new()),
)?;
Ok(PythonInterface(Box::new(client)))
Ok(PythonInterface(Arc::new(client)))
}
}

Expand All @@ -144,5 +213,7 @@ fn delta_kernel_rust_sharing_wrapper(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<Snapshot>()?;
m.add_class::<ScanBuilder>()?;
m.add_class::<Scan>()?;
m.add_class::<TableChangesScanBuilder>()?;
m.add_class::<TableChangesScan>()?;
Ok(())
}

0 comments on commit 04c7a31

Please sign in to comment.