Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add retry mechanism for pov-recovery, fix full-node pov-recovery #2164

Merged
merged 29 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
977da03
Increase delay for pov-recovery
skunert Jan 25, 2023
dbef277
Update client/service/src/lib.rs
skunert Jan 25, 2023
72e7646
Comment
skunert Jan 25, 2023
190b012
FMT
skunert Jan 27, 2023
2278719
Merge branch 'master' into skunert/pov-recovery-increase-delay
skunert Jan 27, 2023
b63782f
Clear waiting_recovery when block is recovered or recovery failed
skunert Jan 31, 2023
3c660da
Introduce recovery queue that preserved insertion order
skunert Jan 31, 2023
8eaec6d
Better error logs
skunert Jan 31, 2023
8da6d8c
Decrease slot duration
skunert Feb 1, 2023
993dba5
Style improvements
skunert Feb 2, 2023
bfbae02
Add option to use unordered queue
skunert Feb 2, 2023
1139b80
Maintain cache of finalized blocks
skunert Feb 3, 2023
18e63a5
Wait for one relay chain slot before recovery
skunert Feb 3, 2023
c3ae356
Make retries testable
skunert Feb 3, 2023
508bee6
fmt
skunert Feb 3, 2023
d40ff11
Improve docs
skunert Feb 3, 2023
be9d523
Improve docs
skunert Feb 6, 2023
2bf0824
Simplify RecoveryQueue
skunert Feb 6, 2023
f744bec
Remove unwanted changes
skunert Feb 6, 2023
92c582b
Merge branch 'master' into skunert/experimental-pov-changes
skunert Feb 6, 2023
2512d09
Adjust to comments
skunert Feb 6, 2023
2af35d9
Apply suggestions from code review
skunert Feb 7, 2023
70ab63b
Move recovery delay into the queue
skunert Feb 8, 2023
0da8638
Check for finalized number
skunert Feb 8, 2023
6f0d8ef
Clean up
skunert Feb 8, 2023
3809eed
Use timer
skunert Feb 9, 2023
5bf500a
Simplify implementation
skunert Feb 8, 2023
49710ce
Revert "Use timer"
skunert Feb 9, 2023
78ccb2a
Properly clear `to_recover` flag
skunert Feb 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch =
cumulus-primitives-core = { path = "../../../primitives/core" }
cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
cumulus-client-pov-recovery = { path = "../../pov-recovery" }
schnellru = "0.2.1"

[dev-dependencies]
futures-timer = "3.0.2"
Expand Down
168 changes: 119 additions & 49 deletions client/consensus/common/src/parachain_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,73 @@ use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use schnellru::{ByLength, LruMap};
use sp_blockchain::Error as ClientError;
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};

use cumulus_client_pov_recovery::{RecoveryDelay, RecoveryKind, RecoveryRequest};
use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest};
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};

use polkadot_primitives::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};

use codec::Decode;
use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamExt};

use std::{sync::Arc, time::Duration};
use std::sync::Arc;

const LOG_TARGET: &str = "cumulus-consensus";
const FINALIZATION_CACHE_SIZE: u32 = 40;

// Delay range to trigger explicit requests.
// The chosen value doesn't have any special meaning, a random delay within the order of
seconds in practice should be good enough to allow a quick recovery without DOSing
// the relay chain.
const RECOVERY_DELAY: RecoveryDelay =
RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };
/// Process a freshly observed finalized parachain head coming from the relay chain.
///
/// Decodes the SCALE-encoded header, records its hash in the recently-finalized
/// cache, and — when our local finalized number is behind the announced one —
/// attempts to finalize the corresponding block in the parachain client.
fn handle_new_finalized_head<P, Block, B>(
	parachain: &Arc<P>,
	finalized_head: Vec<u8>,
	last_seen_finalized_hashes: &mut LruMap<Block::Hash, ()>,
) where
	Block: BlockT,
	B: Backend<Block>,
	P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
{
	// Decode the announced header; a failure here is non-fatal, we simply
	// skip this head and wait for the next announcement.
	let decoded = Block::Header::decode(&mut &finalized_head[..]);
	let header = match decoded {
		Err(err) => {
			tracing::debug!(
				target: LOG_TARGET,
				error = ?err,
				"Could not decode parachain header while following finalized heads.",
			);
			return
		},
		Ok(header) => header,
	};

	let hash = header.hash();

	// Remember this hash so a block imported later (e.g. recovered via
	// PoV-recovery) can be finalized immediately on import.
	last_seen_finalized_hashes.insert(hash, ());

	// Only finalize if we are below the incoming finalized parachain head
	let local_finalized_number = parachain.usage_info().chain.finalized_number;
	if local_finalized_number < *header.number() {
		tracing::debug!(
			target: LOG_TARGET,
			block_hash = ?hash,
			"Attempting to finalize header.",
		);
		match parachain.finalize_block(hash, None, true) {
			Ok(_) => {},
			// The block may not have been imported yet; this is expected for
			// blocks that arrive late and is only logged at debug level.
			Err(ClientError::UnknownBlock(_)) => tracing::debug!(
				target: LOG_TARGET,
				block_hash = ?hash,
				"Could not finalize block because it is unknown.",
			),
			Err(e) => tracing::warn!(
				target: LOG_TARGET,
				error = ?e,
				block_hash = ?hash,
				"Failed to finalize block",
			),
		}
	}
}

/// Follow the finalized head of the given parachain.
///
Expand All @@ -48,57 +93,75 @@ const RECOVERY_DELAY: RecoveryDelay =
async fn follow_finalized_head<P, Block, B, R>(para_id: ParaId, parachain: Arc<P>, relay_chain: R)
where
Block: BlockT,
P: Finalizer<Block, B> + UsageProvider<Block>,
P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
R: RelayChainInterface + Clone,
B: Backend<Block>,
{
let finalized_heads = match finalized_heads(relay_chain, para_id).await {
Ok(finalized_heads_stream) => finalized_heads_stream,
Ok(finalized_heads_stream) => finalized_heads_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
return
},
};

let mut imported_blocks = parachain.import_notification_stream().fuse();

pin_mut!(finalized_heads);

loop {
let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
return
};
// We use this cache to finalize blocks that are imported late.
// For example, a block that has been recovered via PoV-Recovery
// on a full node can have several minutes delay. With this cache
// we have some "memory" of recently finalized blocks.
let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(FINALIZATION_CACHE_SIZE));

let header = match Block::Header::decode(&mut &finalized_head[..]) {
Ok(header) => header,
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
continue
loop {
select! {
fin = finalized_heads.next() => {
match fin {
Some(finalized_head) =>
handle_new_finalized_head(&parachain, finalized_head, &mut last_seen_finalized_hashes),
None => {
tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
return
}
}
},
};

let hash = header.hash();

// don't finalize the same block multiple times.
if parachain.usage_info().chain.finalized_hash != hash {
if let Err(e) = parachain.finalize_block(hash, None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: LOG_TARGET,
error = ?e,
block_hash = ?hash,
"Failed to finalize block",
),
imported = imported_blocks.next() => {
match imported {
Some(imported_block) => {
// When we see a block import that is already finalized, we immediately finalize it.
if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() {
davxy marked this conversation as resolved.
Show resolved Hide resolved
tracing::debug!(
target: LOG_TARGET,
skunert marked this conversation as resolved.
Show resolved Hide resolved
block_hash = ?imported_block.hash,
"Setting newly imported block as finalized.",
);

if let Err(e) = parachain.finalize_block(imported_block.hash, None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: LOG_TARGET,
block_hash = ?imported_block.hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: LOG_TARGET,
error = ?e,
block_hash = ?imported_block.hash,
"Failed to finalize block",
),
}
}
}
},
None => {
tracing::debug!(
target: LOG_TARGET,
"Stopping following imported blocks.",
);
return
}
}
}
}
Expand Down Expand Up @@ -266,7 +329,11 @@ async fn handle_new_block_imported<Block, P>(
let unset_best_header = unset_best_header_opt
.take()
.expect("We checked above that the value is set; qed");

tracing::debug!(
target: LOG_TARGET,
?unset_hash,
"Importing block as new best for parachain.",
);
import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
},
state => tracing::debug!(
Expand Down Expand Up @@ -315,7 +382,11 @@ async fn handle_new_best_parachain_head<Block, P>(
match parachain.block_status(hash) {
Ok(BlockStatus::InChainWithState) => {
unset_best_header.take();

tracing::debug!(
target: LOG_TARGET,
?hash,
"Importing block as new best for parachain.",
);
import_block_as_new_best(hash, parachain_head, parachain).await;
},
Ok(BlockStatus::InChainPruned) => {
Expand All @@ -338,8 +409,7 @@ async fn handle_new_best_parachain_head<Block, P>(
// Best effort channel to actively encourage block recovery.
// An error here is not fatal; the relay chain continuously re-announces
// the best block, thus we will have other opportunities to retry.
let req =
RecoveryRequest { hash, delay: RECOVERY_DELAY, kind: RecoveryKind::Full };
let req = RecoveryRequest { hash, kind: RecoveryKind::Full };
if let Err(err) = recovery_chan_tx.try_send(req) {
tracing::warn!(
target: LOG_TARGET,
Expand Down
1 change: 1 addition & 0 deletions client/pov-recovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch =
# Cumulus
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-relay-chain-interface = {path = "../relay-chain-interface"}
async-trait = "0.1.64"

[dev-dependencies]
tokio = { version = "1.24.2", features = ["macros"] }
Expand Down
15 changes: 8 additions & 7 deletions client/pov-recovery/src/active_candidate_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ use sp_runtime::traits::Block as BlockT;

use polkadot_node_primitives::AvailableData;
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
use polkadot_overseer::Handle as OverseerHandle;

use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};

use std::{collections::HashSet, pin::Pin};

use crate::RecoveryHandle;

/// The active candidate recovery.
///
This handles the candidate recovery and tracks the active recoveries.
Expand All @@ -34,12 +35,12 @@ pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
>,
/// The block hashes of the candidates currently being recovered.
candidates: HashSet<Block::Hash>,
overseer_handle: OverseerHandle,
recovery_handle: Box<dyn RecoveryHandle>,
}

impl<Block: BlockT> ActiveCandidateRecovery<Block> {
pub fn new(overseer_handle: OverseerHandle) -> Self {
Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle }
pub fn new(recovery_handle: Box<dyn RecoveryHandle>) -> Self {
Self { recoveries: Default::default(), candidates: Default::default(), recovery_handle }
}

/// Recover the given `candidate`.
Expand All @@ -50,8 +51,8 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
) {
let (tx, rx) = oneshot::channel();

self.overseer_handle
.send_msg(
self.recovery_handle
.send_recovery_msg(
AvailabilityRecoveryMessage::RecoverAvailableData(
candidate.receipt.clone(),
candidate.session_index,
Expand Down Expand Up @@ -91,7 +92,7 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
}

/// Returns if the given `candidate` is being recovered.
pub fn is_being_recovered(&self, candidate: &Block::Hash) -> bool {
pub fn is_recovering(&self, candidate: &Block::Hash) -> bool {
self.candidates.contains(candidate)
}

Expand Down
Loading