Skip to content

Commit

Permalink
fix: register object store with datafusion (#107)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: Shiyan Xu <2701446+xushiyan@users.noreply.github.com>
  • Loading branch information
abyssnlp and xushiyan authored Aug 12, 2024
1 parent b3ffa32 commit 3359e10
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 3 deletions.
14 changes: 14 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,19 @@ dashmap = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }

# datafusion
datafusion = { workspace = true, optional = true }
datafusion-expr = { workspace = true, optional = true }
datafusion-common = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }

[dev-dependencies]
hudi-tests = { path = "../tests" }

# Opt-in DataFusion integration. Enabling `datafusion` turns on the optional
# datafusion* dependencies declared above.
# `dep:datafusion` is used because this feature shares its name with the
# optional dependency; the remaining entries name the implicit features Cargo
# creates for the other optional dependencies.
[features]
datafusion = [
"dep:datafusion",
"datafusion-expr",
"datafusion-common",
"datafusion-physical-expr",
]
8 changes: 8 additions & 0 deletions crates/core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ impl Storage {
}
}

/// Registers this storage's object store with a DataFusion runtime environment.
///
/// The store is registered under this storage's base URL. Whatever
/// `RuntimeEnv::register_object_store` returns is discarded.
///
/// Only compiled when the `datafusion` feature is enabled.
#[cfg(feature = "datafusion")]
pub fn register_object_store(
    &self,
    runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
) {
    // Clone the store handle so the runtime gets its own copy.
    let store = self.object_store.clone();
    runtime_env.register_object_store(self.base_url.as_ref(), store);
}

#[cfg(test)]
async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/table/fs_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::storage::{get_leaf_dirs, Storage};
// NOTE(review): `dead_code` is silenced for the whole struct; the reason is not
// visible in this hunk — confirm it is still needed.
#[allow(dead_code)]
pub struct FileSystemView {
configs: Arc<HudiConfigs>,
storage: Arc<Storage>,
// Crate-visible: `Table::register_storage` reaches `file_system_view.storage`
// to register the object store (presumably a field of this type — confirm).
pub(crate) storage: Arc<Storage>,
// Maps a partition (String key) to the file groups found in it.
partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>,
}

Expand Down
13 changes: 13 additions & 0 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ impl Table {
})
}

/// Registers this table's storage backends with a DataFusion runtime.
///
/// Forwards `runtime_env` first to the `Storage` held by the timeline and then
/// to the one held by the file system view, calling `register_object_store`
/// on each.
///
/// Only compiled when the `datafusion` feature is enabled.
#[cfg(feature = "datafusion")]
pub fn register_storage(
    &self,
    runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
) {
    // `Arc::clone` makes it explicit that only the reference count is bumped.
    self.timeline
        .storage
        .register_object_store(Arc::clone(&runtime_env));
    // Last use: hand over the owned `Arc` instead of cloning it again.
    self.file_system_view
        .storage
        .register_object_store(runtime_env);
}

async fn load_configs<I, K, V>(
base_url: Arc<Url>,
all_options: I,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/table/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl Instant {
// NOTE(review): `dead_code` is silenced for the whole struct; the reason is not
// visible in this hunk — confirm it is still needed.
#[allow(dead_code)]
pub struct Timeline {
configs: Arc<HudiConfigs>,
storage: Arc<Storage>,
// Crate-visible: `Table::register_storage` reaches `timeline.storage` to
// register the object store (presumably a field of this type — confirm).
pub(crate) storage: Arc<Storage>,
// Instants of this timeline; publicly readable.
pub instants: Vec<Instant>,
}

Expand Down
2 changes: 1 addition & 1 deletion crates/datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ homepage.workspace = true
repository.workspace = true

[dependencies]
hudi-core = { version = "0.2.0", path = "../core" }
hudi-core = { version = "0.2.0", path = "../core", features = ["datafusion"] }
# arrow
arrow-schema = { workspace = true }

Expand Down
2 changes: 2 additions & 0 deletions crates/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ impl TableProvider for HudiDataSource {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
self.table.register_storage(state.runtime_env().clone());

let file_slices = self
.table
.split_file_slices(self.get_input_partitions())
Expand Down

0 comments on commit 3359e10

Please sign in to comment.