From 7a72492ce0f588519edb23007e2a920d671ee1e3 Mon Sep 17 00:00:00 2001
From: Sean Smith
Date: Mon, 2 Oct 2023 09:47:45 -0500
Subject: [PATCH] feat: Track more data source metrics during execution (#1860)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds additional metrics tracking for data sources:

```
> CREATE EXTERNAL DATABASE my_pg
:::     FROM postgres
:::     OPTIONS (
:::         host = 'pg.demo.glaredb.com',
:::         port = '5432',
:::         user = 'demo',
:::         password = 'demo',
:::         database = 'postgres',
:::     );
Database created
> explain analyze select * from my_pg.public.customer;
┌───────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ plan_type         │ plan                                                                                                                                             │
│ ──                │ ──                                                                                                                                               │
│ Utf8              │ Utf8                                                                                                                                             │
╞═══════════════════╪══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╡
│ Plan with Metrics │ RuntimeGroupExec: runtime_preference=local, metrics=[]↵                                                                                          │
│                   │   RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, metrics=[send_time=514.826µs, fetch_time=1.588055543s, repart_time=1ns]↵ │
│                   │     PostgresBinaryCopyExec, metrics=[output_rows=150000, elapsed_compute=1ns, bytes_processed=40962000]↵                                         │
│                   │                                                                                                                                                  │
└───────────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```

This also includes a `bytes_processed` gauge that tracks the number of bytes
read in each batch. The idea is that we can walk the plan after execution to
get the total number of bytes processed, and emit that _somewhere_.

---

I opened this up to get thoughts before moving on to replacing the
`session_query_metrics` table and related code, and to looking at how we want
to persist these metrics.
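To make the aggregation idea concrete, here's a rough sketch of how a session
might drain a plan and then walk it for totals once execution finishes. The
`run_and_collect` helper is hypothetical and not part of this PR;
`execute_stream` is DataFusion's stock helper:

```
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{execute_stream, ExecutionPlan};
use datafusion_ext::metrics::AggregatedMetrics;
use futures::StreamExt;

/// Hypothetical consumer: run a plan to completion, then aggregate the
/// per-operator metrics into totals we could persist.
async fn run_and_collect(
    plan: Arc<dyn ExecutionPlan>,
    ctx: Arc<TaskContext>,
) -> Result<AggregatedMetrics> {
    let mut stream = execute_stream(plan.clone(), ctx)?;
    // Metrics are only complete once the stream has been fully drained.
    while let Some(batch) = stream.next().await {
        let _ = batch?;
    }
    // Walk the plan tree, summing `elapsed_compute` and the new
    // `bytes_processed` gauge from every node that reports metrics.
    Ok(AggregatedMetrics::new_from_plan(plan.as_ref()))
}
```

Draining first matters: `record_poll` only finalizes the baseline timings (via
`baseline.done()`) once the stream yields `None` or an error, so aggregating
early would under-report elapsed time.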
---
 crates/datafusion_ext/src/lib.rs              |   1 +
 crates/datafusion_ext/src/metrics.rs          | 247 ++++++++++++++++++
 .../src/runtime/runtime_group.rs              |   5 +
 crates/datasources/src/bigquery/mod.rs        |  26 +-
 crates/datasources/src/mongodb/exec.rs        |  19 +-
 crates/datasources/src/mysql/mod.rs           |  16 +-
 crates/datasources/src/native/access.rs       |   4 +-
 crates/datasources/src/object_store/http.rs   |   2 -
 crates/datasources/src/object_store/mod.rs    |  13 +-
 crates/datasources/src/postgres/mod.rs        |  24 +-
 crates/datasources/src/snowflake/mod.rs       |  15 +-
 crates/protogen/src/sqlexec/physical_plan.rs  |   7 +-
 crates/sqlexec/src/dispatch/external.rs       |   4 +-
 crates/sqlexec/src/extension_codec.rs         |  13 +
 14 files changed, 363 insertions(+), 33 deletions(-)
 create mode 100644 crates/datafusion_ext/src/metrics.rs

diff --git a/crates/datafusion_ext/src/lib.rs b/crates/datafusion_ext/src/lib.rs
index f7a1f6027..abaacd2d2 100644
--- a/crates/datafusion_ext/src/lib.rs
+++ b/crates/datafusion_ext/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod errors;
+pub mod metrics;
 pub mod planner;
 pub mod runtime;
 pub mod vars;
diff --git a/crates/datafusion_ext/src/metrics.rs b/crates/datafusion_ext/src/metrics.rs
new file mode 100644
index 000000000..0d98c0cd0
--- /dev/null
+++ b/crates/datafusion_ext/src/metrics.rs
@@ -0,0 +1,247 @@
+use datafusion::{
+    arrow::datatypes::SchemaRef,
+    arrow::{datatypes::Schema, record_batch::RecordBatch},
+    error::Result,
+    execution::TaskContext,
+    physical_expr::PhysicalSortExpr,
+    physical_plan::{
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet},
+        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+        SendableRecordBatchStream, Statistics,
+    },
+};
+use futures::{Stream, StreamExt};
+use std::fmt;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, pin::Pin};
+
+const BYTES_PROCESSED_GAUGE_NAME: &str = "bytes_processed";
+
+/// Standard metrics we should be collecting for all data sources during
+/// queries.
+#[derive(Debug, Clone)]
+pub struct DataSourceMetrics {
+    /// Track bytes processed by source plans.
+    pub bytes_processed: Gauge,
+
+    /// Baseline metrics like output rows and elapsed time.
+    pub baseline: BaselineMetrics,
+}
+
+impl DataSourceMetrics {
+    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let bytes_processed =
+            MetricBuilder::new(metrics).gauge(BYTES_PROCESSED_GAUGE_NAME, partition);
+        let baseline = BaselineMetrics::new(metrics, partition);
+
+        Self {
+            bytes_processed,
+            baseline,
+        }
+    }
+
+    /// Track metrics based on the poll result from an async stream.
+    pub fn record_poll(
+        &self,
+        poll: Poll<Option<Result<RecordBatch>>>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        if let Poll::Ready(maybe_batch) = &poll {
+            match maybe_batch {
+                Some(Ok(batch)) => {
+                    self.bytes_processed.add(batch.get_array_memory_size());
+                    self.baseline.record_output(batch.num_rows());
+                }
+                Some(Err(_)) => self.baseline.done(),
+                None => self.baseline.done(),
+            }
+        }
+        poll
+    }
+}
+
+/// Thin wrapper around a record batch stream that automatically records metrics
+/// about batches that are sent through the stream.
+///
+/// Note this should only be used when "ingesting" data during execution (data
+/// sources or reading from tables) to avoid double counting bytes processed.
+pub struct DataSourceMetricsStreamAdapter<S> {
+    pub stream: S,
+    pub metrics: DataSourceMetrics,
+}
+
+impl<S> DataSourceMetricsStreamAdapter<S> {
+    /// Create a new stream with a new set of data source metrics for the given
+    /// partition.
+    pub fn new(stream: S, partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        Self {
+            stream,
+            metrics: DataSourceMetrics::new(partition, metrics),
+        }
+    }
+}
+
+impl<S: RecordBatchStream + Unpin> Stream for DataSourceMetricsStreamAdapter<S> {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let poll = self.stream.poll_next_unpin(cx);
+        self.metrics.record_poll(poll)
+    }
+}
+
+impl<S: RecordBatchStream + Unpin> RecordBatchStream for DataSourceMetricsStreamAdapter<S> {
+    fn schema(&self) -> SchemaRef {
+        self.stream.schema()
+    }
+}
+
+/// Wrapper around an execution plan that returns a
+/// `BoxedDataSourceMetricsStreamAdapter` for additional metrics collection.
+///
+/// This should _generally_ only be used for execution plans that we're not able
+/// to modify directly to record metrics (e.g. Delta). Otherwise, this should be
+/// skipped and metrics collection should be added to the execution plan
+/// directly.
+#[derive(Debug, Clone)]
+pub struct DataSourceMetricsExecAdapter {
+    child: Arc<dyn ExecutionPlan>,
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl DataSourceMetricsExecAdapter {
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        Self {
+            child: plan,
+            metrics: ExecutionPlanMetricsSet::new(),
+        }
+    }
+}
+
+impl ExecutionPlan for DataSourceMetricsExecAdapter {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.child.schema()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.child.output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.child.output_ordering()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.child.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(Self::new(children[0].clone())))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = self.child.execute(partition, context)?;
+        Ok(Box::pin(BoxedStreamAdapter::new(
+            stream,
+            partition,
+            &self.metrics,
+        )))
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.child.statistics()
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+}
+
+impl DisplayAs for DataSourceMetricsExecAdapter {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "DataSourceMetricsExecAdapter")
+    }
+}
+
+struct BoxedStreamAdapter {
+    stream: SendableRecordBatchStream,
+    metrics: DataSourceMetrics,
+}
+
+impl BoxedStreamAdapter {
+    fn new(
+        stream: SendableRecordBatchStream,
+        partition: usize,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> Self {
+        Self {
+            stream,
+            metrics: DataSourceMetrics::new(partition, metrics),
+        }
+    }
+}
+
+impl Stream for BoxedStreamAdapter {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let poll = self.stream.poll_next_unpin(cx);
+        self.metrics.record_poll(poll)
+    }
+}
+
+impl RecordBatchStream for BoxedStreamAdapter {
+    fn schema(&self) -> SchemaRef {
+        self.stream.schema()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct AggregatedMetrics {
+    /// Total time taken for a plan to execute.
+    pub elapsed_compute_ns: u64,
+    /// Total bytes processed.
+    pub bytes_processed: u64,
+}
+
+impl AggregatedMetrics {
+    /// Computes aggregated metrics from a plan.
+    ///
+    /// The plan should have already been executed to completion, otherwise
+    /// partial or incorrect results will be reported.
+    pub fn new_from_plan(plan: &dyn ExecutionPlan) -> Self {
+        let mut agg = AggregatedMetrics {
+            elapsed_compute_ns: 0,
+            bytes_processed: 0,
+        };
+        agg.aggregate_recurse(plan);
+        agg
+    }
+
+    fn aggregate_recurse(&mut self, plan: &dyn ExecutionPlan) {
+        let metrics = match plan.metrics() {
+            Some(metrics) => metrics,
+            None => return,
+        };
+        self.elapsed_compute_ns += metrics.elapsed_compute().unwrap_or_default() as u64;
+        self.bytes_processed += metrics
+            .sum_by_name(BYTES_PROCESSED_GAUGE_NAME)
+            .map(|m| m.as_usize() as u64)
+            .unwrap_or_default();
+
+        for child in plan.children() {
+            self.aggregate_recurse(child.as_ref());
+        }
+    }
+}
diff --git a/crates/datafusion_ext/src/runtime/runtime_group.rs b/crates/datafusion_ext/src/runtime/runtime_group.rs
index ede71c130..28a0c0881 100644
--- a/crates/datafusion_ext/src/runtime/runtime_group.rs
+++ b/crates/datafusion_ext/src/runtime/runtime_group.rs
@@ -2,6 +2,7 @@ use datafusion::arrow::datatypes::Schema;
 use datafusion::error::Result as DataFusionResult;
 use datafusion::execution::TaskContext;
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
     Statistics,
@@ -69,6 +70,10 @@ impl ExecutionPlan for RuntimeGroupExec {
     fn statistics(&self) -> Statistics {
         self.child.statistics()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
 }
 
 impl DisplayAs for RuntimeGroupExec {
diff --git a/crates/datasources/src/bigquery/mod.rs b/crates/datasources/src/bigquery/mod.rs
index 0db7896db..6ad1c09ce 100644
--- a/crates/datasources/src/bigquery/mod.rs
+++ b/crates/datasources/src/bigquery/mod.rs
@@ -10,8 +10,6 @@ use bigquery_storage::yup_oauth2::{
     ServiceAccountAuthenticator,
 };
 use bigquery_storage::{BufferedArrowIpcReader, Client};
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::arrow::{datatypes::Fields, ipc::reader::StreamReader as ArrowStreamReader};
 use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result as DatafusionResult};
 use datafusion::execution::context::SessionState;
@@ -29,7 +27,16 @@ use datafusion::{
     },
     physical_plan::memory::MemoryExec,
 };
-use datafusion_ext::{errors::ExtensionError, functions::VirtualLister};
+use datafusion::{
+    arrow::record_batch::RecordBatch, physical_plan::metrics::ExecutionPlanMetricsSet,
+};
+use datafusion::{
+    arrow::{datatypes::Fields, ipc::reader::StreamReader as ArrowStreamReader},
+    physical_plan::metrics::MetricsSet,
+};
+use datafusion_ext::{
+    errors::ExtensionError, functions::VirtualLister, metrics::DataSourceMetricsStreamAdapter,
+};
 use errors::{BigQueryError, Result};
 use futures::{Stream, StreamExt};
 use gcp_bigquery_client::model::table_field_schema::TableFieldSchema as BigQuerySchema;
@@ -358,6 +365,7 @@ impl TableProvider for BigQueryTableProvider {
             arrow_schema: projected_schema,
             receiver: recv,
             num_partitions,
+            metrics: ExecutionPlanMetricsSet::new(),
         }))
     }
 }
@@ -367,6 +375,7 @@ struct BigQueryExec {
     arrow_schema: ArrowSchemaRef,
     receiver: Receiver<BufferedArrowIpcReader>,
     num_partitions: usize,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl ExecutionPlan for BigQueryExec {
@@ -404,16 +413,21 @@ impl ExecutionPlan for BigQueryExec {
         partition: usize,
         _context: Arc<TaskContext>,
     ) -> DatafusionResult<SendableRecordBatchStream> {
-        Ok(Box::pin(BufferedIpcStream::new(
-            self.schema(),
-            self.receiver.clone(),
+        let stream = BufferedIpcStream::new(self.schema(), self.receiver.clone(), partition);
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            stream,
             partition,
+            &self.metrics,
         )))
     }
 
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 impl DisplayAs for BigQueryExec {
diff --git a/crates/datasources/src/mongodb/exec.rs b/crates/datasources/src/mongodb/exec.rs
index 5b20d74a7..57686da26 100644
--- a/crates/datasources/src/mongodb/exec.rs
+++ b/crates/datasources/src/mongodb/exec.rs
@@ -7,10 +7,12 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result as DatafusionResult};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
+use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
 use futures::{Stream, StreamExt};
 use mongodb::bson::{Document, RawDocumentBuf};
 use mongodb::{options::FindOptions, Collection};
@@ -29,6 +31,7 @@ pub struct MongoBsonExec {
     schema: Arc<ArrowSchema>,
     collection: Collection<RawDocumentBuf>,
     limit: Option<usize>,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl MongoBsonExec {
@@ -41,6 +44,7 @@
             schema,
             collection,
             limit,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 }
@@ -77,19 +81,24 @@ impl ExecutionPlan for MongoBsonExec {
 
     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         _context: Arc<TaskContext>,
     ) -> DatafusionResult<SendableRecordBatchStream> {
-        Ok(Box::pin(BsonStream::new(
-            self.schema.clone(),
-            self.collection.clone(),
-            self.limit,
+        let stream = BsonStream::new(self.schema.clone(), self.collection.clone(), self.limit);
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            stream,
+            partition,
+            &self.metrics,
         )))
     }
 
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 impl DisplayAs for MongoBsonExec {
diff --git a/crates/datasources/src/mysql/mod.rs b/crates/datasources/src/mysql/mod.rs
index c20cda17f..badf806c4 100644
--- a/crates/datasources/src/mysql/mod.rs
+++ b/crates/datasources/src/mysql/mod.rs
@@ -21,12 +21,14 @@ use datafusion::error::{DataFusionError, Result as DatafusionResult};
 use datafusion::execution::context::{SessionState, TaskContext};
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
 use datafusion_ext::errors::ExtensionError;
 use datafusion_ext::functions::VirtualLister;
+use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
 use futures::{Stream, StreamExt, TryStreamExt};
 use mysql_async::consts::{ColumnFlags, ColumnType};
 use mysql_async::prelude::Queryable;
@@ -369,6 +371,7 @@ impl TableProvider for MysqlTableProvider {
             accessor: self.accessor.clone(),
             query,
             arrow_schema: projected_schema,
+            metrics: ExecutionPlanMetricsSet::new(),
         }))
     }
 }
@@ -380,6 +383,7 @@ struct MysqlExec {
     accessor: Arc<MysqlAccessor>,
     query: String,
     arrow_schema: ArrowSchemaRef,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl ExecutionPlan for MysqlExec {
@@ -414,7 +418,7 @@
 
     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         _context: Arc<TaskContext>,
     ) -> DatafusionResult<SendableRecordBatchStream> {
         let stream = MysqlQueryStream::open(
@@ -424,12 +428,20 @@ impl ExecutionPlan for MysqlExec {
         )
         .map_err(|e| DataFusionError::External(Box::new(e)))?;
 
-        Ok(Box::pin(stream))
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            stream,
+            partition,
+            &self.metrics,
+        )))
     }
 
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 impl DisplayAs for MysqlExec {
diff --git a/crates/datasources/src/native/access.rs b/crates/datasources/src/native/access.rs
index f27908c94..4e1c9aeff 100644
--- a/crates/datasources/src/native/access.rs
+++ b/crates/datasources/src/native/access.rs
@@ -9,6 +9,7 @@ use datafusion::logical_expr::{LogicalPlan, TableProviderFilterPushDown, TableTy
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::{ExecutionPlan, Statistics};
 use datafusion::prelude::Expr;
+use datafusion_ext::metrics::DataSourceMetricsExecAdapter;
 use deltalake::operations::create::CreateBuilder;
 use deltalake::operations::delete::DeleteBuilder;
 use deltalake::operations::update::UpdateBuilder;
@@ -289,7 +290,8 @@ impl TableProvider for NativeTable {
             let schema = TableProvider::schema(&self.delta);
             Ok(Arc::new(EmptyExec::new(false, schema)))
         } else {
-            self.delta.scan(session, projection, filters, limit).await
+            let plan = self.delta.scan(session, projection, filters, limit).await?;
+            Ok(Arc::new(DataSourceMetricsExecAdapter::new(plan)))
         }
     }
 
diff --git a/crates/datasources/src/object_store/http.rs b/crates/datasources/src/object_store/http.rs
index a45fa6327..60dd54d7d 100644
--- a/crates/datasources/src/object_store/http.rs
+++ b/crates/datasources/src/object_store/http.rs
@@ -125,7 +125,6 @@ impl ObjStoreAccess for HttpStoreAccess {
                 file_format: file_format.clone(),
                 base_url,
                 objects,
-                _predicate_pushdown: true,
             });
             providers.push(prov);
 
@@ -163,7 +162,6 @@ impl HttpStoreAccess {
             base_url,
             objects,
             file_format,
-            _predicate_pushdown: true,
         })
     }
 }
diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs
index 99488e480..b8641ab51 100644
--- a/crates/datasources/src/object_store/mod.rs
+++ b/crates/datasources/src/object_store/mod.rs
@@ -16,6 +16,7 @@ use datafusion::logical_expr::TableType;
 use datafusion::physical_plan::union::UnionExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::Expr;
+use datafusion_ext::metrics::DataSourceMetricsExecAdapter;
 use errors::ObjectStoreSourceError;
 use futures::StreamExt;
 use glob::{MatchOptions, Pattern};
@@ -212,7 +213,6 @@ pub trait ObjStoreAccess: Debug + Display + Send + Sync {
             base_url,
             objects,
             file_format,
-            _predicate_pushdown: true,
         }))
     }
 }
@@ -251,7 +251,6 @@ impl ObjStoreAccessor {
         state: &SessionState,
         file_format: Arc<dyn FileFormat>,
         objects: Vec<ObjectMeta>,
-        _predicate_pushdown: bool,
     ) -> Result<Arc<dyn TableProvider>> {
         let store = self.store;
         let arrow_schema = file_format.infer_schema(state, &store, &objects).await?;
@@ -263,7 +262,6 @@ impl ObjStoreAccessor {
             base_url,
             objects,
             file_format,
-            _predicate_pushdown,
         }))
     }
 }
@@ -275,8 +273,6 @@ pub struct ObjStoreTableProvider {
     base_url: ObjectStoreUrl,
     objects: Vec<ObjectMeta>,
     file_format: Arc<dyn FileFormat>,
-
-    _predicate_pushdown: bool,
 }
 
 #[async_trait]
@@ -331,10 +327,13 @@ impl TableProvider for ObjStoreTableProvider {
         ctx.runtime_env()
             .register_object_store(self.base_url.as_ref(), self.store.clone());
 
-        self.file_format
+        let plan = self
+            .file_format
             .create_physical_plan(ctx, config, filters.as_ref())
             .await
-            .map_err(|e| DataFusionError::External(Box::new(e)))
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        Ok(Arc::new(DataSourceMetricsExecAdapter::new(plan)))
     }
 }
 
diff --git a/crates/datasources/src/postgres/mod.rs b/crates/datasources/src/postgres/mod.rs
index 28bb4ca7d..d47c9f9cb 100644
--- a/crates/datasources/src/postgres/mod.rs
+++ b/crates/datasources/src/postgres/mod.rs
@@ -19,12 +19,15 @@ use datafusion::execution::context::SessionState;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
 use datafusion_ext::errors::ExtensionError;
 use datafusion_ext::functions::VirtualLister;
+use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
 use errors::{PostgresError, Result};
 use futures::{future::BoxFuture, ready, stream::BoxStream, FutureExt, Stream, StreamExt};
 use protogen::metastore::types::options::TunnelOptions;
@@ -700,6 +703,7 @@ pub struct PostgresBinaryCopyExec {
     pg_types: Arc<Vec<PostgresType>>,
     arrow_schema: ArrowSchemaRef,
     opener: StreamOpener,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl PostgresBinaryCopyExec {
@@ -719,6 +723,7 @@ impl PostgresBinaryCopyExec {
                 pg_types: Arc::new(pg_types),
                 arrow_schema: Arc::new(arrow_schema),
                 opener,
+                metrics: ExecutionPlanMetricsSet::new(),
             })
         }
         BinaryCopyConfig::State {
@@ -732,6 +737,7 @@
                 pg_types,
                 arrow_schema,
                 opener,
+                metrics: ExecutionPlanMetricsSet::new(),
             })
         }
     }
@@ -764,13 +770,13 @@ impl ExecutionPlan for PostgresBinaryCopyExec {
         _children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DatafusionResult<Arc<dyn ExecutionPlan>> {
         Err(DataFusionError::Execution(
-            "cannot replace children for BinaryCopyExec".to_string(),
+            "cannot replace children for PostgresBinaryCopyExec".to_string(),
         ))
     }
 
     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         _context: Arc<TaskContext>,
     ) -> DatafusionResult<SendableRecordBatchStream> {
         let stream = ChunkStream {
@@ -779,23 +785,31 @@ impl ExecutionPlan for PostgresBinaryCopyExec {
             opener: self.opener.clone(),
             arrow_schema: self.arrow_schema.clone(),
         };
-        Ok(Box::pin(stream))
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            stream,
+            partition,
+            &self.metrics,
+        )))
     }
 
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 impl DisplayAs for PostgresBinaryCopyExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "BinaryCopyExec",)
+        write!(f, "PostgresBinaryCopyExec",)
     }
 }
 
 impl fmt::Debug for PostgresBinaryCopyExec {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("BinaryCopyExec")
+        f.debug_struct("PostgresBinaryCopyExec")
             .field("pg_types", &self.pg_types)
             .field("arrow_schema", &self.arrow_schema)
             .finish()
diff --git a/crates/datasources/src/snowflake/mod.rs b/crates/datasources/src/snowflake/mod.rs
index 82606624f..96b49950b 100644
--- a/crates/datasources/src/snowflake/mod.rs
+++ b/crates/datasources/src/snowflake/mod.rs
@@ -12,6 +12,7 @@ use datafusion::arrow::datatypes::Fields;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, Partitioning, RecordBatchStream, Statistics,
 };
@@ -26,6 +27,7 @@ use datafusion::{
 };
 use datafusion_ext::errors::ExtensionError;
 use datafusion_ext::functions::VirtualLister;
+use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
 use futures::{Stream, StreamExt};
 use snowflake_connector::{
     datatype::SnowflakeDataType, snowflake_to_arrow_datatype, Connection as SnowflakeConnection,
@@ -375,6 +377,7 @@ impl TableProvider for SnowflakeTableProvider {
             arrow_schema: projection_schema,
             num_partitions,
             result: Mutex::new(result),
+            metrics: ExecutionPlanMetricsSet::new(),
         }))
     }
 }
@@ -384,6 +387,7 @@ struct SnowflakeExec {
     arrow_schema: ArrowSchemaRef,
     num_partitions: usize,
     result: Mutex<QueryResult>,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl ExecutionPlan for SnowflakeExec {
@@ -427,12 +431,21 @@ impl ExecutionPlan for SnowflakeExec {
                 "missing chunk for partition: {partition}"
             )))?
         };
-        Ok(Box::pin(ChunkStream::new(self.schema(), chunk)))
+        let stream = ChunkStream::new(self.schema(), chunk);
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            stream,
+            partition,
+            &self.metrics,
+        )))
     }
 
     fn statistics(&self) -> Statistics {
         Statistics::default()
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 impl DisplayAs for SnowflakeExec {
diff --git a/crates/protogen/src/sqlexec/physical_plan.rs b/crates/protogen/src/sqlexec/physical_plan.rs
index 39fc6bb1d..7842c6ddc 100644
--- a/crates/protogen/src/sqlexec/physical_plan.rs
+++ b/crates/protogen/src/sqlexec/physical_plan.rs
@@ -292,6 +292,9 @@ pub struct InterleaveExec {}
 #[derive(Clone, PartialEq, Message)]
 pub struct RuntimeGroupExec {}
 
+#[derive(Clone, PartialEq, Message)]
+pub struct DataSourceMetricsExecAdapter {}
+
 #[derive(Clone, PartialEq, Message)]
 pub struct AnalyzeExec {
     #[prost(bool, tag = "1")]
@@ -306,7 +309,7 @@ pub struct ExecutionPlanExtension {
     #[prost(
         oneof = "ExecutionPlanExtensionType",
-        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29"
+        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30"
     )]
     pub inner: Option<ExecutionPlanExtensionType>,
 }
@@ -376,4 +379,6 @@ pub enum ExecutionPlanExtensionType {
     RuntimeGroupExec(RuntimeGroupExec),
     #[prost(message, tag = "29")]
     AnalyzeExec(AnalyzeExec),
+    #[prost(message, tag = "30")]
+    DataSourceMetricsExecAdapter(DataSourceMetricsExecAdapter),
 }
diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs
index 76591ef15..75ce175fd 100644
--- a/crates/sqlexec/src/dispatch/external.rs
+++ b/crates/sqlexec/src/dispatch/external.rs
@@ -451,9 +451,7 @@ impl<'a> ExternalDispatcher<'a> {
         let objects = accessor.list_globbed(location).await?;
 
         let state = self.df_ctx.state();
-        let provider = accessor
-            .into_table_provider(&state, ft, objects, /* predicate_pushdown = */ true)
-            .await?;
+        let provider = accessor.into_table_provider(&state, ft, objects).await?;
 
         Ok(provider)
     }
diff --git a/crates/sqlexec/src/extension_codec.rs b/crates/sqlexec/src/extension_codec.rs
index 0f953ba4d..adbb8f833 100644
--- a/crates/sqlexec/src/extension_codec.rs
+++ b/crates/sqlexec/src/extension_codec.rs
@@ -17,6 +17,7 @@ use datafusion::physical_plan::union::InterleaveExec;
 use datafusion::physical_plan::values::ValuesExec;
 use datafusion::physical_plan::{displayable, ExecutionPlan};
 use datafusion::prelude::{Expr, SessionContext};
+use datafusion_ext::metrics::DataSourceMetricsExecAdapter;
 use datafusion_ext::runtime::runtime_group::RuntimeGroupExec;
 use datafusion_proto::logical_plan::from_proto::parse_expr;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
@@ -714,6 +715,14 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> {
                     Arc::new((&schema).try_into()?),
                 ))
             }
+            proto::ExecutionPlanExtensionType::DataSourceMetricsExecAdapter(_ext) => {
+                Arc::new(DataSourceMetricsExecAdapter::new(
+                    inputs
+                        .get(0)
+                        .ok_or_else(|| DataFusionError::Internal("missing child".to_string()))?
+                        .clone(),
+                ))
+            }
         };
 
         Ok(plan)
@@ -978,6 +987,10 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> {
                 show_statistics: true,
                 schema: Some(exec.schema().try_into()?),
             })
+        } else if let Some(_exec) = node.as_any().downcast_ref::<DataSourceMetricsExecAdapter>() {
+            proto::ExecutionPlanExtensionType::DataSourceMetricsExecAdapter(
+                proto::DataSourceMetricsExecAdapter {},
+            )
         } else {
             return Err(DataFusionError::NotImplemented(format!(
                 "encoding not implemented for physical plan: {}",