From 75ac9230fd36cb7ca515320bd5efa75e3704288a Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Fri, 19 Jan 2024 11:15:51 -0600 Subject: [PATCH] fix: don't schedule the exec, just exec immediately --- crates/sqlexec/src/distexec/pipeline.rs | 31 ------------------- crates/sqlexec/src/distexec/scheduler.rs | 1 - .../src/planner/physical_plan/drop_tables.rs | 27 ++++------------ 3 files changed, 6 insertions(+), 53 deletions(-) diff --git a/crates/sqlexec/src/distexec/pipeline.rs b/crates/sqlexec/src/distexec/pipeline.rs index 1f3a2ca47..9b14b4c83 100644 --- a/crates/sqlexec/src/distexec/pipeline.rs +++ b/crates/sqlexec/src/distexec/pipeline.rs @@ -41,37 +41,6 @@ pub trait Sink: Send + Sync + Debug { fn finish(&self, child: usize, partition: usize) -> Result<()>; } -#[derive(Debug)] -// NOOP that just logs the input -pub struct LogSink; - -impl Sink for LogSink { - fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> { - tracing::debug!( - "DebugSink: child={}, partition={}, batch={:?}", - child, - partition, - input - ); - Ok(()) - } - - fn finish(&self, child: usize, partition: usize) -> Result<()> { - tracing::debug!( - "DebugSink: child={}, partition={} finished", - child, - partition - ); - Ok(()) - } -} -impl ErrorSink for LogSink { - fn push_error(&self, err: DistExecError, partition: usize) -> Result<()> { - tracing::error!("DebugSink: partition={} error={:?}", partition, err); - Ok(()) - } -} - pub trait ErrorSink: Send + Sync + Debug { fn push_error(&self, err: DistExecError, partition: usize) -> Result<()>; } diff --git a/crates/sqlexec/src/distexec/scheduler.rs b/crates/sqlexec/src/distexec/scheduler.rs index d1af4a78d..fc6599786 100644 --- a/crates/sqlexec/src/distexec/scheduler.rs +++ b/crates/sqlexec/src/distexec/scheduler.rs @@ -56,7 +56,6 @@ impl Scheduler { context: Arc, output: OutputSink, ) -> Result<()> { - println!("Scheduling plan"); let pipeline = PipelineBuilder::new(plan, context) .build(output.batches.clone(), output.errors.clone())?; diff --git a/crates/sqlexec/src/planner/physical_plan/drop_tables.rs b/crates/sqlexec/src/planner/physical_plan/drop_tables.rs index 10dde0ac3..ab88b82b7 100644 --- a/crates/sqlexec/src/planner/physical_plan/drop_tables.rs +++ b/crates/sqlexec/src/planner/physical_plan/drop_tables.rs @@ -1,5 +1,3 @@ -use crate::distexec::pipeline::LogSink; -use crate::distexec::scheduler::{OutputSink, Scheduler}; use crate::planner::logical_plan::OwnedFullObjectReference; use super::{new_operation_batch, GENERIC_OPERATION_PHYSICAL_SCHEMA}; @@ -13,7 +11,7 @@ use datafusion::physical_plan::{ stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; -use futures::stream; +use futures::{stream, StreamExt}; use protogen::metastore::types::catalog::TableEntry; use protogen::metastore::types::service::{self, Mutation}; use sqlbuiltins::functions::table::system::remove_delta_tables::DeleteDeltaTablesOperation; @@ -94,17 +92,11 @@ async fn drop_tables( context: Arc, plan: DropTablesExec, ) -> DataFusionResult { - println!("Executing DropTablesExec"); let mutator = context .session_config() .get_extension::() .expect("context should have catalog mutator"); - let scheduler = context - .session_config() - .get_extension::() - .expect("context should have scheduler"); - let drops = plan.tbl_references.into_iter().map(|r| { Mutation::DropObject(service::DropObject { schema: r.schema.into_owned(), @@ -120,20 +112,13 @@ async fn drop_tables( .map_err(|e| DataFusionError::Execution(format!("failed to drop tables: {e}")))?; // only after the catalog is updated, we can delete the delta tables - // we don't do this immediately as to not block the client, so we schedule it + // TODO: this should be done in the scheduler. let sys_exec = SystemOperationExec::new(DeleteDeltaTablesOperation::new(plan.tbl_entries.clone()).into()); - - scheduler - .schedule( - Arc::new(sys_exec), - context.clone(), - OutputSink { - batches: Arc::new(LogSink), - errors: Arc::new(LogSink), - }, - ) - .unwrap(); + let _ = sys_exec + .execute(0, context.clone())? + .collect::>() + .await; Ok(new_operation_batch("drop_tables")) }