Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: logical Node for find files #2194

Merged
merged 13 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/benchmarks/src/bin/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ async fn benchmark_merge_tpcds(
table.snapshot()?.clone(),
table.log_store(),
DeltaScanConfig {
wrap_partition_values: true,
file_column_name: Some("file_path".to_string()),
},
)
Expand Down
98 changes: 98 additions & 0 deletions crates/core/src/delta_datafusion/find_files/logical.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

use datafusion_common::DFSchemaRef;
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};

use crate::delta_datafusion::find_files::ONLY_FILES_DF_SCHEMA;
use crate::logstore::LogStoreRef;
use crate::table::state::DeltaTableState;

#[derive(Debug, Clone)]
pub struct FindFilesNode {
id: String,
predicate: Expr,
table_state: DeltaTableState,
log_store: LogStoreRef,
version: i64,
}

impl FindFilesNode {
pub fn new(
id: String,
table_state: DeltaTableState,
log_store: LogStoreRef,
predicate: Expr,
) -> datafusion_common::Result<Self> {
let version = table_state.version();
Ok(Self {
id,
predicate,
log_store,
table_state,

version,
})
}

pub fn predicate(&self) -> Expr {
self.predicate.clone()
}

pub fn state(&self) -> DeltaTableState {
self.table_state.clone()
}

pub fn log_store(&self) -> LogStoreRef {
self.log_store.clone()
}
}

impl Eq for FindFilesNode {}

impl PartialEq<Self> for FindFilesNode {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}

impl Hash for FindFilesNode {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(self.id.as_bytes());
state.finish();
}
}

impl UserDefinedLogicalNodeCore for FindFilesNode {
fn name(&self) -> &str {
"FindFiles"
}

fn inputs(&self) -> Vec<&LogicalPlan> {
vec![]
}

fn schema(&self) -> &DFSchemaRef {
&ONLY_FILES_DF_SCHEMA
}

fn expressions(&self) -> Vec<Expr> {
vec![]
}

fn prevent_predicate_push_down_columns(&self) -> HashSet<String> {
HashSet::new()
}

fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"FindFiles[id={}, predicate=\"{}\", version={}]",
&self.id, self.predicate, self.version,
)
}

fn from_template(&self, _exprs: &[Expr], _inputs: &[LogicalPlan]) -> Self {
self.clone()
}
}
286 changes: 286 additions & 0 deletions crates/core/src/delta_datafusion/find_files/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
use arrow_array::cast::AsArray;
use std::sync::Arc;

use arrow_array::types::UInt16Type;
use arrow_array::RecordBatch;
use arrow_schema::SchemaBuilder;
use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
use arrow_select::concat::concat_batches;
use async_trait::async_trait;
use datafusion::datasource::MemTable;
use datafusion::execution::context::{QueryPlanner, SessionState};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion::prelude::SessionContext;
use datafusion_common::{DFSchemaRef, Result, ToDFSchema};
use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNode};
use datafusion_physical_expr::create_physical_expr;
use lazy_static::lazy_static;

use crate::delta_datafusion::find_files::logical::FindFilesNode;
use crate::delta_datafusion::find_files::physical::FindFilesExec;
use crate::delta_datafusion::{
df_logical_schema, register_store, DeltaScanBuilder, DeltaScanConfigBuilder, PATH_COLUMN,
};
use crate::logstore::LogStoreRef;
use crate::table::state::DeltaTableState;
use crate::DeltaTableError;

pub mod logical;
pub mod physical;

lazy_static! {
static ref ONLY_FILES_SCHEMA: Arc<Schema> = {
let mut builder = SchemaBuilder::new();
builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
Arc::new(builder.finish())
};
static ref ONLY_FILES_DF_SCHEMA: DFSchemaRef =
ONLY_FILES_SCHEMA.clone().to_dfschema_ref().unwrap();
}

struct FindFilesPlannerExtension {}

struct FindFilesPlanner {}

#[async_trait]
impl ExtensionPlanner for FindFilesPlannerExtension {
async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
_logical_inputs: &[&LogicalPlan],
_physical_inputs: &[Arc<dyn ExecutionPlan>],
_session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if let Some(find_files_node) = node.as_any().downcast_ref::<FindFilesNode>() {
return Ok(Some(Arc::new(FindFilesExec::new(
find_files_node.state(),
find_files_node.log_store(),
find_files_node.predicate(),
)?)));
}
Ok(None)
}
}

#[async_trait]
impl QueryPlanner for FindFilesPlanner {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
vec![Arc::new(FindFilesPlannerExtension {})],
)));
planner
.create_physical_plan(logical_plan, session_state)
.await
}
}

async fn scan_table_by_partitions(batch: RecordBatch, predicate: Expr) -> Result<RecordBatch> {
let mut arrays = Vec::new();
let mut fields = Vec::new();

let schema = batch.schema();

arrays.push(
batch
.column_by_name("path")
.ok_or(DeltaTableError::Generic(
"Column with name `path` does not exist".to_owned(),
))?
.to_owned(),
);
fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false));

for field in schema.fields() {
if field.name().starts_with("partition.") {
let name = field.name().strip_prefix("partition.").unwrap();

arrays.push(batch.column_by_name(field.name()).unwrap().to_owned());
fields.push(Field::new(
name,
field.data_type().to_owned(),
field.is_nullable(),
));
}
}

let schema = Arc::new(Schema::new(fields));
let batch = RecordBatch::try_new(schema, arrays)?;
let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;

let ctx = SessionContext::new();
let mut df = ctx.read_table(Arc::new(mem_table))?;
df = df
.filter(predicate.to_owned())?
.select(vec![col(PATH_COLUMN)])?;
let df_schema = df.schema().clone();
let batches = df.collect().await?;
Ok(concat_batches(&SchemaRef::from(df_schema), &batches)?)
}

async fn scan_table_by_files(
snapshot: DeltaTableState,
log_store: LogStoreRef,
state: SessionState,
expression: Expr,
) -> Result<RecordBatch> {
register_store(log_store.clone(), state.runtime_env().clone());
let scan_config = DeltaScanConfigBuilder::new()
.wrap_partition_values(true)
.with_file_column(true)
.build(&snapshot)?;

let logical_schema = df_logical_schema(&snapshot, &scan_config)?;

// Identify which columns we need to project
let mut used_columns = expression
.to_columns()?
.into_iter()
.map(|column| logical_schema.index_of(&column.name))
.collect::<std::result::Result<Vec<usize>, ArrowError>>()?;
// Add path column
used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

let scan = DeltaScanBuilder::new(&snapshot, log_store, &state)
.with_filter(Some(expression.clone()))
.with_projection(Some(&used_columns))
.with_scan_config(scan_config)
.build()
.await?;

let scan = Arc::new(scan);
let input_schema = scan.logical_schema.as_ref().to_owned();
let input_dfschema = input_schema.clone().try_into()?;

let predicate_expr = create_physical_expr(
&Expr::IsTrue(Box::new(expression.clone())),
&input_dfschema,
state.execution_props(),
)?;

let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1));
let field_idx = input_schema.index_of(PATH_COLUMN)?;
let task_ctx = Arc::new(TaskContext::from(&state));
let path_batches: Vec<RecordBatch> = datafusion::physical_plan::collect(limit, task_ctx)
.await?
.into_iter()
.map(|batch| {
let col = batch
.column(field_idx)
.as_dictionary::<UInt16Type>()
.values();
RecordBatch::try_from_iter(vec![(PATH_COLUMN, col.clone())]).unwrap()
})
.collect();

let result_batches = concat_batches(&ONLY_FILES_SCHEMA.clone(), &path_batches)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking for this operation to output file paths as soon as they are discovered to allow operations downstream to begin their work ASAP. When performing an memory only scan to makes sense to output as a single batch since IO is minimal.
For this PR I think it's okay since it aligns with the previous behaviour but its something we can change.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you elaborate a bit more? I understand what you are suggesting here, but I'm not sure what you are expecting. Like you want the return type to be a vec or record batches or a stream or something?

Copy link
Collaborator

@Blajda Blajda Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes output record batches and these record batches should simply be a string. There is no benefit for using dictionaries in the output.
I was thinking longer term. The current implementation waits for all files to be scanned prior to sending a record batch downstream. There might be some benefit to send a record batch with a single record as soon as a file match is determined.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh you're talking about the actual schema of the record batch, I thought you were saying the return type of the function was wrong or something of the sort for it to be "immediately available" or something of the sort.


Ok(result_batches)
}

#[cfg(test)]
pub mod tests {
use std::sync::Arc;

use datafusion::prelude::{DataFrame, SessionContext};
use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_expr::{col, lit, Expr, Extension, LogicalPlan};

use crate::delta_datafusion::find_files::logical::FindFilesNode;
use crate::delta_datafusion::find_files::FindFilesPlanner;
use crate::operations::collect_sendable_stream;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

pub async fn test_plan<'a>(
table: DeltaTable,
expr: Expr,
) -> Result<Vec<arrow_array::RecordBatch>, DeltaTableError> {
let ctx = SessionContext::new();
let state = ctx
.state()
.with_query_planner(Arc::new(FindFilesPlanner {}));
let find_files_node = LogicalPlan::Extension(Extension {
node: Arc::new(FindFilesNode::new(
"my_cool_plan".into(),
table.snapshot()?.clone(),
table.log_store().clone(),
expr,
)?),
});
let df = DataFrame::new(state.clone(), find_files_node);

let p = state
.clone()
.create_physical_plan(df.logical_plan())
.await?;

let e = p.execute(0, state.task_ctx())?;
collect_sendable_stream(e).await.map_err(Into::into)
}

#[tokio::test]
pub async fn test_find_files_partitioned() -> DeltaResult<()> {
let table = crate::open_table("../test/tests/data/delta-0.8.0-partitioned").await?;
let expr: Expr = col("year").eq(lit(2020));
let s = test_plan(table, expr).await?;

assert_batches_eq! {
["+---------------------------------------------------------------------------------------------+",
"| __delta_rs_path |",
"+---------------------------------------------------------------------------------------------+",
"| year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet |",
"| year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet |",
"| year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet |",
"+---------------------------------------------------------------------------------------------+"],
&s
}
Ok(())
}

#[tokio::test]
pub async fn test_find_files_unpartitioned() -> DeltaResult<()> {
let table = crate::open_table("../test/tests/data/simple_table").await?;
let expr: Expr = col("id").in_list(vec![lit(9i64), lit(7i64)], false);
let s = test_plan(table, expr).await?;

assert_batches_sorted_eq! {
["+---------------------------------------------------------------------+",
"| __delta_rs_path |",
"+---------------------------------------------------------------------+",
"| part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet |",
"| part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet |",
"+---------------------------------------------------------------------+"],
&s
}
Ok(())
}

#[tokio::test]
pub async fn test_find_files_unpartitioned2() -> DeltaResult<()> {
let table = crate::open_table("../test/tests/data/simple_table").await?;
let expr: Expr = col("id").is_not_null();
let s = test_plan(table, expr).await?;

assert_batches_sorted_eq! {
["+---------------------------------------------------------------------+",
"| __delta_rs_path |",
"+---------------------------------------------------------------------+",
"| part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet |",
"| part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet |",
"| part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet |",
"+---------------------------------------------------------------------+"],
&s
}
Ok(())
}
}
Loading
Loading