diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7061dd7..afc1f71 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -70,6 +70,19 @@ jobs:
           name: cov-report-rust-tests-${{ runner.os }}
           path: ./cov-reports
           if-no-files-found: 'error'
+
+  rust-integration-tests:
+    strategy:
+      fail-fast: false
+      matrix:
+        os: [ ubuntu-22.04 ]
+    runs-on: ${{ matrix.os }}
+    steps:
+      - uses: actions/checkout@v4
+      - name: Spin up minio for s3 tests
+        run: docker compose -f docker/docker-compose.yaml --profile s3 up -d
+      - name: Run Rust integration tests
+        run: docker compose -f docker/docker-compose.yaml run integration_test_rust
 
   python-tests:
     strategy:
diff --git a/.gitignore b/.gitignore
index 5f104d1..28b5c9e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,7 @@
 # under the License.
 
 /Cargo.lock
+**/Cargo.lock
 /target
 **/target
 
diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs
index 457ae4e..89d6cdf 100644
--- a/crates/tests/src/lib.rs
+++ b/crates/tests/src/lib.rs
@@ -61,6 +61,13 @@ impl TestTable {
         path_buf.to_str().unwrap().to_string()
     }
 
+    pub fn s3_path(&self) -> String {
+        let bucket = std::env::var("INTEGRATION_TEST_S3_BUCKET")
+            .expect("INTEGRATION_TEST_S3_BUCKET not set");
+        let data_path = Path::new(format!("s3://{}", bucket).as_str()).join(self.as_ref());
+        data_path.to_str().unwrap().to_string()
+    }
+
     pub fn url(&self) -> Url {
         let path = self.path();
         Url::from_file_path(path).unwrap()
diff --git a/docker/Dockerfile.minio b/docker/Dockerfile.minio
new file mode 100644
index 0000000..4a27f44
--- /dev/null
+++ b/docker/Dockerfile.minio
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+FROM alpine:3.18
+RUN apk update && apk add --no-cache wget ca-certificates bash findutils
+
+RUN cd /usr/local/bin && \
+    wget -q --show-progress https://dl.min.io/client/mc/release/linux-amd64/mc && \
+    chmod +x mc
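The `s3_path()` helper added to `crates/tests/src/lib.rs` above mirrors the existing `path()` accessor but points each test table at the MinIO-backed bucket named by `INTEGRATION_TEST_S3_BUCKET` (defined in `docker/hudi.env`). A minimal sketch, not part of the diff, of an equivalent construction using plain string formatting instead of `Path::join`, since the value is a URI rather than a filesystem path; the helper name `s3_uri_for` is made up for illustration:

```rust
use std::env;

// Illustrative only: build the same "s3://<bucket>/<table_dir>" URI with format!,
// avoiding the detour through Path/PathBuf for a value that is not a filesystem path.
fn s3_uri_for(table_dir: &str) -> String {
    let bucket = env::var("INTEGRATION_TEST_S3_BUCKET")
        .expect("INTEGRATION_TEST_S3_BUCKET not set");
    format!("s3://{}/{}", bucket, table_dir)
}
```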
diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml
new file mode 100644
index 0000000..c56eeec
--- /dev/null
+++ b/docker/docker-compose.yaml
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: "3.8"
+
+services:
+  minio:
+    image: quay.io/minio/minio
+    container_name: minio
+    profiles:
+      - s3
+    ports:
+      - "9000:9000"
+    networks:
+      - hudi_network
+    env_file:
+      - hudi.env
+    command: server /data
+    healthcheck:
+      test: mc ready local
+      interval: 5s
+      timeout: 10s
+      retries: 5
+      start_period: 2s
+
+
+  mc:
+    build:
+      context: ./
+      dockerfile: ./Dockerfile.minio
+    container_name: mc
+    networks:
+      - hudi_network
+    profiles:
+      - s3
+    depends_on:
+      minio:
+        condition: service_healthy
+    env_file:
+      - hudi.env
+    volumes:
+      - ../crates/tests/data/tables:/opt/data:ro
+    entrypoint: >
+      /bin/bash -c '
+      set -eux
+      mc config host add store http://minio:9000 $$MINIO_ACCESS_KEY $$MINIO_SECRET_KEY
+      mc mb store/$$MINIO_TEST_BUCKET
+      find /opt/data -name "*.zip" -exec unzip {} -d /tmp/data/ \;
+      find /tmp/data -type d -mindepth 1 -maxdepth 1 -exec mc cp --recursive {} store/$$MINIO_TEST_BUCKET \;
+      mc ls store/$$MINIO_TEST_BUCKET
+      '
+
+  integration_test_rust:
+    image: rust:1.75
+    container_name: integration_test_rust
+    networks:
+      - hudi_network
+    profiles:
+      - integration_test
+    env_file:
+      - hudi.env
+    volumes:
+      - ../crates:/app/crates:ro
+      - ../python:/app/python:ro
+      - ../integration_test:/app/integration_test
+      - ../Cargo.toml:/app/Cargo.toml:ro
+    entrypoint: >
+      /bin/sh -c "
+      set -eux
+      apt update && apt install -y --no-install-recommends libpython3.11-dev
+      cd /app/integration_test
+      cargo test
+      "
+
+
+networks:
+  hudi_network:
+    driver: bridge
\ No newline at end of file
diff --git a/docker/hudi.env b/docker/hudi.env
new file mode 100644
index 0000000..ed51bb6
--- /dev/null
+++ b/docker/hudi.env
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# setup minio
+MINIO_ROOT_USER=minio
+MINIO_ROOT_PASSWORD=minio123
+MINIO_ACCESS_KEY=minio
+MINIO_SECRET_KEY=minio123
+MINIO_TEST_BUCKET=test-bucket
+
+# s3 integration test
+AWS_ACCESS_KEY_ID=minio
+AWS_SECRET_ACCESS_KEY=minio123
+AWS_ENDPOINT=http://minio:9000
+INTEGRATION_TEST_S3_BUCKET=test-bucket
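`hudi.env` above configures both the MinIO server (the `MINIO_*` variables) and the test client (the `AWS_*` variables plus the bucket name), and compose injects it into every service via `env_file`. A hedged sketch, not part of the diff, of a preflight check a test could run to fail fast when this environment is missing:

```rust
use std::env;

// Illustrative only: verify the variables defined in docker/hudi.env are present
// before exercising the S3-backed tables.
fn assert_s3_test_env() {
    for var in [
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_ENDPOINT",
        "INTEGRATION_TEST_S3_BUCKET",
    ] {
        assert!(
            env::var(var).is_ok(),
            "{var} must be set (see docker/hudi.env) for the S3 integration tests"
        );
    }
}
```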
diff --git a/integration_test/Cargo.toml b/integration_test/Cargo.toml
new file mode 100644
index 0000000..48fb8cf
--- /dev/null
+++ b/integration_test/Cargo.toml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[workspace]
+members = []
+
+[package]
+version = "0.1.0"
+name = "integration_test"
+edition = "2021"
+license = "Apache-2.0"
+rust-version = "1.75.0"
+description = "Integration tests for hudi-rs"
+
+
+[dependencies]
+hudi = { features=["datafusion"], path="../crates/hudi" }
+hudi-tests = { path="../crates/tests" }
+datafusion = { version = "= 39.0.0" }
+datafusion-expr = { version = "= 39.0.0" }
+tokio = { version = "1", features = ["rt-multi-thread"] }
diff --git a/integration_test/src/main.rs b/integration_test/src/main.rs
new file mode 100644
index 0000000..319b87e
--- /dev/null
+++ b/integration_test/src/main.rs
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+fn main() {
+    println!("hudi-rs integration tests")
+}
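The test file below registers every table through `HudiDataSource::new_with_options` and always appends `("allow_http", "true")`, which is what lets the underlying object store talk to the plain-HTTP MinIO endpoint from `AWS_ENDPOINT`. A hedged sketch, not part of the diff, of registering a single S3-backed table outside the test harness; the table name `"trips"` is made up for illustration:

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use hudi::HudiDataSource;

// Illustrative only: register one S3-backed Hudi table with DataFusion.
// `allow_http` is needed because the MinIO endpoint in hudi.env is plain HTTP.
async fn register_s3_table(ctx: &SessionContext, s3_path: &str) {
    let hudi = HudiDataSource::new_with_options(s3_path, vec![("allow_http", "true")])
        .await
        .expect("Failed to create HudiDataSource");
    ctx.register_table("trips", Arc::new(hudi))
        .expect("Failed to register table");
}
```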
diff --git a/integration_test/tests/hudi_test.rs b/integration_test/tests/hudi_test.rs
new file mode 100644
index 0000000..37c7d4e
--- /dev/null
+++ b/integration_test/tests/hudi_test.rs
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::sync::Arc;
+
+use datafusion::{
+    arrow::{
+        array::{self, ArrowPrimitiveType, AsArray},
+        datatypes::{DataType, Field, Int64Type, Schema, TimeUnit},
+    },
+    prelude::*,
+    scalar::ScalarValue,
+};
+use hudi::HudiDataSource;
+use hudi_tests::TestTable;
+
+#[derive(PartialEq, Debug)]
+struct Record {
+    id: i32,
+    name: String,
+    is_active: bool,
+}
+
+async fn prepare_session_context(
+    path: &str,
+    mut options: Vec<(&str, &str)>,
+    tablename: &str,
+) -> SessionContext {
+    let config = SessionConfig::new()
+        .set_bool("datafusion.catalog.information_schema", true)
+        .set(
+            "datafusion.sql_parser.enable_ident_normalization",
+            ScalarValue::from(false),
+        );
+    let ctx = SessionContext::new_with_config(config);
+
+    options.append(&mut vec![("allow_http", "true")]);
+
+    let hudi = HudiDataSource::new_with_options(path, options)
+        .await
+        .expect("Failed to create HudiDataSource");
+
+    ctx.register_table(tablename, Arc::new(hudi))
+        .expect("Failed to register table");
+    ctx
+}
+
+async fn get_primitive_from_dataframe<T>(df: &DataFrame) -> <T as ArrowPrimitiveType>::Native
+where
+    T: ArrowPrimitiveType,
+{
+    let result = df
+        .clone()
+        .collect()
+        .await
+        .expect("Failed to collect result");
+    let values = result
+        .first()
+        .unwrap()
+        .column(0)
+        .as_primitive::<T>()
+        .values();
+
+    *values.first().unwrap()
+}
+
+#[tokio::test]
+async fn test_datafusion_read_tables() {
+    for (t, n_cols, n_rows) in &[
+        (TestTable::V6ComplexkeygenHivestyle, 21, 4),
+        (TestTable::V6Nonpartitioned, 21, 4),
+        (TestTable::V6SimplekeygenHivestyleNoMetafields, 21, 4),
+        (TestTable::V6SimplekeygenNonhivestyle, 21, 4),
+        (TestTable::V6TimebasedkeygenNonhivestyle, 22, 4),
+    ] {
+        let ctx = prepare_session_context(&t.s3_path(), vec![], t.as_ref()).await;
+        let df_rows = ctx
+            .sql(&format!("select count(*) from {}", t.as_ref()))
+            .await
+            .expect("Failed to get number of rows");
+        let df_cols = ctx
+            .sql(&format!(
+                "select count(column_name) from information_schema.columns where table_name = '{}'",
+                t.as_ref()
+            ))
+            .await
+            .expect("Failed to get number of columns");
+
+        let rows = get_primitive_from_dataframe::<Int64Type>(&df_rows).await;
+        let cols = get_primitive_from_dataframe::<Int64Type>(&df_cols).await;
+
+        assert_eq!(rows, *n_rows);
+        assert_eq!(cols, *n_cols);
+
+        let expected_data = vec![
+            Record {
+                id: 1,
+                name: "Alice".to_string(),
+                is_active: false,
+            },
+            Record {
+                id: 2,
+                name: "Bob".to_string(),
+                is_active: false,
+            },
+            Record {
+                id: 3,
+                name: "Carol".to_string(),
+                is_active: true,
+            },
+            Record {
+                id: 4,
+                name: "Diana".to_string(),
+                is_active: true,
+            },
+        ];
+
+        let mut actual_data: Vec<Record> = vec![];
+        let df = ctx
+            .sql(&format!("select id, name, isActive from {}", t.as_ref()))
+            .await
+            .unwrap();
+
+        df.collect().await.unwrap().iter().for_each(|rb| {
+            let id = rb
+                .column_by_name("id")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<array::Int32Array>();
+            let name = rb
+                .column_by_name("name")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<array::StringArray>();
+
+            let is_active = rb
+                .column_by_name("isActive")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<array::BooleanArray>();
+
+            for ((id, name), is_active) in id
+                .unwrap()
+                .values()
+                .iter()
+                .zip(name.unwrap().iter())
+                .zip(is_active.unwrap().values().iter())
+            {
+                actual_data.push(Record {
+                    id: *id,
+                    name: name.unwrap().to_string(),
+                    is_active,
+                });
+            }
+        });
+
+        assert!(actual_data
+            .iter()
+            .all(|record| expected_data.contains(record)));
+
+        let df_schema = ctx
+            .sql(&format!(
+                "select
+                    _hoodie_commit_time,
+                    _hoodie_commit_seqno,
+                    _hoodie_record_key,
+                    _hoodie_partition_path,
+                    _hoodie_file_name,
+                    id,
+                    name,
+                    isActive,
+                    intField,
+                    longField,
+                    floatField,
+                    doubleField,
+                    decimalField,
+                    dateField,
+                    timestampField,
+                    binaryField,
+                    structField.field2 as structField_field2
+                from {}",
+                t.as_ref()
+            ))
+            .await
+            .unwrap();
+
+        assert_eq!(df_schema.schema().as_arrow(), &get_expected_table_schema());
+    }
+}
+
+fn get_expected_table_schema() -> Schema {
+    Schema::new(vec![
+        Field::new("_hoodie_commit_time", DataType::Utf8, true),
+        Field::new("_hoodie_commit_seqno", DataType::Utf8, true),
+        Field::new("_hoodie_record_key", DataType::Utf8, true),
+        Field::new("_hoodie_partition_path", DataType::Utf8, true),
+        Field::new("_hoodie_file_name", DataType::Utf8, true),
+        Field::new("id", DataType::Int32, true),
+        Field::new("name", DataType::Utf8, true),
+        Field::new("isActive", DataType::Boolean, true),
+        Field::new("intField", DataType::Int32, true),
+        Field::new("longField", DataType::Int64, true),
+        Field::new("floatField", DataType::Float32, true),
+        Field::new("doubleField", DataType::Float64, true),
+        Field::new("decimalField", DataType::Decimal128(10, 5), true),
+        Field::new("dateField", DataType::Date32, true),
+        Field::new(
+            "timestampField",
+            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+            true,
+        ),
+        Field::new("binaryField", DataType::Binary, true),
+        Field::new("structField_field2", DataType::Int32, true),
+    ])
+}
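The test above only exercises full scans (row/column counts, record contents, and the resolved schema). A hedged sketch, not part of the diff, of an additional test in the same file that reuses `prepare_session_context` for a filtered read; with the seeded data, `id = 1` corresponds to the record named "Alice":

```rust
// Illustrative only: a point lookup against one of the S3-backed test tables,
// reusing prepare_session_context from this test module.
#[tokio::test]
async fn test_datafusion_read_with_filter() {
    let t = TestTable::V6Nonpartitioned;
    let ctx = prepare_session_context(&t.s3_path(), vec![], t.as_ref()).await;
    let df = ctx
        .sql(&format!("select name from {} where id = 1", t.as_ref()))
        .await
        .expect("Failed to run filtered query");
    let batches = df.collect().await.expect("Failed to collect result");
    let n_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum();
    // The seeded table has exactly one record with id = 1 ("Alice").
    assert_eq!(n_rows, 1);
}
```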