Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Remove the RelaychainClient trait (#2068)
Browse files Browse the repository at this point in the history
* Remove the `RelaychainClient` trait

It was just some historical trait that isn't really required anymore. Besides that this pr re-exports
types that are being used by the relay chain interface to make its usage easier.

* Fix warning
  • Loading branch information
bkchr authored Jan 9, 2023
1 parent 62991ea commit ac2dc71
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 118 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

130 changes: 51 additions & 79 deletions client/consensus/common/src/parachain_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use async_trait::async_trait;
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
Expand All @@ -29,9 +28,9 @@ use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use polkadot_primitives::v2::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};

use codec::Decode;
use futures::{channel::mpsc::Sender, select, FutureExt, Stream, StreamExt};
use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamExt};

use std::{pin::Pin, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

const LOG_TARGET: &str = "cumulus-consensus";

Expand All @@ -42,29 +41,6 @@ const LOG_TARGET: &str = "cumulus-consensus";
const RECOVERY_DELAY: RecoveryDelay =
RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };

/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
#[async_trait]
pub trait RelaychainClient: Clone + 'static {
/// The error type for interacting with the Polkadot client.
type Error: std::fmt::Debug + Send;

/// A stream that yields head-data for a parachain.
type HeadStream: Stream<Item = Vec<u8>> + Send + Unpin;

/// Get a stream of new best heads for the given parachain.
async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;

/// Get a stream of finalized heads for the given parachain.
async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;

/// Returns the parachain head for the given `para_id` at the given block id.
async fn parachain_head_at(
&self,
at: PHash,
para_id: ParaId,
) -> RelayChainResult<Option<Vec<u8>>>;
}

/// Follow the finalized head of the given parachain.
///
/// For every finalized block of the relay chain, it will get the included parachain header
Expand All @@ -73,17 +49,19 @@ async fn follow_finalized_head<P, Block, B, R>(para_id: ParaId, parachain: Arc<P
where
Block: BlockT,
P: Finalizer<Block, B> + UsageProvider<Block>,
R: RelaychainClient,
R: RelayChainInterface + Clone,
B: Backend<Block>,
{
let mut finalized_heads = match relay_chain.finalized_heads(para_id).await {
let finalized_heads = match finalized_heads(relay_chain, para_id).await {
Ok(finalized_heads_stream) => finalized_heads_stream,
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
return
},
};

pin_mut!(finalized_heads);

loop {
let finalized_head = if let Some(h) = finalized_heads.next().await {
h
Expand Down Expand Up @@ -152,7 +130,7 @@ pub async fn run_parachain_consensus<P, R, Block, B>(
+ BlockBackend<Block>
+ BlockchainEvents<Block>,
for<'a> &'a P: BlockImport<Block>,
R: RelaychainClient,
R: RelayChainInterface + Clone,
B: Backend<Block>,
{
let follow_new_best = follow_new_best(
Expand All @@ -175,7 +153,7 @@ async fn follow_new_best<P, R, Block, B>(
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: Finalizer<Block, B>
Expand All @@ -185,20 +163,22 @@ async fn follow_new_best<P, R, Block, B>(
+ BlockBackend<Block>
+ BlockchainEvents<Block>,
for<'a> &'a P: BlockImport<Block>,
R: RelaychainClient,
R: RelayChainInterface + Clone,
B: Backend<Block>,
{
let mut new_best_heads = match relay_chain.new_best_heads(para_id).await {
let new_best_heads = match new_best_heads(relay_chain, para_id).await {
Ok(best_heads_stream) => best_heads_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
return
},
};

pin_mut!(new_best_heads);

let mut imported_blocks = parachain.import_notification_stream().fuse();
// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
// block before the parachain block it included. In this case we need to wait for this block to
// block before the associated parachain block. In this case we need to wait for this block to
// be imported to set it as new best.
let mut unset_best_header = None;

Expand All @@ -210,7 +190,7 @@ async fn follow_new_best<P, R, Block, B>(
h,
&*parachain,
&mut unset_best_header,
recovery_chan_tx.clone(),
recovery_chan_tx.as_mut(),
).await,
None => {
tracing::debug!(
Expand Down Expand Up @@ -304,7 +284,7 @@ async fn handle_new_best_parachain_head<Block, P>(
head: Vec<u8>,
parachain: &P,
unset_best_header: &mut Option<Block::Header>,
mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
mut recovery_chan_tx: Option<&mut Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
Expand Down Expand Up @@ -416,50 +396,42 @@ where
}
}

#[async_trait]
impl<RCInterface> RelaychainClient for RCInterface
where
RCInterface: RelayChainInterface + Clone + 'static,
{
type Error = ClientError;

type HeadStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;

async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();

let new_best_notification_stream = self
.new_best_notification_stream()
.await?
.filter_map(move |n| {
let relay_chain = relay_chain.clone();
async move { relay_chain.parachain_head_at(n.hash(), para_id).await.ok().flatten() }
})
.boxed();
Ok(new_best_notification_stream)
}
/// Returns a stream that will yield best heads for the given `para_id`.
async fn new_best_heads(
relay_chain: impl RelayChainInterface + Clone,
para_id: ParaId,
) -> RelayChainResult<impl Stream<Item = Vec<u8>>> {
let new_best_notification_stream =
relay_chain.new_best_notification_stream().await?.filter_map(move |n| {
let relay_chain = relay_chain.clone();
async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() }
});

Ok(new_best_notification_stream)
}

async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();

let finality_notification_stream = self
.finality_notification_stream()
.await?
.filter_map(move |n| {
let relay_chain = relay_chain.clone();
async move { relay_chain.parachain_head_at(n.hash(), para_id).await.ok().flatten() }
})
.boxed();
Ok(finality_notification_stream)
}
/// Returns a stream that will yield finalized heads for the given `para_id`.
async fn finalized_heads(
relay_chain: impl RelayChainInterface + Clone,
para_id: ParaId,
) -> RelayChainResult<impl Stream<Item = Vec<u8>>> {
let finality_notification_stream =
relay_chain.finality_notification_stream().await?.filter_map(move |n| {
let relay_chain = relay_chain.clone();
async move { parachain_head_at(&relay_chain, n.hash(), para_id).await.ok().flatten() }
});

Ok(finality_notification_stream)
}

async fn parachain_head_at(
&self,
at: PHash,
para_id: ParaId,
) -> RelayChainResult<Option<Vec<u8>>> {
self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
.await
.map(|s| s.map(|s| s.parent_head.0))
}
/// Returns head of the parachain at the given relay chain block.
async fn parachain_head_at(
relay_chain: &impl RelayChainInterface,
at: PHash,
para_id: ParaId,
) -> RelayChainResult<Option<Vec<u8>>> {
relay_chain
.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
.await
.map(|s| s.map(|s| s.parent_head.0))
}
Loading

0 comments on commit ac2dc71

Please sign in to comment.