From ce8964d9b25f9b3bd9f22ad1f3a73a1f73930f42 Mon Sep 17 00:00:00 2001 From: Steve Russo <64294847+sjrusso8@users.noreply.github.com> Date: Fri, 31 May 2024 14:48:03 -0400 Subject: [PATCH] simplify dataset location --- .gitignore | 3 +++ README.md | 13 +++++++++---- core/src/dataframe.rs | 4 ++-- core/src/functions/mod.rs | 6 ++++-- core/src/readwriter.rs | 4 ++-- docker-compose.yml | 2 +- examples/README.md | 10 ++++++++++ examples/src/deltalake.rs | 4 ++-- examples/src/reader.rs | 4 ++-- 9 files changed, 35 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index 50b3c18..0d5f784 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,6 @@ .vscode *.ipynb + +/spark-warehouse +/artifacts diff --git a/README.md b/README.md index 4ab032f..ee6a368 100644 --- a/README.md +++ b/README.md @@ -46,12 +46,17 @@ cargo build With local spark: -1. [Download Spark distribution](https://spark.apache.org/downloads.html) (3.4.0+), unzip the package. +1. [Download Spark distribution](https://spark.apache.org/downloads.html) (3.5.1 recommended), unzip the package. + +2. Set your `SPARK_HOME` environment variable to the location where spark was extracted to, 2. Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution): -``` -sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0 +```bash +$ $SPARK_HOME/sbin/start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.0.0" \ + --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \ + --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ + --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" ``` With docker: @@ -59,7 +64,7 @@ With docker: 1. Start the Spark Connect server by leveraging the created `docker-compose.yml` in this repo. This will start a Spark Connect Server running on port 15002 ```bash -docker compose up --build -d +$ docker compose up --build -d ``` **Step 5**: Run an example from the repo under [/examples](https://github.com/sjrusso8/spark-connect-rs/tree/main/examples/README.md) diff --git a/core/src/dataframe.rs b/core/src/dataframe.rs index 3c01006..393349b 100644 --- a/core/src/dataframe.rs +++ b/core/src/dataframe.rs @@ -58,7 +58,7 @@ use polars_arrow; /// A `DataFrame` is created from a `spark.sql()` statement /// /// ```rust -/// let df = spark.sql("SELECT * FROM json.`/datasets/employees.json`").await?; +/// let df = spark.sql("SELECT * FROM json.`/opt/spark/work-dir/datasets/employees.json`").await?; /// ``` /// /// ## read & readStream @@ -1745,7 +1745,7 @@ mod tests { async fn test_df_input_files() -> Result<(), SparkError> { let spark = setup().await; - let path = ["/datasets/people.csv"]; + let path = ["/opt/spark/work-dir/datasets/people.csv"]; let df = spark .read() diff --git a/core/src/functions/mod.rs b/core/src/functions/mod.rs index cb0c61b..60ffa74 100644 --- a/core/src/functions/mod.rs +++ b/core/src/functions/mod.rs @@ -643,7 +643,7 @@ mod tests { async fn test_func_input_file() -> Result<(), SparkError> { let spark = setup().await; - let path = ["/datasets/people.csv"]; + let path = ["/opt/spark/work-dir/datasets/people.csv"]; let df = spark .read() @@ -654,7 +654,9 @@ mod tests { let res = df.select(input_file_name()).head(None).await?; - let a: ArrayRef = Arc::new(StringArray::from(vec!["file:///datasets/people.csv"])); + let a: ArrayRef = Arc::new(StringArray::from(vec![ + "file:///opt/spark/work-dir/datasets/people.csv", + ])); let expected = RecordBatch::try_from_iter(vec![("input_file_name()", a)])?; diff --git a/core/src/readwriter.rs b/core/src/readwriter.rs index b72b194..2b5abdb 100644 --- a/core/src/readwriter.rs +++ b/core/src/readwriter.rs @@ -462,7 +462,7 @@ mod tests { async fn test_dataframe_read() -> Result<(), SparkError> { let spark = setup().await; - let path = ["/datasets/people.csv"]; + let path = ["/opt/spark/work-dir/datasets/people.csv"]; let df = spark .read() @@ -481,7 +481,7 @@ mod tests { async fn test_dataframe_read_schema() -> Result<(), SparkError> { let spark = setup().await; - let path = ["/datasets/people.json"]; + let path = ["/opt/spark/work-dir/datasets/people.csv"]; let schema = StructType::new(vec![ StructField { diff --git a/docker-compose.yml b/docker-compose.yml index b525beb..a56d1e3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,4 +14,4 @@ services: - "4040:4040" - "15002:15002" volumes: - - ./datasets:/datasets + - ./datasets:/opt/spark/work-dir/datasets diff --git a/examples/README.md b/examples/README.md index 1ac2943..a1837d8 100644 --- a/examples/README.md +++ b/examples/README.md @@ -47,6 +47,16 @@ $ cargo run --bin readstream Read a file into a dataframe, save the result as a deltalake table, and append a new record to the table. **Prerequisite** the spark cluster must be started with the deltalake package. The `docker-compose.yml` provided in the repo has deltalake pre-installed. +Or if you are running a spark connect server location, run the below scripts first + +If you are running a local spark connect server. The Delta Lake jars need to be added onto the server before it starts. + +```bash +$ $SPARK_HOME/sbin/start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.0.0" \ + --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \ + --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ + --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" +``` ```bash $ cargo run --bin deltalake diff --git a/examples/src/deltalake.rs b/examples/src/deltalake.rs index fea77d3..c7526ec 100644 --- a/examples/src/deltalake.rs +++ b/examples/src/deltalake.rs @@ -17,8 +17,8 @@ async fn main() -> Result<(), Box> { .await?; // path might vary based on where you started your spark cluster - // the `/examples` folder of spark contains dummy data - let paths = ["/datasets/people.csv"]; + // the `/datasets/` folder of spark contains dummy data + let paths = ["./datasets/people.csv"]; // Load a CSV file from the spark server let df = spark diff --git a/examples/src/reader.rs b/examples/src/reader.rs index d5e699e..42e4afb 100644 --- a/examples/src/reader.rs +++ b/examples/src/reader.rs @@ -11,14 +11,14 @@ use spark_connect_rs::types::DataType; async fn main() -> Result<(), Box> { let spark: SparkSession = SparkSessionBuilder::default().build().await?; - let path = ["/datasets/people.csv"]; + let path = "./datasets/people.csv"; let df = spark .read() .format("csv") .option("header", "True") .option("delimiter", ";") - .load(path)?; + .load([path])?; // select columns and perform data manipulations let df = df