Skip to content

Commit

Permalink
refactor: Remove the callback for sending apply-chunks-done message an…
Browse files Browse the repository at this point in the history
…d explicitly propagate Sender (#11012)

We propagate `DoneApplyChunkCallback` throughout the apply-chunks code;
the callback is invoked when apply-chunks is finished. Today, this callback
only sends the `ApplyChunksDoneMessage` to the ClientActors. This
change removes the indirection by explicitly passing the Sender for the
message instead of hiding it behind a callback. In some cases the
callback is a no-op (e.g. catchup logic); in these cases the no-op function
is replaced with None.

As part of this change, we move ApplyChunksDoneMessage from client to
chain.
  • Loading branch information
tayfunelmas authored Apr 18, 2024
1 parent fc55791 commit 73855ff
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 141 deletions.
7 changes: 0 additions & 7 deletions chain/chain/src/block_processing_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,6 @@ pub struct BlockProcessingArtifact {
pub invalid_chunks: Vec<ShardChunkHeader>,
}

/// Type alias for the callback that is invoked after applying chunks has finished for a block.
///
/// The callback receives the hash of the block whose chunks were applied. Several functions that
/// may trigger the start of processing of new blocks take this as an argument. Callers of these
/// functions must note that the callback can be called multiple times, for different blocks,
/// because these functions may trigger the processing of more than one block.
///
/// The callback is shared behind an `Arc` and must be `Send + Sync + 'static`.
pub type DoneApplyChunkCallback = Arc<dyn Fn(CryptoHash) + Send + Sync + 'static>;

/// Unit error type that carries no data.
/// NOTE(review): presumably signals that a requested block is not present in the
/// in-processing pool — confirm against the code that returns it.
#[derive(Debug)]
pub struct BlockNotInPoolError;

Expand Down
54 changes: 29 additions & 25 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::block_processing_utils::{
BlockPreprocessInfo, BlockProcessingArtifact, BlocksInProcessing, DoneApplyChunkCallback,
BlockPreprocessInfo, BlockProcessingArtifact, BlocksInProcessing,
};
use crate::blocks_delay_tracker::BlocksDelayTracker;
use crate::chain_update::ChainUpdate;
Expand Down Expand Up @@ -132,6 +132,13 @@ enum ApplyChunksMode {
NotCaughtUp,
}

/// `ApplyChunksDoneMessage` signals that applying the chunks of a block has finished.
/// Upon receiving this message, ClientActors know that it is time to finish processing the
/// blocks that have just finished applying chunks.
/// The message carries no payload and its handler result type is `()`.
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct ApplyChunksDoneMessage;

/// Contains information for missing chunks in a block
pub struct BlockMissingChunks {
/// previous block hash
Expand Down Expand Up @@ -1334,17 +1341,17 @@ impl Chain {
/// these blocks that are ready.
/// `block_processing_artifacts`: Callers can pass an empty object or an existing BlockProcessingArtifact.
/// This function will add the effect from processing this block to there.
/// `apply_chunks_done_callback`: This callback will be called after apply_chunks are finished
/// `apply_chunks_done_sender`: An ApplyChunksDoneMessage message will be sent via this sender after apply_chunks is finished
/// (so it also happens asynchronously in the rayon thread pool). Callers can
/// use this callback as a way to receive notifications when apply chunks are done
/// use this sender as a way to receive notifications when apply chunks are done
/// so it can call postprocess_ready_blocks.
pub fn start_process_block_async(
&mut self,
me: &Option<AccountId>,
block: MaybeValidated<Block>,
provenance: Provenance,
block_processing_artifacts: &mut BlockProcessingArtifact,
apply_chunks_done_callback: DoneApplyChunkCallback,
apply_chunks_done_sender: Option<near_async::messaging::Sender<ApplyChunksDoneMessage>>,
) -> Result<(), Error> {
let _span =
debug_span!(target: "chain", "start_process_block_async", ?provenance).entered();
Expand All @@ -1358,7 +1365,7 @@ impl Chain {
block,
provenance,
block_processing_artifacts,
apply_chunks_done_callback,
apply_chunks_done_sender,
block_received_time,
);

Expand Down Expand Up @@ -1386,7 +1393,7 @@ impl Chain {
&mut self,
me: &Option<AccountId>,
block_processing_artifacts: &mut BlockProcessingArtifact,
apply_chunks_done_callback: DoneApplyChunkCallback,
apply_chunks_done_sender: Option<near_async::messaging::Sender<ApplyChunksDoneMessage>>,
) -> (Vec<AcceptedBlock>, HashMap<CryptoHash, Error>) {
let _span = debug_span!(target: "chain", "postprocess_ready_blocks_chain").entered();
let mut accepted_blocks = vec![];
Expand All @@ -1397,7 +1404,7 @@ impl Chain {
block_hash,
apply_result,
block_processing_artifacts,
apply_chunks_done_callback.clone(),
apply_chunks_done_sender.clone(),
) {
Err(e) => {
errors.insert(block_hash, e);
Expand Down Expand Up @@ -1558,7 +1565,7 @@ impl Chain {
me: &Option<AccountId>,
sync_hash: CryptoHash,
block_processing_artifacts: &mut BlockProcessingArtifact,
apply_chunks_done_callback: DoneApplyChunkCallback,
apply_chunks_done_sender: Option<near_async::messaging::Sender<ApplyChunksDoneMessage>>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "sync", "reset_heads_post_state_sync").entered();
// Get header we were syncing into.
Expand All @@ -1583,7 +1590,7 @@ impl Chain {
// Check if there are any orphans unlocked by this state sync.
// We can't fail beyond this point because the caller will not process accepted blocks
// and the blocks with missing chunks if this method fails
self.check_orphans(me, hash, block_processing_artifacts, apply_chunks_done_callback);
self.check_orphans(me, hash, block_processing_artifacts, apply_chunks_done_sender);
Ok(())
}

Expand All @@ -1595,7 +1602,7 @@ impl Chain {
block: MaybeValidated<Block>,
provenance: Provenance,
block_processing_artifact: &mut BlockProcessingArtifact,
apply_chunks_done_callback: DoneApplyChunkCallback,
apply_chunks_done_sender: Option<near_async::messaging::Sender<ApplyChunksDoneMessage>>,
block_received_time: Instant,
) -> Result<(), Error> {
let block_height = block.header().height();
Expand Down Expand Up @@ -1712,22 +1719,22 @@ impl Chain {
block_height,
apply_chunk_work,
apply_chunks_done_marker,
apply_chunks_done_callback.clone(),
apply_chunks_done_sender,
);

Ok(())
}

/// Applying chunks async by starting the work at the rayon thread pool
/// `apply_chunks_done_marker`: a marker that will be set to true once applying chunks is finished
/// `apply_chunks_done_callback`: a callback that will be called once applying chunks is finished
/// `apply_chunks_done_sender`: a sender used to send an `ApplyChunksDoneMessage` once applying chunks is finished
fn schedule_apply_chunks(
&self,
block_hash: CryptoHash,
block_height: BlockHeight,
work: Vec<UpdateShardJob>,
apply_chunks_done_marker: Arc<OnceCell<()>>,
apply_chunks_done_callback: DoneApplyChunkCallback,
apply_chunks_done_sender: Option<near_async::messaging::Sender<ApplyChunksDoneMessage>>,
) {
let sc = self.apply_chunks_sender.clone();
self.apply_chunks_spawner.spawn("apply_chunks", move || {
Expand All @@ -1740,7 +1747,9 @@ impl Chain {
// This should never happen, if it does, it means there is a bug in our code.
log_assert!(false, "apply chunks are called twice for block {block_hash:?}");
}
apply_chunks_done_callback(block_hash);
if let Some(sender) = apply_chunks_done_sender {
sender.send(ApplyChunksDoneMessage {});
}
});
}

Expand Down Expand Up @@ -1777,7 +1786,7 @@ impl Chain {
block_hash: CryptoHash,
apply_results: Vec<(ShardId, Result<ShardUpdateResult, Error>)>,
block_processing_artifacts: &mut BlockProcessingArtifact,
apply_chunks_done_callback: DoneApplyChunkCallback,
apply_chunks_done_sender: Option<near_async::messaging::Sender<ApplyChunksDoneMessage>>,
) -> Result<AcceptedBlock, Error> {
let timer = metrics::BLOCK_POSTPROCESSING_TIME.start_timer();
let (block, block_preprocess_info) =
Expand Down Expand Up @@ -1894,12 +1903,7 @@ impl Chain {
block_start_processing_time,
);

self.check_orphans(
me,
*block.hash(),
block_processing_artifacts,
apply_chunks_done_callback,
);
self.check_orphans(me, *block.hash(), block_processing_artifacts, apply_chunks_done_sender);

// Determine the block status of this block (whether it is a side fork and updates the chain head)
// Block status is needed in Client::on_block_accepted_with_optional_chunk_produce to
Expand Down Expand Up @@ -2199,7 +2203,7 @@ impl Chain {
&mut self,
me: &Option<AccountId>,
block_processing_artifact: &mut BlockProcessingArtifact,
apply_chunks_done_callback: DoneApplyChunkCallback,
apply_chunks_done_sender: Option<near_async::messaging::Sender<ApplyChunksDoneMessage>>,
) {
let blocks = self.blocks_with_missing_chunks.ready_blocks();
if !blocks.is_empty() {
Expand All @@ -2213,7 +2217,7 @@ impl Chain {
block.block,
block.provenance,
block_processing_artifact,
apply_chunks_done_callback.clone(),
apply_chunks_done_sender.clone(),
);
match res {
Ok(_) => {
Expand Down Expand Up @@ -3025,7 +3029,7 @@ impl Chain {
me: &Option<AccountId>,
epoch_first_block: &CryptoHash,
block_processing_artifacts: &mut BlockProcessingArtifact,
apply_chunks_done_callback: DoneApplyChunkCallback,
apply_chunks_done_sender: Option<near_async::messaging::Sender<ApplyChunksDoneMessage>>,
affected_blocks: &[CryptoHash],
) -> Result<(), Error> {
debug!(
Expand Down Expand Up @@ -3062,7 +3066,7 @@ impl Chain {
me,
*hash,
block_processing_artifacts,
apply_chunks_done_callback.clone(),
apply_chunks_done_sender.clone(),
);
}

Expand Down
2 changes: 1 addition & 1 deletion chain/chain/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub use block_processing_utils::{BlockProcessingArtifact, DoneApplyChunkCallback};
pub use block_processing_utils::BlockProcessingArtifact;
pub use chain::{check_known, collect_receipts, Chain};
pub use chain_update::ChainUpdate;
pub use doomslug::{Doomslug, DoomslugBlockProductionReadiness, DoomslugThresholdMode};
Expand Down
8 changes: 5 additions & 3 deletions chain/chain/src/orphan.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::chain::ApplyChunksDoneMessage;
use near_async::messaging::Sender;
use near_async::time::{Duration, Instant};
use near_chain_primitives::Error;
use near_primitives::block::Block;
Expand All @@ -10,7 +12,7 @@ use std::fmt::{Debug, Formatter};
use tracing::{debug, debug_span};

use crate::missing_chunks::BlockLike;
use crate::{metrics, BlockProcessingArtifact, Chain, DoneApplyChunkCallback, Provenance};
use crate::{metrics, BlockProcessingArtifact, Chain, Provenance};

/// Maximum number of orphans chain can store.
const MAX_ORPHAN_SIZE: usize = 1024;
Expand Down Expand Up @@ -362,7 +364,7 @@ impl Chain {
me: &Option<AccountId>,
prev_hash: CryptoHash,
block_processing_artifacts: &mut BlockProcessingArtifact,
apply_chunks_done_callback: DoneApplyChunkCallback,
apply_chunks_done_sender: Option<Sender<ApplyChunksDoneMessage>>,
) {
let _span = debug_span!(
target: "chain",
Expand Down Expand Up @@ -393,7 +395,7 @@ impl Chain {
orphan.block,
orphan.provenance,
block_processing_artifacts,
apply_chunks_done_callback.clone(),
apply_chunks_done_sender.clone(),
);
if let Err(err) = res {
debug!(target: "chain", "Orphan {:?} declined, error: {:?}", block_hash, err);
Expand Down
10 changes: 2 additions & 8 deletions chain/chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,10 @@ pub fn process_block_sync(
block_processing_artifacts: &mut BlockProcessingArtifact,
) -> Result<Vec<AcceptedBlock>, Error> {
let block_hash = *block.hash();
chain.start_process_block_async(
me,
block,
provenance,
block_processing_artifacts,
Arc::new(|_| {}),
)?;
chain.start_process_block_async(me, block, provenance, block_processing_artifacts, None)?;
wait_for_block_in_processing(chain, &block_hash).unwrap();
let (accepted_blocks, errors) =
chain.postprocess_ready_blocks(me, block_processing_artifacts, Arc::new(|_| {}));
chain.postprocess_ready_blocks(me, block_processing_artifacts, None);
// This is in test, we should never get errors when postprocessing blocks
debug_assert!(errors.is_empty());
Ok(accepted_blocks)
Expand Down
7 changes: 1 addition & 6 deletions chain/chain/src/tests/simple_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use near_primitives::hash::CryptoHash;
use near_primitives::test_utils::TestBlockBuilder;
use near_primitives::version::PROTOCOL_VERSION;
use num_rational::Ratio;
use std::sync::Arc;

#[test]
fn build_chain() {
Expand Down Expand Up @@ -101,11 +100,7 @@ fn build_chain_with_orphans() {
);
chain.process_block_test(&None, blocks.pop().unwrap()).unwrap();
while wait_for_all_blocks_in_processing(&mut chain) {
chain.postprocess_ready_blocks(
&None,
&mut BlockProcessingArtifact::default(),
Arc::new(|_| {}),
);
chain.postprocess_ready_blocks(&None, &mut BlockProcessingArtifact::default(), None);
}
assert_eq!(chain.head().unwrap().height, 10);
assert_matches!(
Expand Down
Loading

0 comments on commit 73855ff

Please sign in to comment.