Commit

simplify dataset location
sjrusso8 committed May 31, 2024
1 parent b71e4d6 commit ce8964d
Showing 9 changed files with 35 additions and 15 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -2,3 +2,6 @@
 
 .vscode
 *.ipynb
+
+/spark-warehouse
+/artifacts
13 changes: 9 additions & 4 deletions README.md
@@ -46,20 +46,25 @@ cargo build
 
 With local spark:
 
-1. [Download Spark distribution](https://spark.apache.org/downloads.html) (3.4.0+), unzip the package.
+1. [Download Spark distribution](https://spark.apache.org/downloads.html) (3.5.1 recommended), unzip the package.
 
+2. Set your `SPARK_HOME` environment variable to the location where spark was extracted to.
+
 2. Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution):
 
-```
-sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
+```bash
+$ $SPARK_HOME/sbin/start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.0.0" \
+  --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
+  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
+  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
 ```
 
 With docker:
 
 1. Start the Spark Connect server by leveraging the created `docker-compose.yml` in this repo. This will start a Spark Connect Server running on port 15002
 
 ```bash
-docker compose up --build -d
+$ docker compose up --build -d
 ```
 
 **Step 5**: Run an example from the repo under [/examples](https://github.com/sjrusso8/spark-connect-rs/tree/main/examples/README.md)
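With the server started and the datasets mounted as shown above, a minimal client run looks roughly like the sketch below. It mirrors `examples/src/reader.rs` from this commit; the root re-exports of `SparkSession`/`SparkSessionBuilder`, the tokio runtime, and the default builder pointing at the local server on port 15002 are assumptions based on the repo's examples, and the dataset path assumes the `docker-compose.yml` volume mount introduced here.

```rust
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The default builder targets the local Spark Connect server (port 15002)
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    // Dataset path as seen from inside the Spark container after the volume mount
    let path = "/opt/spark/work-dir/datasets/people.csv";

    let df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load([path])?;

    // Pull the first rows back to the client as an Arrow RecordBatch
    let rows = df.head(None).await?;
    println!("{rows:?}");

    Ok(())
}
```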
4 changes: 2 additions & 2 deletions core/src/dataframe.rs
@@ -58,7 +58,7 @@ use polars_arrow;
 /// A `DataFrame` is created from a `spark.sql()` statement
 ///
 /// ```rust
-/// let df = spark.sql("SELECT * FROM json.`/datasets/employees.json`").await?;
+/// let df = spark.sql("SELECT * FROM json.`/opt/spark/work-dir/datasets/employees.json`").await?;
 /// ```
 ///
 /// ## read & readStream
@@ -1745,7 +1745,7 @@ mod tests {
     async fn test_df_input_files() -> Result<(), SparkError> {
         let spark = setup().await;
 
-        let path = ["/datasets/people.csv"];
+        let path = ["/opt/spark/work-dir/datasets/people.csv"];
 
         let df = spark
             .read()
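The doc example changed above can be exercised end to end with a short program like the following sketch. It reuses the session setup from the repo's examples, and the JSON path assumes the relocated dataset directory from this commit.

```rust
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Default builder connects to the local Spark Connect server
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    // Query the JSON dataset at its new location inside the container
    let df = spark
        .sql("SELECT * FROM json.`/opt/spark/work-dir/datasets/employees.json`")
        .await?;

    // head(None) returns the leading rows as an Arrow RecordBatch
    let rows = df.head(None).await?;
    println!("{rows:?}");

    Ok(())
}
```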
6 changes: 4 additions & 2 deletions core/src/functions/mod.rs
@@ -643,7 +643,7 @@
     async fn test_func_input_file() -> Result<(), SparkError> {
         let spark = setup().await;
 
-        let path = ["/datasets/people.csv"];
+        let path = ["/opt/spark/work-dir/datasets/people.csv"];
 
         let df = spark
             .read()
@@ -654,7 +654,9 @@
 
         let res = df.select(input_file_name()).head(None).await?;
 
-        let a: ArrayRef = Arc::new(StringArray::from(vec!["file:///datasets/people.csv"]));
+        let a: ArrayRef = Arc::new(StringArray::from(vec![
+            "file:///opt/spark/work-dir/datasets/people.csv",
+        ]));
 
         let expected = RecordBatch::try_from_iter(vec![("input_file_name()", a)])?;
 
4 changes: 2 additions & 2 deletions core/src/readwriter.rs
@@ -462,7 +462,7 @@
     async fn test_dataframe_read() -> Result<(), SparkError> {
         let spark = setup().await;
 
-        let path = ["/datasets/people.csv"];
+        let path = ["/opt/spark/work-dir/datasets/people.csv"];
 
         let df = spark
             .read()
@@ -481,7 +481,7 @@
     async fn test_dataframe_read_schema() -> Result<(), SparkError> {
         let spark = setup().await;
 
-        let path = ["/datasets/people.json"];
+        let path = ["/opt/spark/work-dir/datasets/people.csv"];
 
         let schema = StructType::new(vec![
             StructField {
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -14,4 +14,4 @@ services:
       - "4040:4040"
       - "15002:15002"
     volumes:
-      - ./datasets:/datasets
+      - ./datasets:/opt/spark/work-dir/datasets
10 changes: 10 additions & 0 deletions examples/README.md
@@ -47,6 +47,16 @@ $ cargo run --bin readstream
 Read a file into a dataframe, save the result as a deltalake table, and append a new record to the table.
 
 **Prerequisite** the spark cluster must be started with the deltalake package. The `docker-compose.yml` provided in the repo has deltalake pre-installed.
+If you are instead running a local Spark Connect server, run the command below first.
+
+The Delta Lake jars need to be added to the server before it starts:
+
+```bash
+$ $SPARK_HOME/sbin/start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.0.0" \
+  --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
+  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
+  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
+```
 
 ```bash
 $ cargo run --bin deltalake
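For orientation, here is a rough sketch of the write path this example describes. The read calls are taken from this commit's diffs; the writer calls (`write`, `format("delta")`, `mode`, `SaveMode::Overwrite`, `saveAsTable`) and the `SaveMode` import path are assumptions that the writer mirrors the Spark API, so treat `examples/src/deltalake.rs` as the source of truth.

```rust
use spark_connect_rs::{SparkSession, SparkSessionBuilder};
// Assumed module path for the save-mode enum; adjust to the crate's actual layout.
use spark_connect_rs::dataframe::SaveMode;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    // Read the bundled CSV dataset (path is relative to the server's working directory)
    let df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(["./datasets/people.csv"])?;

    // Persist the result as a managed Delta table. This needs the Delta Lake
    // packages configured on the connect server, as shown above; method and
    // enum names here are assumed to mirror the Spark API.
    df.write()
        .format("delta")
        .mode(SaveMode::Overwrite)
        .saveAsTable("default.people_delta")
        .await?;

    Ok(())
}
```

A managed table written this way would typically land under the server's `spark-warehouse` directory, which this commit adds to `.gitignore` along with `artifacts`.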
4 changes: 2 additions & 2 deletions examples/src/deltalake.rs
@@ -17,8 +17,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         .await?;
 
     // path might vary based on where you started your spark cluster
-    // the `/examples` folder of spark contains dummy data
-    let paths = ["/datasets/people.csv"];
+    // the `/datasets/` folder of spark contains dummy data
+    let paths = ["./datasets/people.csv"];
 
     // Load a CSV file from the spark server
     let df = spark
4 changes: 2 additions & 2 deletions examples/src/reader.rs
@@ -11,14 +11,14 @@ use spark_connect_rs::types::DataType;
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let spark: SparkSession = SparkSessionBuilder::default().build().await?;
 
-    let path = ["/datasets/people.csv"];
+    let path = "./datasets/people.csv";
 
     let df = spark
         .read()
         .format("csv")
         .option("header", "True")
         .option("delimiter", ";")
-        .load(path)?;
+        .load([path])?;
 
     // select columns and perform data manipulations
     let df = df
