
Commit

chore: update readme & examples
sjrusso8 committed Apr 16, 2024
1 parent 67d7887 commit 23e44f8
Showing 7 changed files with 78 additions and 79 deletions.
79 changes: 33 additions & 46 deletions README.md
Currently, the Spark Connect client for Rust is **highly experimental** and **should
not be used in any production setting**. This is currently a "proof of concept" to identify the methods
of interacting with a Spark cluster from Rust.

## Quick Start

The `spark-connect-rs` aims to provide an entrypoint to [Spark Connect](https://spark.apache.org/docs/latest/spark-connect-overview.html), and provide *similar* DataFrame API interactions.

```rust
use spark_connect_rs;

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

use spark_connect_rs::functions as F;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let df = spark
        .sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
        .await?;

    df.filter("salary >= 3500")
        .select(F::col("name"))
        .show(Some(5), None, None)
        .await?;

    // +-------------+
    // | show_string |
    // +-------------+
    // | +------+    |
    // | |name  |    |
    // | +------+    |
    // | |Andy  |    |
    // | |Justin|    |
    // | |Berta |    |
    // | +------+    |
    // |             |
    // +-------------+

    Ok(())
}
```

## Getting Started

This section explains how to run Spark Connect Rust locally, starting from zero.

Step 1: Install rust via rustup: https://www.rust-lang.org/tools/install

Step 2: Ensure you have [cmake](https://cmake.org/download/) and [protobuf](https://grpc.io/docs/protoc-installation/) installed on your machine (a minimal install sketch follows Step 3's commands)

Step 3: Run the following commands to clone the repo and build the project

```bash
git clone https://github.com/sjrusso8/spark-connect-rs.git
git submodule update --init --recursive

cargo build
```
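
For Step 2, a minimal install sketch, assuming Ubuntu/Debian (apt) or macOS (Homebrew); package names can differ on other platforms:

```bash
# Ubuntu / Debian
sudo apt-get install -y cmake protobuf-compiler

# macOS (Homebrew)
brew install cmake protobuf
```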

Step 4: Set up the Spark Driver on localhost, either by downloading Spark or by using docker compose.

With spark download:

1. [Download Spark distribution](https://spark.apache.org/downloads.html) (3.4.0+), unzip the package.

2. Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution):

```bash
sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
```
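
For example, if the downloaded distribution were Spark 3.5.1 instead, the matching package coordinate would presumably be:

```bash
sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1
```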

With docker compose:

1. Start the Spark Connect server by leveraging the `docker-compose.yml` provided in this repo. This will start a Spark Connect server running on port 15002.

```bash
docker compose up --build -d
```

Step 5: Run an example from the repo under `/examples`

```bash
cargo run --example sql
```
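
The other example files touched by this commit can presumably be run the same way, since cargo names examples after the files under `/examples`:

```bash
cargo run --example reader
cargo run --example writer
cargo run --example readstream
```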

## Features

The following section outlines some of the larger functionality that is not yet working with this Spark Connect implementation.

- ![done] TLS authentication & Databricks compatibility via the feature flag `feature = 'tls'` (see the sketch after this list)
- ![open] StreamingQueryManager
- ![open] Window and ~~Pivot~~ functions
- ![open] UDFs or any type of functionality that takes a closure (foreach, foreachBatch, etc.)
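
A sketch of enabling the `tls` feature flag from a downstream project, assuming the crate is published under the repository name (check crates.io for the actual package name and version):

```bash
# pull in the crate with the tls feature enabled
cargo add spark-connect-rs --features tls
```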
8 changes: 5 additions & 3 deletions examples/databricks.rs
// This example demonstrates connecting to a Databricks Cluster via a tls connection.
//
// This demo requires access to a Databricks Workspace, a personal access token,
// and a cluster id. The cluster should be running a 13.3LTS runtime or greater. Populate
// the remote URL string between the `<>` with the appropriate details.
//
// The Databricks workspace instance name is the same as the Server Hostname value for your cluster.
// Get connection details for a Databricks compute resource via https://docs.databricks.com/en/integrations/compute-details.html
//
// To view the connected Spark Session, go to the cluster Spark UI and select the 'Connect' tab.

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::remote("sc://<workspace id>:443/;token=<personal access token>;x-databricks-cluster-id=<cluster-id>")
let spark: SparkSession = SparkSessionBuilder::remote("sc://<workspace instance name>:443/;token=<personal access token>;x-databricks-cluster-id=<cluster-id>")
.build()
.await?;
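
// As a purely illustrative sketch, a populated connection string might look like the
// following -- the workspace instance name, token, and cluster id below are made up:
//
// let spark = SparkSessionBuilder::remote(
//         "sc://adb-1234567890123456.7.azuredatabricks.net:443/;token=dapi0123456789abcdef;x-databricks-cluster-id=0123-456789-abcdefgh",
//     )
//     .build()
//     .await?;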

4 changes: 3 additions & 1 deletion examples/delta.rs
use spark_connect_rs::dataframe::SaveMode;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let paths = ["/opt/spark/examples/src/main/resources/people.csv"];

7 changes: 4 additions & 3 deletions examples/reader.rs
// This example demonstrates creating a Spark DataFrame from a CSV with read options
// and then adding transformations for 'select' & 'sort'
// printing the results as "show(...)"

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

use spark_connect_rs::functions as F;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;
2 changes: 1 addition & 1 deletion examples/readstream.rs
use std::{thread, time};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
            .build()
            .await?;

53 changes: 29 additions & 24 deletions examples/sql.rs
// This example demonstrates creating a Spark DataFrame from a SQL command
// and saving the results as a parquet and reading the new parquet file

use spark_connect_rs::dataframe::SaveMode;
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let df = spark
        .clone()
        .sql("select 'apple' as word, 123 as count")
        .await?;

    df.write()
        .mode(SaveMode::Overwrite)
        .format("parquet")
        .save("file:///tmp/spark-connect-write-example-output.parquet")
        .await?;

    let df = spark
        .read()
        .format("parquet")
        .load(["file:///tmp/spark-connect-write-example-output.parquet"])?;

    df.show(Some(100), None, None).await?;

    // +---------------+
    // | show_string   |
    // +---------------+
    // | +-----+-----+ |
    // | |word |count| |
    // | +-----+-----+ |
    // | |apple|123  | |
    // | +-----+-----+ |
    // |               |
    // +---------------+

    Ok(())
}
4 changes: 3 additions & 1 deletion examples/writer.rs
use spark_connect_rs::dataframe::SaveMode;
// then reading the csv file back
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let df = spark
        .clone()

