diff --git a/crates/datafusion_ext/src/lib.rs b/crates/datafusion_ext/src/lib.rs
index f7a1f6027..abaacd2d2 100644
--- a/crates/datafusion_ext/src/lib.rs
+++ b/crates/datafusion_ext/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod errors;
+pub mod metrics;
 pub mod planner;
 pub mod runtime;
 pub mod vars;
diff --git a/crates/datafusion_ext/src/metrics.rs b/crates/datafusion_ext/src/metrics.rs
new file mode 100644
index 000000000..0d98c0cd0
--- /dev/null
+++ b/crates/datafusion_ext/src/metrics.rs
@@ -0,0 +1,247 @@
+use datafusion::{
+    arrow::datatypes::SchemaRef,
+    arrow::{datatypes::Schema, record_batch::RecordBatch},
+    error::Result,
+    execution::TaskContext,
+    physical_expr::PhysicalSortExpr,
+    physical_plan::{
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet},
+        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+        SendableRecordBatchStream, Statistics,
+    },
+};
+use futures::{Stream, StreamExt};
+use std::fmt;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, pin::Pin};
+
+const BYTES_PROCESSED_GAUGE_NAME: &str = "bytes_processed";
+
+/// Standard metrics we should be collecting for all data sources during
+/// queries.
+#[derive(Debug, Clone)]
+pub struct DataSourceMetrics {
+    /// Track bytes processed by source plans.
+    pub bytes_processed: Gauge,
+
+    /// Baseline metrics like output rows and elapsed time.
+    pub baseline: BaselineMetrics,
+}
+
+impl DataSourceMetrics {
+    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let bytes_processed =
+            MetricBuilder::new(metrics).gauge(BYTES_PROCESSED_GAUGE_NAME, partition);
+        let baseline = BaselineMetrics::new(metrics, partition);
+
+        Self {
+            bytes_processed,
+            baseline,
+        }
+    }
+
+    /// Track metrics based on the poll result from an async stream.
+    pub fn record_poll(
+        &self,
+        poll: Poll<Option<Result<RecordBatch>>>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        if let Poll::Ready(maybe_batch) = &poll {
+            match maybe_batch {
+                Some(Ok(batch)) => {
+                    self.bytes_processed.add(batch.get_array_memory_size());
+                    self.baseline.record_output(batch.num_rows());
+                }
+                Some(Err(_)) => self.baseline.done(),
+                None => self.baseline.done(),
+            }
+        }
+        poll
+    }
+}
+
+/// Thin wrapper around a record batch stream that automatically records metrics
+/// about batches that are sent through the stream.
+///
+/// Note this should only be used when "ingesting" data during execution (data
+/// sources or reading from tables) to avoid double counting bytes processed.
+pub struct DataSourceMetricsStreamAdapter<S> {
+    pub stream: S,
+    pub metrics: DataSourceMetrics,
+}
+
+impl<S> DataSourceMetricsStreamAdapter<S> {
+    /// Create a new stream with a new set of data source metrics for the given
+    /// partition.
+    pub fn new(stream: S, partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        Self {
+            stream,
+            metrics: DataSourceMetrics::new(partition, metrics),
+        }
+    }
+}
+
+impl<S: RecordBatchStream + Unpin> Stream for DataSourceMetricsStreamAdapter<S> {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let poll = self.stream.poll_next_unpin(cx);
+        self.metrics.record_poll(poll)
+    }
+}
+
+impl<S: RecordBatchStream + Unpin> RecordBatchStream for DataSourceMetricsStreamAdapter<S> {
+    fn schema(&self) -> SchemaRef {
+        self.stream.schema()
+    }
+}
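+
+// Illustrative sketch only: a data source's `execute` would typically wrap its
+// partition stream with the adapter as below. `MyStream` and the `metrics`
+// field on the surrounding exec are hypothetical stand-ins, not part of this
+// crate.
+//
+//     fn execute(
+//         &self,
+//         partition: usize,
+//         _context: Arc<TaskContext>,
+//     ) -> Result<SendableRecordBatchStream> {
+//         let stream = MyStream::new(self.schema());
+//         Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+//             stream,
+//             partition,
+//             &self.metrics,
+//         )))
+//     }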
+
+/// Wrapper around an execution plan that returns a `BoxedStreamAdapter` for
+/// additional metrics collection.
+///
+/// This should _generally_ only be used for execution plans that we're not able
+/// to modify directly to record metrics (e.g. Delta). Otherwise, this should be
+/// skipped and metrics collection should be added to the execution plan
+/// directly.
+#[derive(Debug, Clone)]
+pub struct DataSourceMetricsExecAdapter {
+    child: Arc<dyn ExecutionPlan>,
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl DataSourceMetricsExecAdapter {
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        Self {
+            child: plan,
+            metrics: ExecutionPlanMetricsSet::new(),
+        }
+    }
+}
+
+impl ExecutionPlan for DataSourceMetricsExecAdapter {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.child.schema()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.child.output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.child.output_ordering()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.child.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(Self::new(children[0].clone())))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = self.child.execute(partition, context)?;
+        Ok(Box::pin(BoxedStreamAdapter::new(
+            stream,
+            partition,
+            &self.metrics,
+        )))
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.child.statistics()
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+}
+
+impl DisplayAs for DataSourceMetricsExecAdapter {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "DataSourceMetricsExecAdapter")
+    }
+}
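+
+// Illustrative sketch only: wrapping a third-party plan (e.g. a Delta scan) so
+// its output gets counted. `delta_plan` is a hypothetical
+// `Arc<dyn ExecutionPlan>` obtained from code we don't control.
+//
+//     let wrapped = Arc::new(DataSourceMetricsExecAdapter::new(delta_plan));
+//     // After execution, `wrapped.metrics()` includes `bytes_processed`
+//     // alongside the baseline metrics for the child's output.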
+
+struct BoxedStreamAdapter {
+    stream: SendableRecordBatchStream,
+    metrics: DataSourceMetrics,
+}
+
+impl BoxedStreamAdapter {
+    fn new(
+        stream: SendableRecordBatchStream,
+        partition: usize,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> Self {
+        Self {
+            stream,
+            metrics: DataSourceMetrics::new(partition, metrics),
+        }
+    }
+}
+
+impl Stream for BoxedStreamAdapter {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let poll = self.stream.poll_next_unpin(cx);
+        self.metrics.record_poll(poll)
+    }
+}
+
+impl RecordBatchStream for BoxedStreamAdapter {
+    fn schema(&self) -> SchemaRef {
+        self.stream.schema()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct AggregatedMetrics {
+    /// Total time taken for a plan to execute.
+    pub elapsed_compute_ns: u64,
+    /// Total bytes processed.
+    pub bytes_processed: u64,
+}
+
+impl AggregatedMetrics {
+    /// Computes aggregated metrics from a plan.
+    ///
+    /// The plan should have already been executed to completion, otherwise
+    /// partial or incorrect results will be reported.
+    pub fn new_from_plan(plan: &dyn ExecutionPlan) -> Self {
+        let mut agg = AggregatedMetrics {
+            elapsed_compute_ns: 0,
+            bytes_processed: 0,
+        };
+        agg.aggregate_recurse(plan);
+        agg
+    }
+
+    fn aggregate_recurse(&mut self, plan: &dyn ExecutionPlan) {
+        let metrics = match plan.metrics() {
+            Some(metrics) => metrics,
+            None => return,
+        };
+        self.elapsed_compute_ns += metrics.elapsed_compute().unwrap_or_default() as u64;
+        self.bytes_processed += metrics
+            .sum_by_name(BYTES_PROCESSED_GAUGE_NAME)
+            .map(|m| m.as_usize() as u64)
+            .unwrap_or_default();
+
+        for child in plan.children() {
+            self.aggregate_recurse(child.as_ref());
+        }
+    }
+}
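+
+// Illustrative sketch only: aggregating metrics once a query has been driven
+// to completion. `plan` is a hypothetical, fully-executed plan reference.
+//
+//     let agg = AggregatedMetrics::new_from_plan(plan.as_ref());
+//     println!(
+//         "compute: {}ns, bytes: {}",
+//         agg.elapsed_compute_ns, agg.bytes_processed
+//     );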
diff --git a/crates/datafusion_ext/src/runtime/runtime_group.rs b/crates/datafusion_ext/src/runtime/runtime_group.rs
index ede71c130..28a0c0881 100644
--- a/crates/datafusion_ext/src/runtime/runtime_group.rs
+++ b/crates/datafusion_ext/src/runtime/runtime_group.rs
@@ -2,6 +2,7 @@ use datafusion::arrow::datatypes::Schema;
 use datafusion::error::Result as DataFusionResult;
 use datafusion::execution::TaskContext;
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
     Statistics,
@@ -69,6 +70,10 @@ impl ExecutionPlan for RuntimeGroupExec {
     fn statistics(&self) -> Statistics {
         self.child.statistics()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
 }
 
 impl DisplayAs for RuntimeGroupExec {
diff --git a/crates/datasources/src/bigquery/mod.rs b/crates/datasources/src/bigquery/mod.rs
index 0db7896db..6ad1c09ce 100644
--- a/crates/datasources/src/bigquery/mod.rs
+++ b/crates/datasources/src/bigquery/mod.rs
@@ -10,8 +10,6 @@ use bigquery_storage::yup_oauth2::{
     ServiceAccountAuthenticator,
 };
 use bigquery_storage::{BufferedArrowIpcReader, Client};
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::arrow::{datatypes::Fields, ipc::reader::StreamReader as ArrowStreamReader};
 use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result as DatafusionResult};
 use datafusion::execution::context::SessionState;
@@ -29,7 +27,16 @@ use datafusion::{
     },
     physical_plan::memory::MemoryExec,
 };
-use datafusion_ext::{errors::ExtensionError, functions::VirtualLister};
+use datafusion::{
+    arrow::record_batch::RecordBatch, physical_plan::metrics::ExecutionPlanMetricsSet,
+};
+use datafusion::{
+    arrow::{datatypes::Fields, ipc::reader::StreamReader as ArrowStreamReader},
+    physical_plan::metrics::MetricsSet,
+};
+use datafusion_ext::{
+    errors::ExtensionError, functions::VirtualLister, metrics::DataSourceMetricsStreamAdapter,
+};
 use errors::{BigQueryError, Result};
 use futures::{Stream, StreamExt};
 use gcp_bigquery_client::model::table_field_schema::TableFieldSchema as BigQuerySchema;
@@ -358,6 +365,7 @@ impl TableProvider for BigQueryTableProvider {
             arrow_schema: projected_schema,
             receiver: recv,
             num_partitions,
+            metrics: ExecutionPlanMetricsSet::new(),
         }))
     }
 }
@@ -367,6 +375,7 @@ struct BigQueryExec {
     arrow_schema: ArrowSchemaRef,
     receiver: Receiver<BufferedArrowIpcReader>,
     num_partitions: usize,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl ExecutionPlan for BigQueryExec {
@@ -404,16 +413,21 @@
         partition: usize,
         _context: Arc<TaskContext>,
     ) -> DatafusionResult<SendableRecordBatchStream> {
-        Ok(Box::pin(BufferedIpcStream::new(
-            self.schema(),
-            self.receiver.clone(),
+        let stream = BufferedIpcStream::new(self.schema(), self.receiver.clone(), partition);
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            stream,
             partition,
+            &self.metrics,
         )))
     }
 
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 impl DisplayAs for BigQueryExec {
diff --git a/crates/datasources/src/mongodb/exec.rs b/crates/datasources/src/mongodb/exec.rs
index 5b20d74a7..57686da26 100644
--- a/crates/datasources/src/mongodb/exec.rs
+++ b/crates/datasources/src/mongodb/exec.rs
@@ -7,10 +7,12 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result as DatafusionResult};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
+use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
 use futures::{Stream, StreamExt};
 use mongodb::bson::{Document, RawDocumentBuf};
 use mongodb::{options::FindOptions, Collection};
@@ -29,6 +31,7 @@ pub struct MongoBsonExec {
     schema: Arc<Schema>,
     collection: Collection<RawDocumentBuf>,
     limit: Option<usize>,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl MongoBsonExec {
@@ -41,6 +44,7 @@
             schema,
             collection,
             limit,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 }
@@ -77,19 +81,24 @@ impl ExecutionPlan for MongoBsonExec {
 
     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         _context: Arc<TaskContext>,
     ) -> DatafusionResult<SendableRecordBatchStream> {
-        Ok(Box::pin(BsonStream::new(
-            self.schema.clone(),
-            self.collection.clone(),
-            self.limit,
+        let stream = BsonStream::new(self.schema.clone(), self.collection.clone(), self.limit);
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            stream,
+            partition,
+            &self.metrics,
         )))
     }
 
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 impl DisplayAs for MongoBsonExec {
diff --git a/crates/datasources/src/mysql/mod.rs b/crates/datasources/src/mysql/mod.rs
index c20cda17f..badf806c4 100644
--- a/crates/datasources/src/mysql/mod.rs
+++ b/crates/datasources/src/mysql/mod.rs
@@ -21,12 +21,14 @@ use datafusion::error::{DataFusionError, Result as DatafusionResult};
 use datafusion::execution::context::{SessionState, TaskContext};
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
 use datafusion_ext::errors::ExtensionError;
 use datafusion_ext::functions::VirtualLister;
+use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
 use futures::{Stream, StreamExt, TryStreamExt};
 use mysql_async::consts::{ColumnFlags, ColumnType};
 use mysql_async::prelude::Queryable;
@@ -369,6 +371,7 @@ impl TableProvider for MysqlTableProvider {
             accessor: self.accessor.clone(),
             query,
             arrow_schema: projected_schema,
+            metrics: ExecutionPlanMetricsSet::new(),
         }))
     }
 }
@@ -380,6 +383,7 @@ struct MysqlExec {
     accessor: Arc<MysqlAccessor>,
     query: String,
     arrow_schema: ArrowSchemaRef,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl ExecutionPlan for MysqlExec {
@@ -414,7 +418,7 @@
     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         _context: Arc<TaskContext>,
     ) -> DatafusionResult<SendableRecordBatchStream> {
         let stream = MysqlQueryStream::open(
@@ -424,12 +428,20 @@ impl ExecutionPlan for MysqlExec {
         )
         .map_err(|e| DataFusionError::External(Box::new(e)))?;
 
-        Ok(Box::pin(stream))
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            stream,
+            partition,
+            &self.metrics,
+        )))
     }
 
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 impl DisplayAs for MysqlExec {
diff --git a/crates/datasources/src/native/access.rs b/crates/datasources/src/native/access.rs
index f27908c94..4e1c9aeff 100644
--- a/crates/datasources/src/native/access.rs
+++ b/crates/datasources/src/native/access.rs
@@ -9,6 +9,7 @@ use datafusion::logical_expr::{LogicalPlan, TableProviderFilterPushDown, TableType};
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::{ExecutionPlan, Statistics};
 use datafusion::prelude::Expr;
+use datafusion_ext::metrics::DataSourceMetricsExecAdapter;
 use deltalake::operations::create::CreateBuilder;
 use deltalake::operations::delete::DeleteBuilder;
 use deltalake::operations::update::UpdateBuilder;
@@ -289,7 +290,8 @@ impl TableProvider for NativeTable {
             let schema = TableProvider::schema(&self.delta);
             Ok(Arc::new(EmptyExec::new(false, schema)))
         } else {
-            self.delta.scan(session, projection, filters, limit).await
+            let plan = self.delta.scan(session, projection, filters, limit).await?;
+            Ok(Arc::new(DataSourceMetricsExecAdapter::new(plan)))
         }
     }
diff --git a/crates/datasources/src/object_store/http.rs b/crates/datasources/src/object_store/http.rs
index a45fa6327..60dd54d7d 100644
--- a/crates/datasources/src/object_store/http.rs
+++ b/crates/datasources/src/object_store/http.rs
@@ -125,7 +125,6 @@ impl ObjStoreAccess for HttpStoreAccess {
                 file_format: file_format.clone(),
                 base_url,
                 objects,
-                _predicate_pushdown: true,
             });
 
             providers.push(prov);
@@ -163,7 +162,6 @@ impl HttpStoreAccess {
             base_url,
             objects,
             file_format,
-            _predicate_pushdown: true,
         })
     }
 }
diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs
index 99488e480..b8641ab51 100644
--- a/crates/datasources/src/object_store/mod.rs
+++ b/crates/datasources/src/object_store/mod.rs
@@ -16,6 +16,7 @@ use datafusion::logical_expr::TableType;
 use datafusion::physical_plan::union::UnionExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::Expr;
+use datafusion_ext::metrics::DataSourceMetricsExecAdapter;
 use errors::ObjectStoreSourceError;
 use futures::StreamExt;
 use glob::{MatchOptions, Pattern};
@@ -212,7 +213,6 @@ pub trait ObjStoreAccess: Debug + Display + Send + Sync {
             base_url,
             objects,
             file_format,
-            _predicate_pushdown: true,
         }))
     }
 }
@@ -251,7 +251,6 @@ impl ObjStoreAccessor {
         state: &SessionState,
         file_format: Arc<dyn FileFormat>,
         objects: Vec<ObjectMeta>,
-        _predicate_pushdown: bool,
     ) -> Result<Arc<dyn TableProvider>> {
         let store = self.store;
         let arrow_schema = file_format.infer_schema(state, &store, &objects).await?;
@@ -263,7 +262,6 @@
             base_url,
             objects,
             file_format,
-            _predicate_pushdown,
         }))
     }
 }
@@ -275,8 +273,6 @@ pub struct ObjStoreTableProvider {
     base_url: ObjectStoreUrl,
     objects: Vec<ObjectMeta>,
     file_format: Arc<dyn FileFormat>,
-
-    _predicate_pushdown: bool,
 }
 
 #[async_trait]
@@ -331,10 +327,13 @@ impl TableProvider for ObjStoreTableProvider {
         ctx.runtime_env()
             .register_object_store(self.base_url.as_ref(), self.store.clone());
 
-        self.file_format
+        let plan = self
+            .file_format
             .create_physical_plan(ctx, config, filters.as_ref())
             .await
-            .map_err(|e| DataFusionError::External(Box::new(e)))
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        Ok(Arc::new(DataSourceMetricsExecAdapter::new(plan)))
     }
 }
diff --git a/crates/datasources/src/postgres/mod.rs b/crates/datasources/src/postgres/mod.rs
index 28bb4ca7d..d47c9f9cb 100644
--- a/crates/datasources/src/postgres/mod.rs
+++ b/crates/datasources/src/postgres/mod.rs
@@ -19,12 +19,15 @@ use datafusion::execution::context::SessionState;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
 use datafusion_ext::errors::ExtensionError;
 use datafusion_ext::functions::VirtualLister;
+use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
 use errors::{PostgresError, Result};
 use futures::{future::BoxFuture, ready, stream::BoxStream, FutureExt, Stream, StreamExt};
 use protogen::metastore::types::options::TunnelOptions;
@@ -700,6 +703,7 @@ pub struct PostgresBinaryCopyExec {
     pg_types: Arc<Vec<PostgresType>>,
     arrow_schema: ArrowSchemaRef,
     opener: StreamOpener,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl PostgresBinaryCopyExec {
@@ -719,6 +723,7 @@
                 pg_types: Arc::new(pg_types),
                 arrow_schema: Arc::new(arrow_schema),
                 opener,
+                metrics: ExecutionPlanMetricsSet::new(),
             })
         }
         BinaryCopyConfig::State {
@@ -732,6 +737,7 @@
                 pg_types,
                 arrow_schema,
                 opener,
+                metrics: ExecutionPlanMetricsSet::new(),
             })
         }
     }
@@ -764,13 +770,13 @@ impl ExecutionPlan for PostgresBinaryCopyExec {
         _children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DatafusionResult<Arc<dyn ExecutionPlan>> {
         Err(DataFusionError::Execution(
-            "cannot replace children for BinaryCopyExec".to_string(),
+            "cannot replace children for PostgresBinaryCopyExec".to_string(),
         ))
     }
 
     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         _context: Arc<TaskContext>,
     ) -> DatafusionResult<SendableRecordBatchStream> {
         let stream = ChunkStream {
@@ -779,23 +785,31 @@ impl ExecutionPlan for PostgresBinaryCopyExec {
             opener: self.opener.clone(),
             arrow_schema: self.arrow_schema.clone(),
         };
-        Ok(Box::pin(stream))
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            stream,
+            partition,
+            &self.metrics,
+        )))
     }
 
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 impl DisplayAs for PostgresBinaryCopyExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "BinaryCopyExec",)
+        write!(f, "PostgresBinaryCopyExec",)
     }
 }
 
 impl fmt::Debug for PostgresBinaryCopyExec {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("BinaryCopyExec")
+        f.debug_struct("PostgresBinaryCopyExec")
             .field("pg_types", &self.pg_types)
             .field("arrow_schema", &self.arrow_schema)
             .finish()
diff --git a/crates/datasources/src/snowflake/mod.rs b/crates/datasources/src/snowflake/mod.rs
index 82606624f..96b49950b 100644
--- a/crates/datasources/src/snowflake/mod.rs
+++ b/crates/datasources/src/snowflake/mod.rs
@@ -12,6 +12,7 @@ use datafusion::arrow::datatypes::Fields;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, Partitioning, RecordBatchStream, Statistics,
 };
@@ -26,6 +27,7 @@ use datafusion::{
 };
 use datafusion_ext::errors::ExtensionError;
 use datafusion_ext::functions::VirtualLister;
+use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
 use futures::{Stream, StreamExt};
 use snowflake_connector::{
     datatype::SnowflakeDataType, snowflake_to_arrow_datatype, Connection as SnowflakeConnection,
@@ -375,6 +377,7 @@ impl TableProvider for SnowflakeTableProvider {
             arrow_schema: projection_schema,
             num_partitions,
             result: Mutex::new(result),
+            metrics: ExecutionPlanMetricsSet::new(),
         }))
     }
 }
@@ -384,6 +387,7 @@ struct SnowflakeExec {
     arrow_schema: ArrowSchemaRef,
     num_partitions: usize,
     result: Mutex<QueryResult>,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl ExecutionPlan for SnowflakeExec {
@@ -427,12 +431,21 @@
                 "missing chunk for partition: {partition}"
             )))?
         };
-        Ok(Box::pin(ChunkStream::new(self.schema(), chunk)))
+        let stream = ChunkStream::new(self.schema(), chunk);
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            stream,
+            partition,
+            &self.metrics,
+        )))
     }
 
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 impl DisplayAs for SnowflakeExec {
diff --git a/crates/protogen/src/sqlexec/physical_plan.rs b/crates/protogen/src/sqlexec/physical_plan.rs
index 39fc6bb1d..7842c6ddc 100644
--- a/crates/protogen/src/sqlexec/physical_plan.rs
+++ b/crates/protogen/src/sqlexec/physical_plan.rs
@@ -292,6 +292,9 @@ pub struct InterleaveExec {}
 #[derive(Clone, PartialEq, Message)]
 pub struct RuntimeGroupExec {}
 
+#[derive(Clone, PartialEq, Message)]
+pub struct DataSourceMetricsExecAdapter {}
+
 #[derive(Clone, PartialEq, Message)]
 pub struct AnalyzeExec {
     #[prost(bool, tag = "1")]
@@ -306,7 +309,7 @@
 pub struct ExecutionPlanExtension {
     #[prost(
         oneof = "ExecutionPlanExtensionType",
-        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29"
+        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30"
     )]
     pub inner: Option<ExecutionPlanExtensionType>,
 }
@@ -376,4 +379,6 @@ pub enum ExecutionPlanExtensionType {
     RuntimeGroupExec(RuntimeGroupExec),
     #[prost(message, tag = "29")]
     AnalyzeExec(AnalyzeExec),
+    #[prost(message, tag = "30")]
+    DataSourceMetricsExecAdapter(DataSourceMetricsExecAdapter),
 }
diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs
index 76591ef15..75ce175fd 100644
--- a/crates/sqlexec/src/dispatch/external.rs
+++ b/crates/sqlexec/src/dispatch/external.rs
@@ -451,9 +451,7 @@ impl<'a> ExternalDispatcher<'a> {
         let objects = accessor.list_globbed(location).await?;
 
         let state = self.df_ctx.state();
-        let provider = accessor
-            .into_table_provider(&state, ft, objects, /* predicate_pushdown = */ true)
-            .await?;
+        let provider = accessor.into_table_provider(&state, ft, objects).await?;
 
         Ok(provider)
     }
diff --git a/crates/sqlexec/src/extension_codec.rs b/crates/sqlexec/src/extension_codec.rs
index 0f953ba4d..adbb8f833 100644
--- a/crates/sqlexec/src/extension_codec.rs
+++ b/crates/sqlexec/src/extension_codec.rs
@@ -17,6 +17,7 @@ use datafusion::physical_plan::union::InterleaveExec;
 use datafusion::physical_plan::values::ValuesExec;
 use datafusion::physical_plan::{displayable, ExecutionPlan};
 use datafusion::prelude::{Expr, SessionContext};
+use datafusion_ext::metrics::DataSourceMetricsExecAdapter;
 use datafusion_ext::runtime::runtime_group::RuntimeGroupExec;
 use datafusion_proto::logical_plan::from_proto::parse_expr;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
@@ -714,6 +715,14 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> {
                     Arc::new((&schema).try_into()?),
                 ))
             }
+            proto::ExecutionPlanExtensionType::DataSourceMetricsExecAdapter(_ext) => {
+                Arc::new(DataSourceMetricsExecAdapter::new(
+                    inputs
+                        .get(0)
+                        .ok_or_else(|| DataFusionError::Internal("missing child".to_string()))?
+                        .clone(),
+                ))
+            }
         };
 
         Ok(plan)
@@ -978,6 +987,10 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> {
                 show_statistics: true,
                 schema: Some(exec.schema().try_into()?),
             })
+        } else if let Some(_exec) = node.as_any().downcast_ref::<DataSourceMetricsExecAdapter>() {
+            proto::ExecutionPlanExtensionType::DataSourceMetricsExecAdapter(
+                proto::DataSourceMetricsExecAdapter {},
+            )
         } else {
             return Err(DataFusionError::NotImplemented(format!(
                 "encoding not implemented for physical plan: {}",