From aaf1d1cac90cdcf4fea76ee152907712cf71efdc Mon Sep 17 00:00:00 2001 From: veeupup Date: Sat, 25 Nov 2023 17:12:45 +0800 Subject: [PATCH 1/3] Library Guide: Add Using the DataFrame API Signed-off-by: veeupup --- .../using-the-dataframe-api.md | 103 +++++++++++++++++- 1 file changed, 102 insertions(+), 1 deletion(-) diff --git a/docs/source/library-user-guide/using-the-dataframe-api.md b/docs/source/library-user-guide/using-the-dataframe-api.md index fdf309980dc2..a14a5ff755c6 100644 --- a/docs/source/library-user-guide/using-the-dataframe-api.md +++ b/docs/source/library-user-guide/using-the-dataframe-api.md @@ -19,4 +19,105 @@ # Using the DataFrame API -Coming Soon +## What is a DataFrame + +`DataFrame` is a basic concept in `datafusion` and is only a thin wrapper over LogicalPlan. + +```rust +pub struct DataFrame { + session_state: SessionState, + plan: LogicalPlan, +} +``` + +## How to generate a DataFrame + +You can manually call the `DataFrame` API or automatically generate a `DataFrame` through the SQL query planner just like: + +use `sql` to construct `DataFrame`: + +```rust +let ctx = SessionContext::new(); +// Register the in-memory table containing the data +ctx.register_table("users", Arc::new(create_memtable()?))?; +let dataframe = ctx.sql("SELECT * FROM users;").await?; +``` + +construct `DataFrame` manually + +```rust +let ctx = SessionContext::new(); +// Register the in-memory table containing the data +ctx.register_table("users", Arc::new(create_memtable()?))?; +let dataframe = ctx + .table("users") + .filter(col("a").lt_eq(col("b")))? + .sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?; +``` + +## Collect / Streaming Exec + +When you have a `DataFrame`, you may want to access the results of the internal `LogicalPlan`. You can do this by using `collect` to retrieve all outputs at once, or `streaming_exec` to obtain a `SendableRecordBatchStream`. 
+ +You can just collect all outputs once like: + +```rust +let ctx = SessionContext::new(); +let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; +let batches = df.collect().await?; +``` + +You can also use stream output to iterate the `RecordBatch` + +```rust +let ctx = SessionContext::new(); +let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; +let mut stream = df.execute_stream().await?; +while let Some(rb) = stream.next().await { + println!("{rb:?}"); +} +``` + +# Write DataFrame to Files + +You can also serializate `DataFrame` to a file. For now, `Datafusion` supports write `DataFrame` to `csv`, `json` and `parquet`. + +Before writing to a file, it will call collect to calculate all the results of the DataFrame and then write to file. + +For example, if you write it to a csv_file + +```rust +let ctx = SessionContext::new(); +// Register the in-memory table containing the data +ctx.register_table("users", Arc::new(mem_table))?; +let dataframe = ctx.sql("SELECT * FROM users;").await?; + +dataframe + .write_csv("user_dataframe.csv", DataFrameWriteOptions::default(), None) + .await; +``` + +and the file will look like (Example Output): + +``` +id,bank_account +1,9000 +``` + +## Transform between LogicalPlan and DataFrame + +As it is showed above, `DataFrame` is just a very thin wrapper of `LogicalPlan`, so you can easily go back and forth between them. 
+ +```rust +// Just combine LogicalPlan with SessionContext and you get a DataFrame +let ctx = SessionContext::new(); +// Register the in-memory table containing the data +ctx.register_table("users", Arc::new(mem_table))?; +let dataframe = ctx.sql("SELECT * FROM users;").await?; + +// get LogicalPlan in dataframe +let plan = dataframe.logical_plan().clone(); + +// construct a DataFrame with LogicalPlan +let new_df = DataFrame::new(ctx.state(), plan); +``` From a2e07484e36d0055f70104e156678caad0f58b01 Mon Sep 17 00:00:00 2001 From: veeupup Date: Mon, 27 Nov 2023 20:33:56 +0800 Subject: [PATCH 2/3] fix comments --- .../using-the-dataframe-api.md | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/docs/source/library-user-guide/using-the-dataframe-api.md b/docs/source/library-user-guide/using-the-dataframe-api.md index a14a5ff755c6..ebe37e0207f9 100644 --- a/docs/source/library-user-guide/using-the-dataframe-api.md +++ b/docs/source/library-user-guide/using-the-dataframe-api.md @@ -30,6 +30,24 @@ pub struct DataFrame { } ``` +For both `DataFrame` and `LogicalPlan`, you can build the query manually, such as: + +```rust +let df = ctx.table("users").await?; + +let new_df = df.select(vec![col("id"), col("bank_account")])? + .sort(vec![col("id")])?; + +let plan = LogicalPlanBuilder::from(&df.to_logical_plan()) + .project(vec![col("id"), col("bank_account")])? + .sort(vec![col("id")])? + .build()?; +``` + +But The main difference between a DataFrame and a LogicalPlan is that the DataFrame contains functionality for executing queries rather than just building plans. + +You can use `collect` or `execute_stream` to execute the query. + ## How to generate a DataFrame You can manually call the `DataFrame` API or automatically generate a `DataFrame` through the SQL query planner just like: @@ -80,7 +98,7 @@ while let Some(rb) = stream.next().await { # Write DataFrame to Files -You can also serializate `DataFrame` to a file. 
For now, `Datafusion` supports write `DataFrame` to `csv`, `json` and `parquet`. +You can also serialize `DataFrame` to a file. For now, `Datafusion` supports write `DataFrame` to `csv`, `json` and `parquet`. Before writing to a file, it will call collect to calculate all the results of the DataFrame and then write to file. From d2521430856b39de9e6f3c07a7cd5e432bf31d1c Mon Sep 17 00:00:00 2001 From: veeupup Date: Tue, 28 Nov 2023 19:48:13 +0800 Subject: [PATCH 3/3] fix comments Signed-off-by: veeupup --- .../using-the-dataframe-api.md | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/docs/source/library-user-guide/using-the-dataframe-api.md b/docs/source/library-user-guide/using-the-dataframe-api.md index ebe37e0207f9..c4f4ecd4f137 100644 --- a/docs/source/library-user-guide/using-the-dataframe-api.md +++ b/docs/source/library-user-guide/using-the-dataframe-api.md @@ -21,7 +21,7 @@ ## What is a DataFrame -`DataFrame` is a basic concept in `datafusion` and is only a thin wrapper over LogicalPlan. +`DataFrame` in `DataFusion` is modeled after the Pandas DataFrame interface, and is a thin wrapper over LogicalPlan that adds functionality for building and executing those plans. ```rust pub struct DataFrame { @@ -30,29 +30,29 @@ pub struct DataFrame { } ``` -For both `DataFrame` and `LogicalPlan`, you can build the query manually, such as: +You can build up `DataFrame`s using its methods, similarly to building `LogicalPlan`s using `LogicalPlanBuilder`: ```rust let df = ctx.table("users").await?; +// Create a new DataFrame sorted by `id`, `bank_account` let new_df = df.select(vec![col("id"), col("bank_account")])? .sort(vec![col("id")])?; +// Build the same plan using the LogicalPlanBuilder let plan = LogicalPlanBuilder::from(&df.to_logical_plan()) .project(vec![col("id"), col("bank_account")])? .sort(vec![col("id")])?
.build()?; ``` -But The main difference between a DataFrame and a LogicalPlan is that the DataFrame contains functionality for executing queries rather than just building plans. - You can use `collect` or `execute_stream` to execute the query. ## How to generate a DataFrame -You can manually call the `DataFrame` API or automatically generate a `DataFrame` through the SQL query planner just like: +You can directly use the `DataFrame` API or generate a `DataFrame` from a SQL query. -use `sql` to construct `DataFrame`: +For example, to use `sql` to construct `DataFrame`: ```rust let ctx = SessionContext::new(); @@ -61,7 +61,7 @@ ctx.register_table("users", Arc::new(create_memtable()?))?; let dataframe = ctx.sql("SELECT * FROM users;").await?; ``` -construct `DataFrame` manually +To construct `DataFrame` using the API: ```rust let ctx = SessionContext::new(); @@ -75,7 +75,13 @@ let dataframe = ctx ## Collect / Streaming Exec -When you have a `DataFrame`, you may want to access the results of the internal `LogicalPlan`. You can do this by using `collect` to retrieve all outputs at once, or `streaming_exec` to obtain a `SendableRecordBatchStream`. +DataFusion `DataFrame`s are "lazy", meaning they do not do any processing until they are executed, which allows for additional optimizations. + +When you have a `DataFrame`, you can run it in one of three ways: + +1. `collect` which executes the query and buffers all the output into a `Vec<RecordBatch>` +2. `execute_stream`, which begins execution and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()` +3. `cache` which executes the query and buffers the output into a new in-memory DataFrame.
You can just collect all outputs once like: @@ -85,7 +91,7 @@ let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; let batches = df.collect().await?; ``` -You can also use stream output to iterate the `RecordBatch` +You can also use stream output to incrementally generate output one `RecordBatch` at a time: ```rust let ctx = SessionContext::new(); @@ -100,9 +106,9 @@ while let Some(rb) = stream.next().await { You can also serialize `DataFrame` to a file. For now, `Datafusion` supports write `DataFrame` to `csv`, `json` and `parquet`. -Before writing to a file, it will call collect to calculate all the results of the DataFrame and then write to file. +When writing a file, DataFusion will execute the DataFrame and stream the results to a file. -For example, if you write it to a csv_file +For example, to write a CSV file: ```rust let ctx = SessionContext::new(); @@ -124,7 +130,7 @@ id,bank_account ## Transform between LogicalPlan and DataFrame -As it is showed above, `DataFrame` is just a very thin wrapper of `LogicalPlan`, so you can easily go back and forth between them. +As shown above, `DataFrame` is just a very thin wrapper of `LogicalPlan`, so you can easily go back and forth between them. ```rust // Just combine LogicalPlan with SessionContext and you get a DataFrame