Use atomics for SQLMetric implementation, remove unused name field #25

Merged 1 commit on Apr 22, 2021
datafusion/src/physical_plan/hash_aggregate.rs (16 changes: 6 additions & 10 deletions)
@@ -18,7 +18,7 @@
 //! Defines the execution plan for the hash aggregate operation
 
 use std::any::Any;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use ahash::RandomState;
@@ -95,7 +95,7 @@ pub struct HashAggregateExec {
     /// to the partial aggregate
     input_schema: SchemaRef,
     /// Metric to track number of output rows
-    output_rows: Arc<Mutex<SQLMetric>>,
+    output_rows: Arc<SQLMetric>,
 }
 
 fn create_schema(
@@ -144,7 +144,7 @@ impl HashAggregateExec {
 
         let schema = Arc::new(schema);
 
-        let output_rows = SQLMetric::counter("outputRows");
+        let output_rows = SQLMetric::counter();
 
         Ok(HashAggregateExec {
             mode,
@@ -253,10 +253,7 @@ impl ExecutionPlan for HashAggregateExec {
 
     fn metrics(&self) -> HashMap<String, SQLMetric> {
         let mut metrics = HashMap::new();
-        metrics.insert(
-            "outputRows".to_owned(),
-            self.output_rows.lock().unwrap().clone(),
-        );
+        metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
         metrics
     }
 }
@@ -292,7 +289,7 @@ pin_project! {
         #[pin]
         output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
         finished: bool,
-        output_rows: Arc<Mutex<SQLMetric>>,
+        output_rows: Arc<SQLMetric>,
     }
 }
 
@@ -644,7 +641,7 @@ impl GroupedHashAggregateStream {
         group_expr: Vec<Arc<dyn PhysicalExpr>>,
         aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         input: SendableRecordBatchStream,
-        output_rows: Arc<Mutex<SQLMetric>>,
+        output_rows: Arc<SQLMetric>,
     ) -> Self {
         let (tx, rx) = futures::channel::oneshot::channel();
 
@@ -702,7 +699,6 @@ impl Stream for GroupedHashAggregateStream {
             };
 
             if let Ok(batch) = &result {
-                let mut output_rows = output_rows.lock().unwrap();
                 output_rows.add(batch.num_rows())
             }
 
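The hunks above show the core of the change from the caller's side: because the metric now uses interior mutability, the operator and its stream share a plain `Arc<SQLMetric>` and update it through `&self`, so every `lock().unwrap()` disappears. A minimal standalone sketch of that pattern (illustrative `Counter` type, not the DataFusion source):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Illustrative stand-in for SQLMetric: interior mutability via
// AtomicUsize lets `add` take `&self`, so the operator and its
// stream can share a plain `Arc` with no Mutex around it.
struct Counter {
    value: AtomicUsize,
}

impl Counter {
    fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

fn main() {
    let output_rows = Arc::new(Counter { value: AtomicUsize::new(0) });
    let for_stream = Arc::clone(&output_rows); // handle given to the stream
    for_stream.add(8); // e.g. batch.num_rows()
    assert_eq!(output_rows.value(), 8); // visible through the other handle
}
```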
datafusion/src/physical_plan/mod.rs (40 changes: 25 additions & 15 deletions)
@@ -18,7 +18,8 @@
 //! Traits for physical query plan, supporting parallel execution for partitioned relations.
 
 use std::fmt::{Debug, Display};
-use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
 use std::{any::Any, pin::Pin};
 
 use crate::execution::context::ExecutionContextState;
@@ -58,44 +59,53 @@ pub enum MetricType {
 
 /// SQL metric such as counter (number of input or output rows) or timing information about
 /// a physical operator.
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct SQLMetric {
-    /// Metric name
-    name: String,
     /// Metric value
-    value: usize,
+    value: AtomicUsize,
     /// Metric type
     metric_type: MetricType,
 }
 
+impl Clone for SQLMetric {
+    fn clone(&self) -> Self {
+        Self {
+            value: AtomicUsize::new(self.value.load(Ordering::Relaxed)),
+            metric_type: self.metric_type.clone(),
+        }
+    }
+}
+
 impl SQLMetric {
+    // relaxed ordering for operations on `value` poses no issues
+    // we're purely using atomic ops with no associated memory ops
+
     /// Create a new metric for tracking a counter
-    pub fn counter(name: &str) -> Arc<Mutex<SQLMetric>> {
-        Arc::new(Mutex::new(SQLMetric::new(name, MetricType::Counter)))
+    pub fn counter() -> Arc<SQLMetric> {
+        Arc::new(SQLMetric::new(MetricType::Counter))
     }
 
     /// Create a new metric for tracking time in nanoseconds
-    pub fn time_nanos(name: &str) -> Arc<Mutex<SQLMetric>> {
-        Arc::new(Mutex::new(SQLMetric::new(name, MetricType::TimeNanos)))
+    pub fn time_nanos() -> Arc<SQLMetric> {
+        Arc::new(SQLMetric::new(MetricType::TimeNanos))
     }
 
     /// Create a new SQLMetric
-    pub fn new(name: &str, metric_type: MetricType) -> Self {
+    pub fn new(metric_type: MetricType) -> Self {
         Self {
-            name: name.to_owned(),
-            value: 0,
+            value: AtomicUsize::new(0),
             metric_type,
         }
     }
 
     /// Add to the value
-    pub fn add(&mut self, n: usize) {
-        self.value += n;
+    pub fn add(&self, n: usize) {
+        self.value.fetch_add(n, Ordering::Relaxed);
     }
 
     /// Get the current value
     pub fn value(&self) -> usize {
-        self.value
+        self.value.load(Ordering::Relaxed)
     }
 }
 
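To illustrate why `Ordering::Relaxed` is sufficient, as the new comment in the diff notes: each metric is a single independent atomic with no ordering requirements against other memory operations, and `fetch_add` is an atomic read-modify-write, so concurrent increments can never be lost. A self-contained sketch mirroring the counter API from the diff above (stand-in `Metric` type, not the actual DataFusion code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Mirror of the SQLMetric counter shape from the diff above.
#[derive(Debug)]
struct Metric {
    value: AtomicUsize,
}

impl Metric {
    fn counter() -> Arc<Metric> {
        Arc::new(Metric { value: AtomicUsize::new(0) })
    }

    fn add(&self, n: usize) {
        // Relaxed is enough: we only need the increment itself to be
        // atomic, not any ordering relative to surrounding operations.
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

fn main() {
    let metric = Metric::counter();
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let m = Arc::clone(&metric);
            thread::spawn(move || m.add(100))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // All 400 increments are accounted for, with no Mutex involved.
    assert_eq!(metric.value(), 400);
}
```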
datafusion/src/physical_plan/sort.rs (32 changes: 12 additions & 20 deletions)
@@ -19,7 +19,7 @@
 
 use std::any::Any;
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::time::Instant;
 
@@ -52,9 +52,9 @@ pub struct SortExec {
     /// Sort expressions
     expr: Vec<PhysicalSortExpr>,
     /// Output rows
-    output_rows: Arc<Mutex<SQLMetric>>,
+    output_rows: Arc<SQLMetric>,
     /// Time to sort batches
-    sort_time_nanos: Arc<Mutex<SQLMetric>>,
+    sort_time_nanos: Arc<SQLMetric>,
 }
 
 impl SortExec {
@@ -66,8 +66,8 @@ impl SortExec {
         Ok(Self {
             expr,
             input,
-            output_rows: SQLMetric::counter("outputRows"),
-            sort_time_nanos: SQLMetric::time_nanos("sortTime"),
+            output_rows: SQLMetric::counter(),
+            sort_time_nanos: SQLMetric::time_nanos(),
         })
     }
 
@@ -147,14 +147,8 @@ impl ExecutionPlan for SortExec {
 
     fn metrics(&self) -> HashMap<String, SQLMetric> {
         let mut metrics = HashMap::new();
-        metrics.insert(
-            "outputRows".to_owned(),
-            self.output_rows.lock().unwrap().clone(),
-        );
-        metrics.insert(
-            "sortTime".to_owned(),
-            self.sort_time_nanos.lock().unwrap().clone(),
-        );
+        metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
+        metrics.insert("sortTime".to_owned(), (*self.sort_time_nanos).clone());
         metrics
     }
 }
@@ -224,16 +218,16 @@ pin_project! {
         output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
         finished: bool,
         schema: SchemaRef,
-        output_rows: Arc<Mutex<SQLMetric>>,
+        output_rows: Arc<SQLMetric>,
     }
 }
 
 impl SortStream {
     fn new(
         input: SendableRecordBatchStream,
         expr: Vec<PhysicalSortExpr>,
-        output_rows: Arc<Mutex<SQLMetric>>,
-        sort_time: Arc<Mutex<SQLMetric>>,
+        output_rows: Arc<SQLMetric>,
+        sort_time: Arc<SQLMetric>,
     ) -> Self {
         let (tx, rx) = futures::channel::oneshot::channel();
 
@@ -246,7 +240,6 @@ impl SortStream {
                 .and_then(move |batches| {
                     let now = Instant::now();
                     let result = sort_batches(&batches, &schema, &expr);
-                    let mut sort_time = sort_time.lock().unwrap();
                     sort_time.add(now.elapsed().as_nanos() as usize);
                     result
                 });
@@ -288,7 +281,6 @@ impl Stream for SortStream {
             };
 
             if let Some(Ok(batch)) = &result {
-                let mut output_rows = output_rows.lock().unwrap();
                 output_rows.add(batch.num_rows());
             }
 
@@ -431,8 +423,8 @@ mod tests {
         assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
 
         let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
-        assert!(sort_exec.metrics().get("sortTime").unwrap().value > 0);
-        assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value, 8);
+        assert!(sort_exec.metrics().get("sortTime").unwrap().value() > 0);
+        assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value(), 8);
         assert_eq!(result.len(), 1);
 
         let columns = result[0].columns();
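One consequence of the manual `Clone` impl worth noting: `metrics()` now returns point-in-time snapshots, because cloning copies the current atomic value into a fresh `AtomicUsize` rather than sharing the live counter. A small sketch of that behaviour (illustrative `Metric` type, not the DataFusion source):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Sketch of the snapshot semantics behind `metrics()`: the manual Clone
// impl reads the current value and wraps it in a new AtomicUsize, so the
// clone is frozen while the original keeps counting.
struct Metric {
    value: AtomicUsize,
}

impl Clone for Metric {
    fn clone(&self) -> Self {
        Metric {
            value: AtomicUsize::new(self.value.load(Ordering::Relaxed)),
        }
    }
}

fn main() {
    let live = Metric { value: AtomicUsize::new(8) };
    let snapshot = live.clone();
    live.value.fetch_add(1, Ordering::Relaxed); // the live metric moves on
    assert_eq!(snapshot.value.load(Ordering::Relaxed), 8); // snapshot is frozen
    assert_eq!(live.value.load(Ordering::Relaxed), 9);
}
```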