feat: Track more data source metrics during execution (#1860)
Adds additional metrics tracking for data sources:

```
> CREATE EXTERNAL DATABASE my_pg
::: FROM postgres OPTIONS (
:::   host = 'pg.demo.glaredb.com',
:::   port = '5432',
:::   user = 'demo',
:::   password = 'demo',
:::   database = 'postgres',
::: );
Database created
> explain analyze select * from my_pg.public.customer;
┌───────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ plan_type         │ plan                                                                                                                                             │
│ ──                │ ──                                                                                                                                               │
│ Utf8              │ Utf8                                                                                                                                             │
╞═══════════════════╪══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╡
│ Plan with Metrics │ RuntimeGroupExec: runtime_preference=local, metrics=[]↵                                                                                          │
│                   │   RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, metrics=[send_time=514.826µs, fetch_time=1.588055543s, repart_time=1ns]↵ │
│                   │     PostgresBinaryCopyExec, metrics=[output_rows=150000, elapsed_compute=1ns, bytes_processed=40962000]↵                                         │
│                   │                                                                                                                                                  │
└───────────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```

This also includes a `bytes_processed` counter that tracks the number of bytes
read as batches are ingested. The idea is that we can walk the plan after
execution to get the total number of bytes processed and emit that
_somewhere_.
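
For example, a minimal sketch of that "walk the plan" step using the `AggregatedMetrics` type added in this commit (the `report_query_metrics` helper and the `plan` it receives are hypothetical call-site names, not part of the change):

```rust
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use datafusion_ext::metrics::AggregatedMetrics;

/// Hypothetical helper: after the streams for `plan` have been driven to
/// completion, walk the plan tree and sum up the metrics recorded during
/// execution.
fn report_query_metrics(plan: &Arc<dyn ExecutionPlan>) {
    let agg = AggregatedMetrics::new_from_plan(plan.as_ref());
    println!(
        "elapsed_compute_ns={} bytes_processed={}",
        agg.elapsed_compute_ns, agg.bytes_processed
    );
}
```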

---

I opened this up to get thoughts before moving on to replacing the
`session_query_metrics` table and related code, and looking at how we want to
persist these metrics.
scsmithr authored Oct 2, 2023
1 parent e5e36fa commit 7a72492
Showing 14 changed files with 363 additions and 33 deletions.
1 change: 1 addition & 0 deletions crates/datafusion_ext/src/lib.rs
@@ -1,4 +1,5 @@
pub mod errors;
pub mod metrics;
pub mod planner;
pub mod runtime;
pub mod vars;
247 changes: 247 additions & 0 deletions crates/datafusion_ext/src/metrics.rs
@@ -0,0 +1,247 @@
use datafusion::{
    arrow::datatypes::SchemaRef,
    arrow::{datatypes::Schema, record_batch::RecordBatch},
    error::Result,
    execution::TaskContext,
    physical_expr::PhysicalSortExpr,
    physical_plan::{
        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet},
        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
        SendableRecordBatchStream, Statistics,
    },
};
use futures::{Stream, StreamExt};
use std::fmt;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, pin::Pin};

const BYTES_PROCESSED_GAUGE_NAME: &str = "bytes_processed";

/// Standard metrics we should be collecting for all data sources during
/// queries.
#[derive(Debug, Clone)]
pub struct DataSourceMetrics {
    /// Track bytes processed by source plans.
    pub bytes_processed: Gauge,

    /// Baseline metrics like output rows and elapsed time.
    pub baseline: BaselineMetrics,
}

impl DataSourceMetrics {
    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        let bytes_processed =
            MetricBuilder::new(metrics).gauge(BYTES_PROCESSED_GAUGE_NAME, partition);
        let baseline = BaselineMetrics::new(metrics, partition);

        Self {
            bytes_processed,
            baseline,
        }
    }

    /// Track metrics based on the poll result from an async stream.
    pub fn record_poll(
        &self,
        poll: Poll<Option<Result<RecordBatch>>>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        if let Poll::Ready(maybe_batch) = &poll {
            match maybe_batch {
                Some(Ok(batch)) => {
                    self.bytes_processed.add(batch.get_array_memory_size());
                    self.baseline.record_output(batch.num_rows());
                }
                Some(Err(_)) => self.baseline.done(),
                None => self.baseline.done(),
            }
        }
        poll
    }
}

/// Thin wrapper around a record batch stream that automatically records metrics
/// about batches that are sent through the stream.
///
/// Note this should only be used when "ingesting" data during execution (data
/// sources or reading from tables) to avoid double counting bytes processed.
pub struct DataSourceMetricsStreamAdapter<S> {
    pub stream: S,
    pub metrics: DataSourceMetrics,
}

impl<S> DataSourceMetricsStreamAdapter<S> {
    /// Create a new stream with a new set of data source metrics for the given
    /// partition.
    pub fn new(stream: S, partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        Self {
            stream,
            metrics: DataSourceMetrics::new(partition, metrics),
        }
    }
}

impl<S: RecordBatchStream + Unpin> Stream for DataSourceMetricsStreamAdapter<S> {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll = self.stream.poll_next_unpin(cx);
        self.metrics.record_poll(poll)
    }
}

impl<S: RecordBatchStream + Unpin> RecordBatchStream for DataSourceMetricsStreamAdapter<S> {
    fn schema(&self) -> SchemaRef {
        self.stream.schema()
    }
}

/// Wrapper around an execution plan that returns a boxed, metrics-recording
/// stream adapter for additional metrics collection.
///
/// This should _generally_ only be used for execution plans that we're not able
/// to modify directly to record metrics (e.g. Delta). Otherwise, this should be
/// skipped and metrics collection should be added to the execution plan
/// directly.
#[derive(Debug, Clone)]
pub struct DataSourceMetricsExecAdapter {
    child: Arc<dyn ExecutionPlan>,
    metrics: ExecutionPlanMetricsSet,
}

impl DataSourceMetricsExecAdapter {
    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
        Self {
            child: plan,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }
}

impl ExecutionPlan for DataSourceMetricsExecAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> Arc<Schema> {
        self.child.schema()
    }

    fn output_partitioning(&self) -> Partitioning {
        self.child.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        self.child.output_ordering()
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![self.child.clone()]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self::new(children[0].clone())))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = self.child.execute(partition, context)?;
        Ok(Box::pin(BoxedStreamAdapater::new(
            stream,
            partition,
            &self.metrics,
        )))
    }

    fn statistics(&self) -> Statistics {
        self.child.statistics()
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}

impl DisplayAs for DataSourceMetricsExecAdapter {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "DataSourceMetricsExecAdapter")
    }
}

struct BoxedStreamAdapater {
    stream: SendableRecordBatchStream,
    metrics: DataSourceMetrics,
}

impl BoxedStreamAdapater {
    fn new(
        stream: SendableRecordBatchStream,
        partition: usize,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Self {
        Self {
            stream,
            metrics: DataSourceMetrics::new(partition, metrics),
        }
    }
}

impl Stream for BoxedStreamAdapater {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll = self.stream.poll_next_unpin(cx);
        self.metrics.record_poll(poll)
    }
}

impl RecordBatchStream for BoxedStreamAdapater {
    fn schema(&self) -> SchemaRef {
        self.stream.schema()
    }
}

#[derive(Debug, Clone)]
pub struct AggregatedMetrics {
    /// Total time taken for a plan to execute.
    pub elapsed_compute_ns: u64,
    /// Total bytes processed.
    pub bytes_processed: u64,
}

impl AggregatedMetrics {
    /// Computes aggregated metrics from a plan.
    ///
    /// The plan should have already been executed to completion, otherwise
    /// partial or incorrect results will be reported.
    pub fn new_from_plan(plan: &dyn ExecutionPlan) -> Self {
        let mut agg = AggregatedMetrics {
            elapsed_compute_ns: 0,
            bytes_processed: 0,
        };
        agg.aggregate_recurse(plan);
        agg
    }

    fn aggregate_recurse(&mut self, plan: &dyn ExecutionPlan) {
        let metrics = match plan.metrics() {
            Some(metrics) => metrics,
            None => return,
        };
        self.elapsed_compute_ns += metrics.elapsed_compute().unwrap_or_default() as u64;
        self.bytes_processed += metrics
            .sum_by_name(BYTES_PROCESSED_GAUGE_NAME)
            .map(|m| m.as_usize() as u64)
            .unwrap_or_default();

        for child in plan.children() {
            self.aggregate_recurse(child.as_ref());
        }
    }
}
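
As a rough illustration of how the exec adapter above is meant to be used, here is a hedged sketch of wrapping a scan plan we can't modify directly (e.g. a Delta scan); `with_source_metrics` and the `scan` argument are hypothetical names, not part of this commit:

```rust
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use datafusion_ext::metrics::DataSourceMetricsExecAdapter;

/// Hypothetical helper: wrap a third-party scan plan so it reports
/// `bytes_processed` and baseline metrics without modifying the plan itself.
fn with_source_metrics(scan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    Arc::new(DataSourceMetricsExecAdapter::new(scan))
}
```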
5 changes: 5 additions & 0 deletions crates/datafusion_ext/src/runtime/runtime_group.rs
@@ -2,6 +2,7 @@ use datafusion::arrow::datatypes::Schema;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::TaskContext;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
    Statistics,
@@ -69,6 +70,10 @@ impl ExecutionPlan for RuntimeGroupExec {
    fn statistics(&self) -> Statistics {
        self.child.statistics()
    }

    fn metrics(&self) -> Option<MetricsSet> {
        None
    }
}

impl DisplayAs for RuntimeGroupExec {
26 changes: 20 additions & 6 deletions crates/datasources/src/bigquery/mod.rs
@@ -10,8 +10,6 @@ use bigquery_storage::yup_oauth2::{
    ServiceAccountAuthenticator,
};
use bigquery_storage::{BufferedArrowIpcReader, Client};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::{datatypes::Fields, ipc::reader::StreamReader as ArrowStreamReader};
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result as DatafusionResult};
use datafusion::execution::context::SessionState;
@@ -29,7 +27,16 @@ use datafusion::{
    },
    physical_plan::memory::MemoryExec,
};
use datafusion_ext::{errors::ExtensionError, functions::VirtualLister};
use datafusion::{
    arrow::record_batch::RecordBatch, physical_plan::metrics::ExecutionPlanMetricsSet,
};
use datafusion::{
    arrow::{datatypes::Fields, ipc::reader::StreamReader as ArrowStreamReader},
    physical_plan::metrics::MetricsSet,
};
use datafusion_ext::{
    errors::ExtensionError, functions::VirtualLister, metrics::DataSourceMetricsStreamAdapter,
};
use errors::{BigQueryError, Result};
use futures::{Stream, StreamExt};
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema as BigQuerySchema;
@@ -358,6 +365,7 @@ impl TableProvider for BigQueryTableProvider {
            arrow_schema: projected_schema,
            receiver: recv,
            num_partitions,
            metrics: ExecutionPlanMetricsSet::new(),
        }))
    }
}
@@ -367,6 +375,7 @@ struct BigQueryExec {
    arrow_schema: ArrowSchemaRef,
    receiver: Receiver<BufferedArrowIpcReader>,
    num_partitions: usize,
    metrics: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for BigQueryExec {
@@ -404,16 +413,21 @@ impl ExecutionPlan for BigQueryExec {
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DatafusionResult<SendableRecordBatchStream> {
        Ok(Box::pin(BufferedIpcStream::new(
            self.schema(),
            self.receiver.clone(),
        let stream = BufferedIpcStream::new(self.schema(), self.receiver.clone(), partition);
        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
            stream,
            partition,
            &self.metrics,
        )))
    }

    fn statistics(&self) -> Statistics {
        Statistics::default()
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}

impl DisplayAs for BigQueryExec {
19 changes: 14 additions & 5 deletions crates/datasources/src/mongodb/exec.rs
@@ -7,10 +7,12 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result as DatafusionResult};
use datafusion::execution::context::TaskContext;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
    SendableRecordBatchStream, Statistics,
};
use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
use futures::{Stream, StreamExt};
use mongodb::bson::{Document, RawDocumentBuf};
use mongodb::{options::FindOptions, Collection};
@@ -29,6 +31,7 @@ pub struct MongoBsonExec {
    schema: Arc<ArrowSchema>,
    collection: Collection<RawDocumentBuf>,
    limit: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
}

impl MongoBsonExec {
@@ -41,6 +44,7 @@ impl MongoBsonExec {
            schema,
            collection,
            limit,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }
}
@@ -77,19 +81,24 @@ impl ExecutionPlan for MongoBsonExec {

    fn execute(
        &self,
        _partition: usize,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DatafusionResult<SendableRecordBatchStream> {
        Ok(Box::pin(BsonStream::new(
            self.schema.clone(),
            self.collection.clone(),
            self.limit,
        let stream = BsonStream::new(self.schema.clone(), self.collection.clone(), self.limit);
        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
            stream,
            partition,
            &self.metrics,
        )))
    }

    fn statistics(&self) -> Statistics {
        Statistics::default()
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}

impl DisplayAs for MongoBsonExec {