diff --git a/README.md b/README.md index 386b8ad..62b6952 100644 --- a/README.md +++ b/README.md @@ -11,69 +11,56 @@ Currently, the Spark Connect client for Rust is **highly experimental** and **sh not be used in any production setting**. This is currently a "proof of concept" to identify the methods of interacting with Spark cluster from rust. -## Quick Start - The `spark-connect-rs` aims to provide an entrypoint to [Spark Connect](https://spark.apache.org/docs/latest/spark-connect-overview.html), and provide *similar* DataFrame API interactions. +## Getting Started + +This section explains how run Spark Connect Rust locally starting from 0. + +Step 1: Install rust via rustup: https://www.rust-lang.org/tools/install + +Step 2: Ensure you have a [cmake](https://cmake.org/download/) and [protobuf](https://grpc.io/docs/protoc-installation/) install on your machine + +Step 3: Run the following commands to clone the repo + ```bash -docker compose up --build -d -``` +git clone https://github.com/sjrusso8/spark-connect-rs.git +git submodule update --init --recursive -```rust -use spark_connect_rs; - -use spark_connect_rs::{SparkSession, SparkSessionBuilder}; - -use spark_connect_rs::functions as F; - -#[tokio::main] -async fn main() -> Result<(), Box> { - let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/") - .build() - .await?; - - let df = spark - .sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`") - .await?; - - df.filter("salary >= 3500") - .select(F::col("name")) - .show(Some(5), None, None) - .await?; - - // +-------------+ - // | show_string | - // +-------------+ - // | +------+ | - // | |name | | - // | +------+ | - // | |Andy | | - // | |Justin| | - // | |Berta | | - // | +------+ | - // | | - // +-------------+ - - Ok(()) -} +cargo build ``` -## Getting Started +Step 4: Setup the Spark Driver on localhost either by downloading spark or docker compose. + +With spark download: + +1. [Download Spark distribution](https://spark.apache.org/downloads.html) (3.4.0+), unzip the package. + +2. Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution): ``` -git clone https://github.com/sjrusso8/spark-connect-rs.git -git submodule update --init --recursive +sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0 +``` +With docker compose: + +1. Start the Spark Connect server by leveraging the created `docker-compose.yml` in this repo. This will start a Spark Connect Server running on port 15002 + +```bash docker compose up --build -d +``` + +Step 5: Run an example from the repo under `/examples` -cargo build && cargo test +```bash +cargo run --example sql ``` ## Features The following section outlines some of the larger functionality that are not yet working with this Spark Connect implementation. -- ![done] TLS authentication & Databricks compatability +- ![done] TLS authentication & Databricks compatability via the feature flag `feature = 'tls'` - ![open] StreamingQueryManager - ![open] Window and ~~Pivot~~ functions - ![open] UDFs or any type of functionality that takes a closure (foreach, foreachBatch, etc.) diff --git a/examples/databricks.rs b/examples/databricks.rs index 372bed5..9fd3ecb 100644 --- a/examples/databricks.rs +++ b/examples/databricks.rs @@ -1,17 +1,19 @@ -// This example demonstrates connecting to a Databricks Cluster via a -// tls connection. +// This example demonstrates connecting to a Databricks Cluster via a tls connection. // // This demo requires access to a Databricks Workspace, a personal access token, // and a cluster id. The cluster should be running a 13.3LTS runtime or greater. Populate // the remote URL string between the `<>` with the appropriate details. // +// The Databricks workspace instance name is the same as the Server Hostname value for your cluster. +// Get connection details for a Databricks compute resource via https://docs.databricks.com/en/integrations/compute-details.html +// // To view the connected Spark Session, go to the cluster Spark UI and select the 'Connect' tab. use spark_connect_rs::{SparkSession, SparkSessionBuilder}; #[tokio::main] async fn main() -> Result<(), Box> { - let spark: SparkSession = SparkSessionBuilder::remote("sc://:443/;token=;x-databricks-cluster-id=") + let spark: SparkSession = SparkSessionBuilder::remote("sc://:443/;token=;x-databricks-cluster-id=") .build() .await?; diff --git a/examples/delta.rs b/examples/delta.rs index 425befa..5a2477f 100644 --- a/examples/delta.rs +++ b/examples/delta.rs @@ -12,7 +12,9 @@ use spark_connect_rs::dataframe::SaveMode; #[tokio::main] async fn main() -> Result<(), Box> { - let spark: SparkSession = SparkSessionBuilder::default().build().await?; + let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/") + .build() + .await?; let paths = ["/opt/spark/examples/src/main/resources/people.csv"]; diff --git a/examples/reader.rs b/examples/reader.rs index 550469a..67c2293 100644 --- a/examples/reader.rs +++ b/examples/reader.rs @@ -1,10 +1,11 @@ +// This example demonstrates creating a Spark DataFrame from a CSV with read options +// and then adding transformations for 'select' & 'sort' +// printing the results as "show(...)" + use spark_connect_rs::{SparkSession, SparkSessionBuilder}; use spark_connect_rs::functions as F; -// This example demonstrates creating a Spark DataFrame from a CSV with read options -// and then adding transformations for 'select' & 'sort' -// printing the results as "show(...)" #[tokio::main] async fn main() -> Result<(), Box> { let spark: SparkSession = SparkSessionBuilder::default().build().await?; diff --git a/examples/readstream.rs b/examples/readstream.rs index 025098a..5d5b10a 100644 --- a/examples/readstream.rs +++ b/examples/readstream.rs @@ -9,7 +9,7 @@ use std::{thread, time}; #[tokio::main] async fn main() -> Result<(), Box> { let spark: SparkSession = - SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs") + SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example") .build() .await?; diff --git a/examples/sql.rs b/examples/sql.rs index 1284a5c..843e35a 100644 --- a/examples/sql.rs +++ b/examples/sql.rs @@ -1,38 +1,43 @@ -use spark_connect_rs; +// This example demonstrates creating a Spark DataFrame from a SQL command +// and saving the results as a parquet and reading the new parquet file +use spark_connect_rs::dataframe::SaveMode; use spark_connect_rs::{SparkSession, SparkSessionBuilder}; -// This example demonstrates creating a Spark DataFrame from a SQL command -// and leveraging &str input for `filter` and `select` to change the dataframe -// Displaying the results as "show(...)" #[tokio::main] async fn main() -> Result<(), Box> { - let spark: SparkSession = - SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs") - .build() - .await?; + let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/") + .build() + .await?; let df = spark - .sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`") + .clone() + .sql("select 'apple' as word, 123 as count") .await?; - df.filter("salary >= 3500") - .select("*") - .show(Some(5), None, None) + df.write() + .mode(SaveMode::Overwrite) + .format("parquet") + .save("file:///tmp/spark-connect-write-example-output.parquet") .await?; - // +-----------------+ - // | show_string | - // +-----------------+ - // | +------+------+ | - // | |name |salary| | - // | +------+------+ | - // | |Andy |4500 | | - // | |Justin|3500 | | - // | |Berta |4000 | | - // | +------+------+ | - // | | - // +-----------------+ + let df = spark + .read() + .format("parquet") + .load(["file:///tmp/spark-connect-write-example-output.parquet"])?; + + df.show(Some(100), None, None).await?; + + // +---------------+ + // | show_string | + // +---------------+ + // | +-----+-----+ | + // | |word |count| | + // | +-----+-----+ | + // | |apple|123 | | + // | +-----+-----+ | + // | | + // +---------------+ Ok(()) } diff --git a/examples/writer.rs b/examples/writer.rs index 42e9708..ab52de5 100644 --- a/examples/writer.rs +++ b/examples/writer.rs @@ -11,7 +11,9 @@ use spark_connect_rs::dataframe::SaveMode; // then reading the csv file back #[tokio::main] async fn main() -> Result<(), Box> { - let spark: SparkSession = SparkSessionBuilder::default().build().await?; + let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/") + .build() + .await?; let df = spark .clone()