Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Add retry mechanism for pov-recovery, fix full-node pov-recovery (#2164)
Browse files Browse the repository at this point in the history
* Increase delay for pov-recovery

* Update client/service/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

* Comment

* FMT

* Clear waiting_recovery when block is recovered or recovery failed

* Introduce recovery queue that preserved insertion order

* Better error logs

* Decrease slot duration

* Style improvements

* Add option to use unordered queue

* Maintain cache of finalized blocks

* Wait for one relay chain slot before recovery

* Make retries testable

* fmt

* Improve docs

* Improve docs

* Simplify RecoveryQueue

* Remove unwanted changes

* Adjust to comments

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Move recovery delay into the queue

* Check for finalized number

* Clean up

* Use timer

Co-authored-by: Bastian Köcher <git@kchr.de>

* Simplify implementation

* Revert "Use timer"

This reverts commit 3809eed.

* Properly clear `to_recover` flag

---------

Co-authored-by: Bastian Köcher <git@kchr.de>
  • Loading branch information
skunert and bkchr authored Feb 9, 2023
1 parent b4d0992 commit afcfd24
Show file tree
Hide file tree
Showing 15 changed files with 420 additions and 162 deletions.
9 changes: 7 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch =
cumulus-primitives-core = { path = "../../../primitives/core" }
cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
cumulus-client-pov-recovery = { path = "../../pov-recovery" }
schnellru = "0.2.1"

[dev-dependencies]
futures-timer = "3.0.2"
Expand Down
168 changes: 119 additions & 49 deletions client/consensus/common/src/parachain_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,73 @@ use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use schnellru::{ByLength, LruMap};
use sp_blockchain::Error as ClientError;
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};

use cumulus_client_pov_recovery::{RecoveryDelay, RecoveryKind, RecoveryRequest};
use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest};
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};

use polkadot_primitives::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};

use codec::Decode;
use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamExt};

use std::{sync::Arc, time::Duration};
use std::sync::Arc;

const LOG_TARGET: &str = "cumulus-consensus";
const FINALIZATION_CACHE_SIZE: u32 = 40;

// Delay range to trigger explicit requests.
// The chosen value doesn't have any special meaning, a random delay within the order of
// seconds in practice should be a good enough to allow a quick recovery without DOSing
// the relay chain.
const RECOVERY_DELAY: RecoveryDelay =
RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };
fn handle_new_finalized_head<P, Block, B>(
parachain: &Arc<P>,
finalized_head: Vec<u8>,
last_seen_finalized_hashes: &mut LruMap<Block::Hash, ()>,
) where
Block: BlockT,
B: Backend<Block>,
P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
{
let header = match Block::Header::decode(&mut &finalized_head[..]) {
Ok(header) => header,
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
return
},
};

let hash = header.hash();

last_seen_finalized_hashes.insert(hash, ());

// Only finalize if we are below the incoming finalized parachain head
if parachain.usage_info().chain.finalized_number < *header.number() {
tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Attempting to finalize header.",
);
if let Err(e) = parachain.finalize_block(hash, None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: LOG_TARGET,
error = ?e,
block_hash = ?hash,
"Failed to finalize block",
),
}
}
}
}

/// Follow the finalized head of the given parachain.
///
Expand All @@ -48,57 +93,75 @@ const RECOVERY_DELAY: RecoveryDelay =
async fn follow_finalized_head<P, Block, B, R>(para_id: ParaId, parachain: Arc<P>, relay_chain: R)
where
Block: BlockT,
P: Finalizer<Block, B> + UsageProvider<Block>,
P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
R: RelayChainInterface + Clone,
B: Backend<Block>,
{
let finalized_heads = match finalized_heads(relay_chain, para_id).await {
Ok(finalized_heads_stream) => finalized_heads_stream,
Ok(finalized_heads_stream) => finalized_heads_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
return
},
};

let mut imported_blocks = parachain.import_notification_stream().fuse();

pin_mut!(finalized_heads);

loop {
let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
return
};
// We use this cache to finalize blocks that are imported late.
// For example, a block that has been recovered via PoV-Recovery
// on a full node can have several minutes delay. With this cache
// we have some "memory" of recently finalized blocks.
let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(FINALIZATION_CACHE_SIZE));

let header = match Block::Header::decode(&mut &finalized_head[..]) {
Ok(header) => header,
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
continue
loop {
select! {
fin = finalized_heads.next() => {
match fin {
Some(finalized_head) =>
handle_new_finalized_head(&parachain, finalized_head, &mut last_seen_finalized_hashes),
None => {
tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
return
}
}
},
};

let hash = header.hash();

// don't finalize the same block multiple times.
if parachain.usage_info().chain.finalized_hash != hash {
if let Err(e) = parachain.finalize_block(hash, None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: LOG_TARGET,
error = ?e,
block_hash = ?hash,
"Failed to finalize block",
),
imported = imported_blocks.next() => {
match imported {
Some(imported_block) => {
// When we see a block import that is already finalized, we immediately finalize it.
if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() {
tracing::debug!(
target: LOG_TARGET,
block_hash = ?imported_block.hash,
"Setting newly imported block as finalized.",
);

if let Err(e) = parachain.finalize_block(imported_block.hash, None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: LOG_TARGET,
block_hash = ?imported_block.hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: LOG_TARGET,
error = ?e,
block_hash = ?imported_block.hash,
"Failed to finalize block",
),
}
}
}
},
None => {
tracing::debug!(
target: LOG_TARGET,
"Stopping following imported blocks.",
);
return
}
}
}
}
Expand Down Expand Up @@ -266,7 +329,11 @@ async fn handle_new_block_imported<Block, P>(
let unset_best_header = unset_best_header_opt
.take()
.expect("We checked above that the value is set; qed");

tracing::debug!(
target: LOG_TARGET,
?unset_hash,
"Importing block as new best for parachain.",
);
import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
},
state => tracing::debug!(
Expand Down Expand Up @@ -315,7 +382,11 @@ async fn handle_new_best_parachain_head<Block, P>(
match parachain.block_status(hash) {
Ok(BlockStatus::InChainWithState) => {
unset_best_header.take();

tracing::debug!(
target: LOG_TARGET,
?hash,
"Importing block as new best for parachain.",
);
import_block_as_new_best(hash, parachain_head, parachain).await;
},
Ok(BlockStatus::InChainPruned) => {
Expand All @@ -338,8 +409,7 @@ async fn handle_new_best_parachain_head<Block, P>(
// Best effort channel to actively encourage block recovery.
// An error here is not fatal; the relay chain continuously re-announces
// the best block, thus we will have other opportunities to retry.
let req =
RecoveryRequest { hash, delay: RECOVERY_DELAY, kind: RecoveryKind::Full };
let req = RecoveryRequest { hash, kind: RecoveryKind::Full };
if let Err(err) = recovery_chan_tx.try_send(req) {
tracing::warn!(
target: LOG_TARGET,
Expand Down
1 change: 1 addition & 0 deletions client/pov-recovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch =
# Cumulus
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-relay-chain-interface = {path = "../relay-chain-interface"}
async-trait = "0.1.64"

[dev-dependencies]
tokio = { version = "1.25.0", features = ["macros"] }
Expand Down
18 changes: 7 additions & 11 deletions client/pov-recovery/src/active_candidate_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ use sp_runtime::traits::Block as BlockT;

use polkadot_node_primitives::AvailableData;
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
use polkadot_overseer::Handle as OverseerHandle;

use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};

use std::{collections::HashSet, pin::Pin};

use crate::RecoveryHandle;

/// The active candidate recovery.
///
/// This handles the candidate recovery and tracks the activate recoveries.
Expand All @@ -34,12 +35,12 @@ pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
>,
/// The block hashes of the candidates currently being recovered.
candidates: HashSet<Block::Hash>,
overseer_handle: OverseerHandle,
recovery_handle: Box<dyn RecoveryHandle>,
}

impl<Block: BlockT> ActiveCandidateRecovery<Block> {
pub fn new(overseer_handle: OverseerHandle) -> Self {
Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle }
pub fn new(recovery_handle: Box<dyn RecoveryHandle>) -> Self {
Self { recoveries: Default::default(), candidates: Default::default(), recovery_handle }
}

/// Recover the given `candidate`.
Expand All @@ -50,8 +51,8 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
) {
let (tx, rx) = oneshot::channel();

self.overseer_handle
.send_msg(
self.recovery_handle
.send_recovery_msg(
AvailabilityRecoveryMessage::RecoverAvailableData(
candidate.receipt.clone(),
candidate.session_index,
Expand Down Expand Up @@ -90,11 +91,6 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
);
}

/// Returns if the given `candidate` is being recovered.
pub fn is_being_recovered(&self, candidate: &Block::Hash) -> bool {
self.candidates.contains(candidate)
}

/// Waits for the next recovery.
///
/// If the returned [`AvailableData`] is `None`, it means that the recovery failed.
Expand Down
Loading

0 comments on commit afcfd24

Please sign in to comment.