Skip to content

Commit

Permalink
Fix thread panic when "unreachable" SpawnedTask code is reachable. (#…
Browse files Browse the repository at this point in the history
…12086)

* test: demonstrate that the unreachable in SpawnedTask is reachable

* chore: use workspace tokio and add feature

* fix(12089): SpawnedTask will no longer panic during shutdown

* chore(12089): add new error type for JoinError

* refactor(12089): handle join error when using SpawnedTask::join_unwind

* Revert "chore: use workspace tokio and add feature"

This reverts commit 3010288.

* refactor(12089): update test to avoid the looping and global (to package tests) panic hook manipulation

* refactor(12089): make single conditional for unwind vs no-unwind, and update test for cancellation error
  • Loading branch information
wiedld authored Aug 23, 2024
1 parent 3de50c8 commit 7be9897
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 12 deletions.
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions datafusion/common-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,8 @@ name = "datafusion_common_runtime"
path = "src/lib.rs"

[dependencies]
log = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] }
42 changes: 38 additions & 4 deletions datafusion/common-runtime/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,52 @@ impl<R: 'static> SpawnedTask<R> {
}

/// Joins the task and unwinds the panic if it happens.
pub async fn join_unwind(self) -> R {
self.join().await.unwrap_or_else(|e| {
pub async fn join_unwind(self) -> Result<R, JoinError> {
self.join().await.map_err(|e| {
// `JoinError` can be caused either by panic or cancellation. We have to handle panics:
if e.is_panic() {
std::panic::resume_unwind(e.into_panic());
} else {
// Cancellation may be caused by two reasons:
// 1. Abort is called, but since we consumed `self`, it's not our case (`JoinHandle` not accessible outside).
// 2. The runtime is shutting down.
// So we consider this branch as unreachable.
unreachable!("SpawnedTask was cancelled unexpectedly");
log::warn!("SpawnedTask was polled during shutdown");
e
}
})
}
}

#[cfg(test)]
mod tests {
use super::*;

use std::future::{pending, Pending};

use tokio::runtime::Runtime;

#[tokio::test]
async fn runtime_shutdown() {
let rt = Runtime::new().unwrap();
let task = rt
.spawn(async {
SpawnedTask::spawn(async {
let fut: Pending<()> = pending();
fut.await;
unreachable!("should never return");
})
})
.await
.unwrap();

// caller shutdown their DF runtime (e.g. timeout, error in caller, etc)
rt.shutdown_background();

// race condition
// poll occurs during shutdown (buffered stream poll calls, etc)
assert!(matches!(
task.join_unwind().await,
Err(e) if e.is_cancelled()
));
}
}
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
pyo3 = { version = "0.21.0", optional = true }
sqlparser = { workspace = true }
tokio = { workspace = true }

[target.'cfg(target_family = "wasm")'.dependencies]
instant = { version = "0.1", features = ["wasm-bindgen"] }
Expand Down
8 changes: 8 additions & 0 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use arrow::error::ArrowError;
#[cfg(feature = "parquet")]
use parquet::errors::ParquetError;
use sqlparser::parser::ParserError;
use tokio::task::JoinError;

/// Result type for operations that could result in an [DataFusionError]
pub type Result<T, E = DataFusionError> = result::Result<T, E>;
Expand Down Expand Up @@ -112,6 +113,10 @@ pub enum DataFusionError {
/// SQL method, opened a CSV file that is broken, or tried to divide an
/// integer by zero.
Execution(String),
/// [`JoinError`] during execution of the query.
///
/// This error can unoccur for unjoined tasks, such as execution shutdown.
ExecutionJoin(JoinError),
/// Error when resources (such as memory of scratch disk space) are exhausted.
///
/// This error is thrown when a consumer cannot acquire additional memory
Expand Down Expand Up @@ -306,6 +311,7 @@ impl Error for DataFusionError {
DataFusionError::Plan(_) => None,
DataFusionError::SchemaError(e, _) => Some(e),
DataFusionError::Execution(_) => None,
DataFusionError::ExecutionJoin(e) => Some(e),
DataFusionError::ResourcesExhausted(_) => None,
DataFusionError::External(e) => Some(e.as_ref()),
DataFusionError::Context(_, e) => Some(e.as_ref()),
Expand Down Expand Up @@ -418,6 +424,7 @@ impl DataFusionError {
DataFusionError::Configuration(_) => "Invalid or Unsupported Configuration: ",
DataFusionError::SchemaError(_, _) => "Schema error: ",
DataFusionError::Execution(_) => "Execution error: ",
DataFusionError::ExecutionJoin(_) => "ExecutionJoin error: ",
DataFusionError::ResourcesExhausted(_) => "Resources exhausted: ",
DataFusionError::External(_) => "External error: ",
DataFusionError::Context(_, _) => "",
Expand Down Expand Up @@ -453,6 +460,7 @@ impl DataFusionError {
Cow::Owned(format!("{desc}{backtrace}"))
}
DataFusionError::Execution(ref desc) => Cow::Owned(desc.to_string()),
DataFusionError::ExecutionJoin(ref desc) => Cow::Owned(desc.to_string()),
DataFusionError::ResourcesExhausted(ref desc) => Cow::Owned(desc.to_string()),
DataFusionError::External(ref desc) => Cow::Owned(desc.to_string()),
#[cfg(feature = "object_store")]
Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,10 @@ impl DataSink for ArrowFileSink {
}
}

demux_task.join_unwind().await?;
demux_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;
Ok(row_count as u64)
}
}
Expand Down
18 changes: 14 additions & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,10 @@ impl DataSink for ParquetSink {
}
}

demux_task.join_unwind().await?;
demux_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;

Ok(row_count as u64)
}
Expand Down Expand Up @@ -942,7 +945,10 @@ fn spawn_rg_join_and_finalize_task(
let num_cols = column_writer_tasks.len();
let mut finalized_rg = Vec::with_capacity(num_cols);
for task in column_writer_tasks.into_iter() {
let (writer, _col_reservation) = task.join_unwind().await?;
let (writer, _col_reservation) = task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;
let encoded_size = writer.get_estimated_total_bytes();
rg_reservation.grow(encoded_size);
finalized_rg.push(writer.close()?);
Expand Down Expand Up @@ -1070,7 +1076,8 @@ async fn concatenate_parallel_row_groups(
while let Some(task) = serialize_rx.recv().await {
let result = task.join_unwind().await;
let mut rg_out = parquet_writer.next_row_group()?;
let (serialized_columns, mut rg_reservation, _cnt) = result?;
let (serialized_columns, mut rg_reservation, _cnt) =
result.map_err(DataFusionError::ExecutionJoin)??;
for chunk in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
rg_reservation.free();
Expand Down Expand Up @@ -1134,7 +1141,10 @@ async fn output_single_parquet_file_parallelized(
)
.await?;

launch_serialization_task.join_unwind().await?;
launch_serialization_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)??;
Ok(file_metadata)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ pub(crate) async fn stateless_multipart_put(
write_coordinator_task.join_unwind(),
demux_task.join_unwind()
);
r1?;
r2?;
r1.map_err(DataFusionError::ExecutionJoin)??;
r2.map_err(DataFusionError::ExecutionJoin)??;

let total_count = rx_row_cnt.await.map_err(|_| {
internal_datafusion_err!("Did not receive row count from write coordinator")
Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,9 @@ impl DataSink for StreamWrite {
}
}
drop(sender);
write_task.join_unwind().await
write_task
.join_unwind()
.await
.map_err(DataFusionError::ExecutionJoin)?
}
}

0 comments on commit 7be9897

Please sign in to comment.