rebase spiceai-41 branch #18

Merged
32 commits
0567c02
feat: initial subquery support (#37)
backkem May 22, 2024
ce87bf4
feat: add fallback TableProvider to FederatedTableProviderAdaptor (#39)
phillipleblanc Jun 18, 2024
330e4e8
delete redundant github workflows & commitlint
hozan23 Aug 23, 2024
f7182a3
update deps && use datafusion 41
hozan23 Aug 22, 2024
05519e8
remove connectorx from sources/sql crate
hozan23 Aug 22, 2024
3408e79
flight-sql: fix up dependencie issues
hozan23 Aug 22, 2024
a514a6c
move sources/sql code to datafusion-federation crate and add sql feature
hozan23 Aug 23, 2024
b699eec
move sources/flight-sql to datafusion-flight-sql-server
hozan23 Aug 23, 2024
d9449e5
"create datafusion-flight-sql-table-provider crate & move the executo…
hozan23 Aug 23, 2024
ac84a50
override supports_rewrite for FederationOptimizerRule and SQLFederati…
hozan23 Aug 23, 2024
5d240d7
Improve project overview
backkem Aug 23, 2024
f4babee
update README.md (#48)
hozan23 Aug 24, 2024
48dbea5
datafusion-fedeartion: Remove assert_eq macros and handle errors prop…
hozan23 Aug 26, 2024
c8fa466
Add example in datafusion-federation crate (#50)
hozan23 Aug 26, 2024
1033cff
feat: initial subquery support (#37)
backkem May 22, 2024
7dfb653
delete redundant github workflows & commitlint
hozan23 Aug 23, 2024
8968619
update deps && use datafusion 41
hozan23 Aug 22, 2024
5741fd6
remove connectorx from sources/sql crate
hozan23 Aug 22, 2024
82f3dfa
flight-sql: fix up dependencie issues
hozan23 Aug 22, 2024
3e5996d
move sources/sql code to datafusion-federation crate and add sql feature
hozan23 Aug 23, 2024
6e6acd5
move sources/flight-sql to datafusion-flight-sql-server
hozan23 Aug 23, 2024
0cb719d
"create datafusion-flight-sql-table-provider crate & move the executo…
hozan23 Aug 23, 2024
66fb747
override supports_rewrite for FederationOptimizerRule and SQLFederati…
hozan23 Aug 23, 2024
5e7bba4
Improve project overview
backkem Aug 23, 2024
b80203a
update README.md (#48)
hozan23 Aug 24, 2024
95d3416
datafusion-fedeartion: Remove assert_eq macros and handle errors prop…
hozan23 Aug 26, 2024
f789187
Add example in datafusion-federation crate (#50)
hozan23 Aug 26, 2024
49f8be0
fix rebase conflicts
hozan23 Aug 29, 2024
340a955
Merge remote-tracking branch 'upstream/main' into datafusion-federati…
hozan23 Aug 29, 2024
ffdbe87
cargo fmt
hozan23 Aug 29, 2024
f9abf43
remove datafusion-federation/analyzer.rs
hozan23 Aug 29, 2024
238f836
datafusion-federation: export FederatedPlanner
hozan23 Aug 29, 2024
13 changes: 0 additions & 13 deletions .github/workflows/check.yml

This file was deleted.

30 changes: 0 additions & 30 deletions .github/workflows/pull-request.yml

This file was deleted.

4 changes: 2 additions & 2 deletions .github/workflows/test.yml
@@ -55,7 +55,7 @@ jobs:
      - uses: arduino/setup-protoc@v3
        with:
          repo-token: ${{ secrets.GITHUB_TOKEN }}
      - run: cargo clippy -- -Dwarnings
      - run: cargo clippy -- -D warnings

  package:
    name: Package
@@ -69,5 +69,5 @@
      - uses: arduino/setup-protoc@v3
        with:
          repo-token: ${{ secrets.GITHUB_TOKEN }}
      - run: cargo build
      - run: cargo build --all
      - run: cargo package -p datafusion-federation --allow-dirty
12 changes: 3 additions & 9 deletions Cargo.toml
@@ -3,24 +3,18 @@ resolver = "2"

members = [
"datafusion-federation",
"examples",
"sources/sql",
"sources/flight-sql",
"datafusion-flight-sql-server",
"datafusion-flight-sql-table-provider",
]

[patch.crates-io]
# connectorx = { path = "../connector-x/connectorx" }
# datafusion = { path = "../arrow-datafusion/datafusion/core" }

[workspace.package]
version = "0.1.6"
edition = "2021"
license = "Apache-2.0"
readme = "README.md"


[workspace.dependencies]
async-trait = "0.1.77"
async-trait = "0.1.81"
async-stream = "0.3.5"
futures = "0.3.30"
datafusion = "41.0.0"
125 changes: 119 additions & 6 deletions README.md
@@ -1,22 +1,135 @@
## DataFusion Federation
# DataFusion Federation

[![crates.io](https://img.shields.io/crates/v/datafusion-federation.svg)](https://crates.io/crates/datafusion-federation)
[![docs.rs](https://docs.rs/datafusion-federation/badge.svg)](https://docs.rs/datafusion-federation)

The goal of this repo is to allow [DataFusion](https://github.com/apache/arrow-datafusion) to resolve queries across remote query engines while pushing down as much compute as possible down.
DataFusion Federation allows
[DataFusion](https://github.com/apache/arrow-datafusion) to have (part of) a
query plan executed by a remote execution engine.

Check out [the examples](./examples/) to get a feel for how it works.
                                   ┌────────────────┐
               ┌────────────┐      │ Remote DBMS(s) │
SQL Query ───> │ DataFusion │ ───> │ ( execution    │
               └────────────┘      │ happens here ) │
                                   └────────────────┘

Potential use-cases:
The goal is to allow resolving queries across remote query engines while
pushing down as much compute as possible to the remote database(s). This allows
execution to happen as close to the storage as possible. This concept is
referred to as 'query federation'.

> [!TIP]
> This repository implements the federation framework itself. If you want to
> connect to a specific database, check out the compatible providers available
> in
> [datafusion-contrib/datafusion-table-providers](https://github.com/datafusion-contrib/datafusion-table-providers/).

## Usage

Check out the [examples](./datafusion-federation/examples/) to get a feel for
how it works.

## Potential use-cases:

- Querying across SQLite, MySQL, PostgreSQL, ...
- Pushing down SQL or [Substrait](https://substrait.io/) plans.
- DataFusion -> Flight SQL -> DataFusion
- ..

#### Status
## Design concept

Say you have a query plan as follows:

         ┌────────────┐
         │    Join    │
         └────────────┘
       ┌───────┴────────┐
┌────────────┐   ┌────────────┐
│   Scan A   │   │    Join    │
└────────────┘   └────────────┘
               ┌───────┴────────┐
         ┌────────────┐   ┌────────────┐
         │   Scan B   │   │   Scan C   │
         └────────────┘   └────────────┘

DataFusion Federation will identify the largest possible sub-plans that
can be executed by an external database:

          ┌────────────┐         Optimizer recognizes
          │    Join    │         that B and C are
          └────────────┘         available in an
                ▲                external database
       ┌────────┴─────────────┐
       │       ┌ ─ ─ ─ ─ ─ ─ ─┴─ ─ ─ ─ ─ ─ ─ ─ ─┐
┌────────────┐ │        ┌────────────┐          │
│   Scan A   │ │        │    Join    │          │
└────────────┘ │        └────────────┘          │
               │              ▲                 │
               │       ┌──────┴────────┐        │
               │ ┌────────────┐  ┌────────────┐ │
               │ │   Scan B   │  │   Scan C   │ │
               │ └────────────┘  └────────────┘ │
                ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘

The sub-plans are cut out and replaced by an opaque federation node in the plan:

          ┌────────────┐
          │    Join    │
          └────────────┘   Rewritten Plan
       ┌────────┴───────────┐
       │                    │
┌────────────┐     ┏━━━━━━━━━━━━━━━━━━┓
│   Scan A   │     ┃     Scan B+C     ┃
└────────────┘     ┃  (TableProvider  ┃
                   ┃ that can execute ┃
                   ┃  sub-plan in an  ┃
                   ┃external database)┃
                   ┗━━━━━━━━━━━━━━━━━━┛

Different databases may have different query languages and execution
capabilities. To accommodate this, each 'federation provider' self-determines
what part of a sub-plan it will actually federate. This is done by letting each
federation provider define its own optimizer rule. When a sub-plan is 'cut out'
of the overall plan, it is first passed through the federation provider's
optimizer rule, which determines the part of the plan that is cut out based on
the execution capabilities of the database it represents.

## Implementation

A remote database is represented by the `FederationProvider` trait. Table scans
that are available in the same database implement the `FederatedTableSource`
trait, which allows lookup of the corresponding `FederationProvider`.

Identifying sub-plans to federate is done by the `FederationOptimizerRule`.
This rule needs to be registered in your DataFusion SessionState. One easy way
to do this is using `default_session_state`. To do its job, the
`FederationOptimizerRule` currently requires that all TableProviders that need
to be federated are `FederatedTableProviderAdaptor`s. The
`FederatedTableProviderAdaptor` also has a fallback mechanism that allows
implementations to fall back to a 'vanilla' TableProvider in case the
`FederationOptimizerRule` isn't registered.
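
As a rough sketch (condensed from the `df-csv` example added in this PR; the
function name is illustrative), enabling the rule looks like this:

```rust
use datafusion::execution::context::SessionContext;

// Build a SessionContext whose SessionState has the FederationOptimizerRule
// and the federated query planner registered via default_session_state.
fn federation_enabled_context() -> SessionContext {
    let state = datafusion_federation::default_session_state();
    SessionContext::new_with_state(state)
}
```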

The `FederationProvider` can provide a `compute_context`. This allows it to
differentiate between multiple remote execution contexts of the same type, for
example two different MySQL instances, database schemas, access levels, etc.
The `FederationProvider` also returns the `Optimizer` that allows it to
self-determine what part of a sub-plan it can federate.

The `sql` module implements a generic `FederationProvider` for SQL execution
engines. A specific SQL engine implements the `SQLExecutor` trait for its
engine-specific execution. There are a number of compatible providers available
in
[datafusion-contrib/datafusion-table-providers](https://github.com/datafusion-contrib/datafusion-table-providers/).
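
A minimal sketch of that wiring, condensed from the `df-csv` example in this
PR (the helper name `sql_schema_for` is illustrative):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion_federation::sql::{SQLFederationProvider, SQLSchemaProvider};

// Wrap a SQLFederationProvider (which itself wraps a SQLExecutor) in a
// SQLSchemaProvider, so the listed remote tables resolve to federated
// table providers once registered in a DataFusion catalog.
async fn sql_schema_for(
    provider: Arc<SQLFederationProvider>,
    tables: Vec<String>,
) -> Result<Arc<SQLSchemaProvider>> {
    Ok(Arc::new(
        SQLSchemaProvider::new_with_tables(provider, tables).await?,
    ))
}
```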

## Status

The project is in alpha status. Contributions are welcome; landing a PR grants
commit access.

- [Docs (release)](https://docs.rs/datafusion-federation)
- [Docs (main)](https://datafusion-contrib.github.io/datafusion-federation/)
8 changes: 0 additions & 8 deletions commitlint.config.js

This file was deleted.

24 changes: 18 additions & 6 deletions datafusion-federation/Cargo.toml
@@ -10,17 +10,29 @@ description = "Datafusion federation."
name = "datafusion_federation"
path = "src/lib.rs"

[package.metadata.docs.rs]
# Whether to pass `--all-features` to Cargo (default: false)
all-features = true
# Whether to pass `--no-default-features` to Cargo (default: false)
no-default-features = true

[features]
sql = []

[dependencies]
futures.workspace = true
async-trait.workspace = true
datafusion.workspace = true
async-stream.workspace = true
futures.workspace = true
arrow-json.workspace = true

[dev-dependencies]
tokio = { version = "1.39.3", features = ["full"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing = "0.1.40"

[[example]]
name = "df-csv"
path = "examples/df-csv.rs"
required-features = ["sql"]
115 changes: 115 additions & 0 deletions datafusion-federation/examples/df-csv.rs
@@ -0,0 +1,115 @@
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::{
    arrow::datatypes::SchemaRef,
    catalog::SchemaProvider,
    error::{DataFusionError, Result},
    execution::{
        context::{SessionContext, SessionState},
        options::CsvReadOptions,
    },
    physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream},
    sql::unparser::dialect::{DefaultDialect, Dialect},
};
use datafusion_federation::sql::{SQLExecutor, SQLFederationProvider, SQLSchemaProvider};
use futures::TryStreamExt;

const CSV_PATH: &str = "./examples/test.csv";
const TABLE_NAME: &str = "test";

#[tokio::main]
async fn main() -> Result<()> {
    // Create a remote context
    let remote_ctx = Arc::new(SessionContext::new());

    // Registers a CSV file
    remote_ctx
        .register_csv(TABLE_NAME, CSV_PATH, CsvReadOptions::new())
        .await?;
    let known_tables: Vec<String> = [TABLE_NAME].iter().map(|&x| x.into()).collect();

    // Register schema
    let executor = Arc::new(InMemorySQLExecutor::new(remote_ctx));
    let provider = Arc::new(SQLFederationProvider::new(executor));
    let schema_provider =
        Arc::new(SQLSchemaProvider::new_with_tables(provider, known_tables).await?);

    // Local context
    let state = datafusion_federation::default_session_state();
    overwrite_default_schema(&state, schema_provider)?;
    let ctx = SessionContext::new_with_state(state);

    // Run query
    let query = r#"SELECT * from test"#;
    let df = ctx.sql(query).await?;

    // let explain = df.clone().explain(true, false)?;
    // explain.show().await?;

    df.show().await
}

fn overwrite_default_schema(state: &SessionState, schema: Arc<dyn SchemaProvider>) -> Result<()> {
    let options = &state.config().options().catalog;
    let catalog = state
        .catalog_list()
        .catalog(options.default_catalog.as_str())
        .unwrap();

    catalog.register_schema(options.default_schema.as_str(), schema)?;

    Ok(())
}

pub struct InMemorySQLExecutor {
    session: Arc<SessionContext>,
}

impl InMemorySQLExecutor {
    pub fn new(session: Arc<SessionContext>) -> Self {
        Self { session }
    }
}

#[async_trait]
impl SQLExecutor for InMemorySQLExecutor {
    fn name(&self) -> &str {
        "in_memory_sql_executor"
    }

    fn compute_context(&self) -> Option<String> {
        None
    }

    fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
        // Execute it using the remote datafusion session context
        let future_stream = _execute(self.session.clone(), sql.to_string());
        let stream = futures::stream::once(future_stream).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            schema.clone(),
            stream,
        )))
    }

    async fn table_names(&self) -> Result<Vec<String>> {
        Err(DataFusionError::NotImplemented(
            "table inference not implemented".to_string(),
        ))
    }

    async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef> {
        let sql = format!("select * from {table_name} limit 1");
        let df = self.session.sql(&sql).await?;
        let schema = df.schema().as_arrow().clone();
        Ok(Arc::new(schema))
    }

    fn dialect(&self) -> Arc<dyn Dialect> {
        Arc::new(DefaultDialect {})
    }
}

async fn _execute(ctx: Arc<SessionContext>, sql: String) -> Result<SendableRecordBatchStream> {
    ctx.sql(&sql).await?.execute_stream().await
}
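
Given the `[[example]]` entry added to `datafusion-federation/Cargo.toml`, this
example should be runnable with something like
`cargo run -p datafusion-federation --example df-csv --features sql`; note that
it reads `./examples/test.csv` relative to the current working directory.
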
4 changes: 4 additions & 0 deletions datafusion-federation/examples/test.csv
@@ -0,0 +1,4 @@
foo,bar
a,1
b,2
c,3