Skip to content

Commit

Permalink
Merge pull request #2030 from subspace/gemini-3f-backport-fix-tokio-r…
Browse files Browse the repository at this point in the history
…e-entrance

Gemini 3f backport: Fix tokio re-entrance due to rayon thread pool behavior
  • Loading branch information
nazar-pc authored Oct 1, 2023
2 parents c45197a + d373b89 commit b2ad9ad
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use subspace_farmer::single_disk_farm::{
use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter;
use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::utils::run_future_in_dedicated_thread;
use subspace_farmer::utils::{run_future_in_dedicated_thread, tokio_rayon_spawn_handler};
use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_networking::libp2p::identity::{ed25519, Keypair};
Expand Down Expand Up @@ -181,18 +181,21 @@ where
ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("farming#{thread_index}"))
.num_threads(farming_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()?,
);
let plotting_thread_pool = Arc::new(
ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("plotting#{thread_index}"))
.num_threads(plotting_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()?,
);
let replotting_thread_pool = Arc::new(
ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("replotting#{thread_index}"))
.num_threads(replotting_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()?,
);

Expand Down
39 changes: 13 additions & 26 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,9 @@ where
let piece_getter = piece_getter.clone();
let kzg = kzg.clone();
let erasure_coding = erasure_coding.clone();
let handle = Handle::current();

if replotting {
replotting_thread_pool.install(move || {
let plotting_fn = move || {
tokio::task::block_in_place(move || {
let plot_sector_fut = plot_sector::<_, PosTable>(
&public_key,
sector_index,
Expand All @@ -203,30 +202,18 @@ where
&mut table_generator,
);

handle.block_on(plot_sector_fut).map(|plotted_sector| {
(sector, sector_metadata, table_generator, plotted_sector)
})
})?
} else {
plotting_thread_pool.install(move || {
let plot_sector_fut = plot_sector::<_, PosTable>(
&public_key,
sector_index,
&piece_getter,
PieceGetterRetryPolicy::Limited(PIECE_GETTER_RETRY_NUMBER.get()),
&farmer_app_info.protocol_info,
&kzg,
&erasure_coding,
pieces_in_sector,
&mut sector,
&mut sector_metadata,
&mut table_generator,
);
Handle::current()
.block_on(plot_sector_fut)
.map(|plotted_sector| {
(sector, sector_metadata, table_generator, plotted_sector)
})
})
};

handle.block_on(plot_sector_fut).map(|plotted_sector| {
(sector, sector_metadata, table_generator, plotted_sector)
})
})?
if replotting {
replotting_thread_pool.install(plotting_fn)?
} else {
plotting_thread_pool.install(plotting_fn)?
}
};

Expand Down
25 changes: 25 additions & 0 deletions crates/subspace-farmer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::channel::oneshot;
use futures::channel::oneshot::Canceled;
use futures::future::{Either, Fuse, FusedFuture};
use futures::FutureExt;
use rayon::ThreadBuilder;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
Expand Down Expand Up @@ -113,3 +114,27 @@ where
result
})
}

/// This function is supposed to be used with [`rayon::ThreadPoolBuilder::spawn_handler()`] to
/// inherit current tokio runtime.
pub fn tokio_rayon_spawn_handler() -> impl FnMut(ThreadBuilder) -> io::Result<()> {
let handle = Handle::current();

move |thread: ThreadBuilder| {
let mut b = thread::Builder::new();
if let Some(name) = thread.name() {
b = b.name(name.to_owned());
}
if let Some(stack_size) = thread.stack_size() {
b = b.stack_size(stack_size);
}

let handle = handle.clone();
b.spawn(move || {
let _guard = handle.enter();

tokio::task::block_in_place(|| thread.run())
})?;
Ok(())
}
}

0 comments on commit b2ad9ad

Please sign in to comment.