fix: don't schedule the exec, just exec immediately
universalmind303 committed Jan 19, 2024
1 parent 64bf58e commit 75ac923
Showing 3 changed files with 6 additions and 53 deletions.
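The change drops the fire-and-forget path through the distributed scheduler and instead runs the delta-table cleanup inline, awaiting the plan's output stream before returning. Below is a minimal sketch of that execute-and-drain pattern, assuming datafusion, futures, and tokio as dependencies and using DataFusion's MemoryExec as a stand-in for the SystemOperationExec in the diff; it is an illustration of the pattern, not code from this commit.

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    // A one-batch, one-partition plan standing in for SystemOperationExec.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let plan = MemoryExec::try_new(&[vec![batch]], schema, None)?;
    let context = Arc::new(TaskContext::default());

    // Execute partition 0 and await the stream until it is exhausted,
    // mirroring `sys_exec.execute(0, context.clone())?.collect::<Vec<_>>().await`
    // in the diff below: nothing returns until the operation has finished.
    let results = plan.execute(0, context)?.collect::<Vec<_>>().await;
    for r in results {
        let _ = r?; // surface any per-batch execution error
    }
    Ok(())
}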
31 changes: 0 additions & 31 deletions crates/sqlexec/src/distexec/pipeline.rs
@@ -41,37 +41,6 @@ pub trait Sink: Send + Sync + Debug {
     fn finish(&self, child: usize, partition: usize) -> Result<()>;
 }
 
-#[derive(Debug)]
-// NOOP that just logs the input
-pub struct LogSink;
-
-impl Sink for LogSink {
-    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> {
-        tracing::debug!(
-            "DebugSink: child={}, partition={}, batch={:?}",
-            child,
-            partition,
-            input
-        );
-        Ok(())
-    }
-
-    fn finish(&self, child: usize, partition: usize) -> Result<()> {
-        tracing::debug!(
-            "DebugSink: child={}, partition={} finished",
-            child,
-            partition
-        );
-        Ok(())
-    }
-}
-impl ErrorSink for LogSink {
-    fn push_error(&self, err: DistExecError, partition: usize) -> Result<()> {
-        tracing::error!("DebugSink: partition={} error={:?}", partition, err);
-        Ok(())
-    }
-}
-
 pub trait ErrorSink: Send + Sync + Debug {
     fn push_error(&self, err: DistExecError, partition: usize) -> Result<()>;
 }
1 change: 0 additions & 1 deletion crates/sqlexec/src/distexec/scheduler.rs
@@ -56,7 +56,6 @@ impl Scheduler {
         context: Arc<TaskContext>,
         output: OutputSink,
     ) -> Result<()> {
-        println!("Scheduling plan");
         let pipeline = PipelineBuilder::new(plan, context)
             .build(output.batches.clone(), output.errors.clone())?;
 
27 changes: 6 additions & 21 deletions crates/sqlexec/src/planner/physical_plan/drop_tables.rs
@@ -1,5 +1,3 @@
-use crate::distexec::pipeline::LogSink;
-use crate::distexec::scheduler::{OutputSink, Scheduler};
 use crate::planner::logical_plan::OwnedFullObjectReference;
 
 use super::{new_operation_batch, GENERIC_OPERATION_PHYSICAL_SCHEMA};
@@ -13,7 +11,7 @@ use datafusion::physical_plan::{
     stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
     SendableRecordBatchStream, Statistics,
 };
-use futures::stream;
+use futures::{stream, StreamExt};
 use protogen::metastore::types::catalog::TableEntry;
 use protogen::metastore::types::service::{self, Mutation};
 use sqlbuiltins::functions::table::system::remove_delta_tables::DeleteDeltaTablesOperation;
@@ -94,17 +92,11 @@ async fn drop_tables(
     context: Arc<TaskContext>,
     plan: DropTablesExec,
 ) -> DataFusionResult<RecordBatch> {
-    println!("Executing DropTablesExec");
     let mutator = context
         .session_config()
         .get_extension::<CatalogMutator>()
         .expect("context should have catalog mutator");
 
-    let scheduler = context
-        .session_config()
-        .get_extension::<Scheduler>()
-        .expect("context should have scheduler");
-
     let drops = plan.tbl_references.into_iter().map(|r| {
         Mutation::DropObject(service::DropObject {
             schema: r.schema.into_owned(),
@@ -120,20 +112,13 @@ async fn drop_tables(
         .map_err(|e| DataFusionError::Execution(format!("failed to drop tables: {e}")))?;
 
     // only after the catalog is updated, we can delete the delta tables
-    // we don't do this immediately as to not block the client, so we schedule it
-    // TODO: this should be done in the scheduler.
     let sys_exec =
         SystemOperationExec::new(DeleteDeltaTablesOperation::new(plan.tbl_entries.clone()).into());
 
-    scheduler
-        .schedule(
-            Arc::new(sys_exec),
-            context.clone(),
-            OutputSink {
-                batches: Arc::new(LogSink),
-                errors: Arc::new(LogSink),
-            },
-        )
-        .unwrap();
+    let _ = sys_exec
+        .execute(0, context.clone())?
+        .collect::<Vec<_>>()
+        .await;
 
     Ok(new_operation_batch("drop_tables"))
 }
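One note on the new call shape: `let _ = ... .collect::<Vec<_>>().await` (via the newly imported futures::StreamExt) waits for the stream to finish but discards the per-batch Results. A hypothetical variant, sketched here and not part of this commit, could use futures::TryStreamExt to surface a cleanup failure instead:

use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use futures::TryStreamExt;

// Hypothetical helper: run one partition to completion, returning the first
// per-batch error instead of swallowing it.
async fn run_to_completion(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) -> Result<()> {
    let _batches: Vec<RecordBatch> = plan.execute(0, ctx)?.try_collect().await?;
    Ok(())
}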
