refactor: change the test
gohalo committed Sep 10, 2024
2 parents 44077ba + 01ef2fc commit eaf8d0d
Showing 10 changed files with 214 additions and 57 deletions.
36 changes: 18 additions & 18 deletions Cargo.toml
@@ -26,7 +26,7 @@ resolver = "2"
version = "0.2.0"
edition = "2021"
license = "Apache-2.0"
rust-version = "1.75.0"
rust-version = "1.76"
keywords = ["apachehudi", "hudi", "datalake", "arrow"]
readme = "README.md"
description = "A native Rust library for Apache Hudi"
@@ -35,25 +35,25 @@ repository = "https://github.com/apache/hudi-rs"

[workspace.dependencies]
# arrow
arrow = { version = "= 52.0.0", features = ["pyarrow"] }
arrow-arith = { version = "= 52.0.0" }
arrow-array = { version = "= 52.0.0" }
arrow-buffer = { version = "= 52.0.0" }
arrow-cast = { version = "= 52.0.0" }
arrow-ipc = { version = "= 52.0.0" }
arrow-json = { version = "= 52.0.0" }
arrow-ord = { version = "= 52.0.0" }
arrow-row = { version = "= 52.0.0" }
arrow-schema = { version = "= 52.0.0", features = ["serde"] }
arrow-select = { version = "= 52.0.0" }
object_store = { version = "= 0.10.1", features = ["aws", "azure", "gcp"] }
parquet = { version = "= 52.0.0", features = ["async", "object_store"] }
arrow = { version = "= 52.2.0", features = ["pyarrow"] }
arrow-arith = { version = "= 52.2.0" }
arrow-array = { version = "= 52.2.0" }
arrow-buffer = { version = "= 52.2.0" }
arrow-cast = { version = "= 52.2.0" }
arrow-ipc = { version = "= 52.2.0" }
arrow-json = { version = "= 52.2.0" }
arrow-ord = { version = "= 52.2.0" }
arrow-row = { version = "= 52.2.0" }
arrow-schema = { version = "= 52.2.0", features = ["serde"] }
arrow-select = { version = "= 52.2.0" }
object_store = { version = "= 0.10.2", features = ["aws", "azure", "gcp"] }
parquet = { version = "= 52.2.0", features = ["async", "object_store"] }

# datafusion
datafusion = { version = "= 39.0.0" }
datafusion-expr = { version = "= 39.0.0" }
datafusion-common = { version = "= 39.0.0" }
datafusion-physical-expr = { version = "= 39.0.0" }
datafusion = { version = "= 41.0.0" }
datafusion-expr = { version = "= 41.0.0" }
datafusion-common = { version = "= 41.0.0" }
datafusion-physical-expr = { version = "= 41.0.0" }

# serde
serde = { version = "1.0.203", features = ["derive"] }
2 changes: 2 additions & 0 deletions crates/core/src/config/mod.rs
@@ -26,6 +26,8 @@ pub mod internal;
pub mod read;
pub mod table;

+pub const HUDI_CONF_DIR: &str = "HUDI_CONF_DIR";
+
pub trait ConfigParser: AsRef<str> {
type Output;

7 changes: 7 additions & 0 deletions crates/core/src/storage/mod.rs
@@ -101,6 +101,13 @@ impl Storage {
Ok(bytes)
}

+    pub async fn get_file_data_from_absolute_path(&self, absolute_path: &str) -> Result<Bytes> {
+        let obj_path = ObjPath::from_absolute_path(PathBuf::from(absolute_path))?;
+        let result = self.object_store.get(&obj_path).await?;
+        let bytes = result.bytes().await?;
+        Ok(bytes)
+    }
+
pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result<RecordBatch> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
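A minimal sketch (not from this commit; the paths are made up) of the distinction the new helper introduces: from_absolute_path maps a local filesystem path straight to an object-store path, whereas the existing relative-path reads join onto the table's base URL and go through from_url_path:

    use object_store::path::Path as ObjPath;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // An absolute local path, as get_file_data_from_absolute_path receives.
        let abs = ObjPath::from_absolute_path("/etc/hudi/conf/hudi-defaults.conf")?;
        assert_eq!(abs.to_string(), "etc/hudi/conf/hudi-defaults.conf");

        // A URL path, as get_parquet_file_data derives via join_url_segments.
        let rel = ObjPath::from_url_path("/tables/trips/.hoodie/hoodie.properties")?;
        assert_eq!(rel.to_string(), "tables/trips/.hoodie/hoodie.properties");
        Ok(())
    }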
30 changes: 28 additions & 2 deletions crates/core/src/storage/utils.rs
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/

+use std::collections::HashMap;
+use std::io::{BufRead, BufReader, Cursor};
use std::path::{Path, PathBuf};
use std::str::FromStr;

-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use url::{ParseError, Url};

pub fn split_filename(filename: &str) -> Result<(String, String)> {
@@ -80,6 +82,30 @@ pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
std::iter::empty::<(&str, &str)>()
}

+pub async fn parse_config_data(data: &Bytes, split_chars: &str) -> Result<HashMap<String, String>> {
+    let cursor = Cursor::new(data);
+    let lines = BufReader::new(cursor).lines();
+    let mut configs = HashMap::new();
+
+    for line in lines {
+        let line = line.context("Failed to read line")?;
+        let trimmed_line = line.trim();
+        if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
+            continue;
+        }
+        let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
+        let key = parts
+            .next()
+            .context("Missing key in config line")?
+            .trim()
+            .to_owned();
+        let value = parts.next().unwrap_or("").trim().to_owned();
+        configs.insert(key, value);
+    }
+
+    Ok(configs)
+}

#[cfg(test)]
mod tests {
use std::str::FromStr;
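A minimal usage sketch of parse_config_data (not from this commit; assumes a tokio runtime and the function in scope). The table code below calls it with "=" for .hoodie/hoodie.properties and with " \t=" for hudi-defaults.conf, where a space, a tab, or "=" may separate key and value:

    use bytes::Bytes;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // hoodie.properties style: "=" is the only separator; comments and
        // blank lines are skipped.
        let props = Bytes::from("# a comment\nhoodie.table.name=trips\n");
        let parsed = parse_config_data(&props, "=").await?;
        assert_eq!(parsed.get("hoodie.table.name").map(String::as_str), Some("trips"));

        // hudi-defaults.conf style: split on the first space, tab, or "=",
        // then trim both sides, so "hoodie.table.type= mor" yields "mor".
        let conf = Bytes::from("hoodie.database.name tmpdb\nhoodie.table.type= mor\n");
        let parsed = parse_config_data(&conf, " \t=").await?;
        assert_eq!(parsed.get("hoodie.database.name").map(String::as_str), Some("tmpdb"));
        assert_eq!(parsed.get("hoodie.table.type").map(String::as_str), Some("mor"));
        Ok(())
    }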
120 changes: 89 additions & 31 deletions crates/core/src/table/mod.rs
@@ -19,8 +19,8 @@

use std::collections::HashMap;
use std::env;
+use std::path::PathBuf;
use std::fs::read_to_string;
-use std::io::{BufRead, BufReader};
use std::str::FromStr;
use std::sync::Arc;

@@ -39,8 +39,9 @@ use crate::config::read::HudiReadConfig;
use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
+use crate::config::HUDI_CONF_DIR;
use crate::file_group::FileSlice;
-use crate::storage::utils::{empty_options, parse_uri};
+use crate::storage::utils::{empty_options, parse_config_data, parse_uri};
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::timeline::Timeline;
@@ -151,34 +152,68 @@ impl Table {
extra_options.insert(k.as_ref().to_string(), v.into());
}
}

let storage = Storage::new(base_url, &extra_options)?;
-        let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
-        let cursor = std::io::Cursor::new(data);
-        let lines = BufReader::new(cursor).lines();
-        for line in lines {
-            let line = line?;
-            let trimmed_line = line.trim();
-            if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
-                continue;
-            }
-            let mut parts = trimmed_line.splitn(2, '=');
-            let key = parts.next().unwrap().to_owned();
-            let value = parts.next().unwrap_or("").to_owned();
-            // `hoodie.properties` takes precedence TODO handle conflicts where applicable
-            hudi_options.insert(key, value);
-        }

+        Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?;
+
+        Self::imbue_global_hudi_configs(&mut hudi_options, storage.clone()).await?;

let hudi_configs = HudiConfigs::new(hudi_options);

Self::validate_configs(&hudi_configs).map(|_| (hudi_configs, extra_options))
}

fn imbue_cloud_env_vars(options: &mut HashMap<String, String>) {
let prefixes = ["AWS_", "AZURE_", "GOOGLE_"];
options.extend(
env::vars()
.filter(|(key, _)| prefixes.iter().any(|prefix| key.starts_with(prefix)))
.map(|(k, v)| (k.to_ascii_lowercase(), v)),
);
const PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"];

for (key, value) in env::vars() {
if PREFIXES.iter().any(|prefix| key.starts_with(prefix))
&& !options.contains_key(&key.to_ascii_lowercase())
{
options.insert(key.to_ascii_lowercase(), value);
}
}
}

+    async fn imbue_table_properties(
+        options: &mut HashMap<String, String>,
+        storage: Arc<Storage>,
+    ) -> Result<()> {
+        let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?;
+        let table_properties = parse_config_data(&bytes, "=").await?;
+
+        // TODO: handle the case where the same key is present in both table properties and options
+        for (k, v) in table_properties {
+            options.insert(k.to_string(), v.to_string());
+        }
+
+        Ok(())
+    }
+
+    async fn imbue_global_hudi_configs(
+        options: &mut HashMap<String, String>,
+        storage: Arc<Storage>,
+    ) -> Result<()> {
+        let global_config_path = env::var(HUDI_CONF_DIR)
+            .map(PathBuf::from)
+            .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf"))
+            .join("hudi-defaults.conf");
+
+        if let Ok(bytes) = storage
+            .get_file_data_from_absolute_path(global_config_path.to_str().unwrap())
+            .await
+        {
+            if let Ok(global_configs) = parse_config_data(&bytes, " \t=").await {
+                for (key, value) in global_configs {
+                    if key.starts_with("hoodie.") && !options.contains_key(&key) {
+                        options.insert(key.to_string(), value.to_string());
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }

fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> {
Expand Down Expand Up @@ -301,7 +336,7 @@ impl Table {
mod tests {
use std::collections::HashSet;
use std::fs::canonicalize;
-use std::panic;
+use std::{env, panic};

use url::Url;

@@ -314,6 +349,7 @@ mod tests {
PrecombineField, RecordKeyFields, TableName, TableType, TableVersion,
TimelineLayoutVersion,
};
+use crate::config::HUDI_CONF_DIR;
use crate::storage::utils::join_url_segments;
use crate::table::Table;

@@ -336,7 +372,7 @@
.get_schema()
.await
.unwrap()
-.all_fields()
+.flattened_fields()
.into_iter()
.map(|f| f.name().to_string())
.collect();
@@ -601,14 +637,36 @@
}

#[tokio::test]
-    async fn support_external_config_file() {
-        std::env::set_var("HUDI_CONF_DIR", canonicalize("tests/data/config").unwrap());
-        let table = new_table_without_validation("table_props_valid").await;
+    async fn get_global_table_props() {
+        // Without the environment variable HUDI_CONF_DIR
+        let table = new_table_without_validation("table_props_partial").await;
        let configs = table.configs;
+        assert!(configs.get(DatabaseName).is_err());
+        assert!(configs.get(TableType).is_err());
+        assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+
+        // Environment variable HUDI_CONF_DIR points to nothing
+        let base_path = env::current_dir().unwrap();
+        let hudi_conf_dir = base_path.join("random/wrong/dir");
+        env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
+        let table = new_table_without_validation("table_props_partial").await;
+        let configs = table.configs;
+        assert!(configs.get(DatabaseName).is_err());
+        assert!(configs.get(TableType).is_err());
+        assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+
+        // With global config
+        let base_path = env::current_dir().unwrap();
+        let hudi_conf_dir = base_path.join("tests/data/hudi_conf_dir");
+        env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
+        let table = new_table_without_validation("table_props_partial").await;
+        let configs = table.configs;
        assert_eq!(configs.get(DatabaseName).unwrap().to::<String>(), "tmpdb");
        assert_eq!(
-            configs.get(BaseFileFormat).unwrap().to::<String>(),
-            "parquet"
+            configs.get(TableType).unwrap().to::<String>(),
+            "MERGE_ON_READ"
        );
-        assert_eq!(configs.get(TableVersion).unwrap().to::<isize>(), 6);
        assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+        env::remove_var(HUDI_CONF_DIR)
    }
}
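Taken together, the imbue_* steps establish a precedence order: values from .hoodie/hoodie.properties overwrite whatever the caller passed (conflict handling is still a TODO above), while hudi-defaults.conf only fills hoodie.* keys that are still missing. A condensed sketch of that layering with plain HashMaps (not the crate's real types, and not part of this commit):

    use std::collections::HashMap;

    fn main() {
        let mut options: HashMap<String, String> = HashMap::new();
        options.insert("hoodie.table.name".into(), "from_caller".into());

        // imbue_table_properties: hoodie.properties overwrites existing keys.
        for (k, v) in [("hoodie.table.name", "trips")] {
            options.insert(k.into(), v.into());
        }

        // imbue_global_hudi_configs: only missing hoodie.* keys are filled.
        for (k, v) in [("hoodie.database.name", "tmpdb"), ("hoodie.table.name", "ignored")] {
            if k.starts_with("hoodie.") && !options.contains_key(k) {
                options.insert(k.into(), v.into());
            }
        }

        assert_eq!(options["hoodie.table.name"], "trips");
        assert_eq!(options["hoodie.database.name"], "tmpdb");
    }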
22 changes: 22 additions & 0 deletions crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running Hudi jobs.
# This is useful for setting default environmental settings.

hoodie.database.name tmpdb
hoodie.table.type= mor
@@ -0,0 +1,37 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

hoodie.table.metadata.partitions=files
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=city
hoodie.archivelog.folder=archived
hoodie.table.cdc.enabled=false
hoodie.timeline.layout.version=1
hoodie.table.checksum=3761586722
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.recordkey.fields=uuid
hoodie.table.name=trips
hoodie.partition.metafile.use.base.format=false
hoodie.datasource.write.hive_style_partitioning=false
hoodie.table.metadata.partitions.inflight=
hoodie.populate.meta.fields=true
hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.version=6