Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement BlocksClient for working with blocks #671

Merged
merged 23 commits into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
738cd82
rpc: Fill in any missing finalized blocks
lexnv Sep 29, 2022
453ca83
tests: Move fill blocks test to RPC location
lexnv Sep 29, 2022
8e919f7
events: Remove the fill in strategy
lexnv Sep 29, 2022
4985682
Merge remote-tracking branch 'origin/master' into lexnv/fill_blocks
lexnv Sep 29, 2022
65a2960
blocks: Introduce blocks client
lexnv Sep 30, 2022
bc8ec5c
client: Enable the block API
lexnv Sep 30, 2022
e1892c7
blocks: Simplify `subscribe_finalized_headers` method
lexnv Sep 30, 2022
be1a20c
tests: Add tests for `subscribe_finalized_headers`
lexnv Sep 30, 2022
2250701
blocks: Implement `subscribe_headers`
lexnv Sep 30, 2022
a86237a
tests: Add tests for `subscribe_headers`
lexnv Sep 30, 2022
15b0478
tests: Move `missing_block_headers_will_be_filled_in` to blocks
lexnv Sep 30, 2022
af98f7f
events: Use the new subscribe to blocks
lexnv Sep 30, 2022
cea372e
Merge remote-tracking branch 'origin/master' into lexnv/fill_blocks
lexnv Oct 4, 2022
64dfd79
blocks: Change API to return future similar to events
lexnv Oct 4, 2022
2b5066b
events: Use blocks API for subscribing to blocks
lexnv Oct 4, 2022
c069473
Update subxt/src/blocks/blocks_client.rs
lexnv Oct 4, 2022
1a70b4a
blocks: Simplify docs for `subscribe_finalized_headers`
lexnv Oct 4, 2022
dc9a435
blocks: Use `PhantomDataSendSync` to avoid other bounds on `T: Config`
lexnv Oct 4, 2022
bedc6a4
blocks: Add docs for best blocks
lexnv Oct 4, 2022
75bc024
blocks: Avoid one clone for the `client.rpc()`
lexnv Oct 4, 2022
bf5863f
Update testing/integration-tests/src/blocks/mod.rs
lexnv Oct 4, 2022
ae7f5ff
blocks: Improve `subscribe_headers` doc
lexnv Oct 4, 2022
afcf759
Merge remote-tracking branch 'origin/lexnv/fill_blocks' into lexnv/fi…
lexnv Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions subxt/src/blocks/blocks_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use crate::{
client::OnlineClientT,
error::Error,
utils::PhantomDataSendSync,
Config,
};
use derivative::Derivative;
use futures::{
future::Either,
stream,
Stream,
StreamExt,
};
use sp_runtime::traits::Header;
use std::future::Future;

/// A client for working with blocks.
#[derive(Derivative)]
#[derivative(Clone(bound = "Client: Clone"))]
pub struct BlocksClient<T, Client> {
client: Client,
_marker: PhantomDataSendSync<T>,
}

impl<T, Client> BlocksClient<T, Client> {
/// Create a new [`BlocksClient`].
pub fn new(client: Client) -> Self {
Self {
client,
_marker: PhantomDataSendSync::new(),
}
}
}

impl<T, Client> BlocksClient<T, Client>
where
T: Config,
Client: OnlineClientT<T>,
{
/// Subscribe to new best block headers.
///
/// # Note
///
/// This does not produce all the blocks from the chain, just the best blocks.
/// The best block is selected by the consensus algorithm.
/// This calls under the hood the `chain_subscribeNewHeads` RPC method, if you need
/// a subscription of all the blocks please use the `chain_subscribeAllHeads` method.
///
/// These blocks haven't necessarily been finalised yet. Prefer
/// [`BlocksClient::subscribe_finalized_headers()`] if that is important.
pub fn subscribe_headers(
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
&self,
) -> impl Future<Output = Result<impl Stream<Item = Result<T::Header, Error>>, Error>>
+ Send
+ 'static {
let client = self.client.clone();
async move { client.rpc().subscribe_blocks().await }
}
Comment on lines +55 to +62
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking about that other PR; does this return all new blocks or all new "best" blocks?

Might be worth clarifying this in the docs either way?

I wonder whether if we only subscribe to all new best blocks, we might miss some blocks?
But I also guess that the block number might be the same for more than one block we get back via this call, if the "best" chain changes (or if we are asking for all blocks anyway).

So maybe w can't fill things in, but we can document precisely what is returned either way :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking about that other PR; does this return all new blocks or all new "best" blocks?

This does subscribe to all new "best" blocks indeed.

Might be worth clarifying this in the docs either way?

It's a good idea to document this properly.

But I also guess that the block number might be the same for more than one block we get back via this call, if the "best" chain changes (or if we are asking for all blocks anyway).

Yep, I think we can end up in a situation where we have two blocks with the same block number. In this case, we can't guarantee that we can fill in missing blocks.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I think we can end up in a situation where we have two blocks with the same block number. In this case, we can't guarantee that we can fill in missing blocks.

Yup, for that reason I don't think we should even try; as long as we document what will be returned it's all good.

I do wonder whether we should be subscribing to all new blocks too. Or whether just to add both "subscribe_best_headers" and "subscribe_all_headers" to go with "subscribe_finalized_headers". But as you said inthe other PR, let's just do what the new RPC API is going to support for now (I can't remember!)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new API will support all events: blocks, best blocks, and finalized blocks.
We don't support the simple block events (non best and non finalized) in subxt yet.

We can easily extend this to add all types of events, tho I think we can wait for a bit and implement it on top of the substrate's new API once it's released.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as the API and naming here means that we can migrate easily enough and add whatever else we need, I'm happy with it! And the current choice of function is what we use in events, so it makes sense to me how it is!

Let's just make sure to clearly document it :)


/// Subscribe to finalized block headers.
///
/// While the Substrate RPC method does not guarantee that all finalized block headers are
/// provided, this function does.
/// ```
pub fn subscribe_finalized_headers(
&self,
) -> impl Future<Output = Result<impl Stream<Item = Result<T::Header, Error>>, Error>>
+ Send
+ 'static {
let client = self.client.clone();
async move { subscribe_finalized_headers(client).await }
}
}

async fn subscribe_finalized_headers<T, Client>(
client: Client,
) -> Result<impl Stream<Item = Result<T::Header, Error>>, Error>
where
T: Config,
Client: OnlineClientT<T>,
{
// Fetch the last finalised block details immediately, so that we'll get
// all blocks after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_num = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());

let sub = client.rpc().subscribe_finalized_blocks().await?;

// Adjust the subscription stream to fill in any missing blocks.
Ok(
subscribe_to_block_headers_filling_in_gaps(client, last_finalized_block_num, sub)
.boxed(),
)
}

/// Note: This is exposed for testing but is not considered stable and may change
/// without notice in a patch release.
#[doc(hidden)]
pub fn subscribe_to_block_headers_filling_in_gaps<T, Client, S, E>(
client: Client,
mut last_block_num: Option<u64>,
sub: S,
) -> impl Stream<Item = Result<T::Header, Error>> + Send
where
T: Config,
Client: OnlineClientT<T>,
S: Stream<Item = Result<T::Header, E>> + Send,
E: Into<Error> + Send + 'static,
{
sub.flat_map(move |s| {
let client = client.clone();

// Get the header, or return a stream containing just the error.
let header = match s {
Ok(header) => header,
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
};

// We want all previous details up to, but not including this current block num.
let end_block_num = (*header.number()).into();

// This is one after the last block we returned details for last time.
let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);

// Iterate over all of the previous blocks we need headers for, ignoring the current block
// (which we already have the header info for):
let previous_headers = stream::iter(start_block_num..end_block_num)
.then(move |n| {
let rpc = client.rpc().clone();
async move {
let hash = rpc.block_hash(Some(n.into())).await?;
let header = rpc.header(hash).await?;
Ok::<_, Error>(header)
}
})
.filter_map(|h| async { h.transpose() });

// On the next iteration, we'll get details starting just after this end block.
last_block_num = Some(end_block_num);

// Return a combination of any previous headers plus the new header.
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
})
}
12 changes: 12 additions & 0 deletions subxt/src/blocks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

//! This module exposes the necessary functionality for working with events.

mod blocks_client;

pub use blocks_client::{
subscribe_to_block_headers_filling_in_gaps,
BlocksClient,
};
6 changes: 6 additions & 0 deletions subxt/src/client/offline_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// see LICENSE for license details.

use crate::{
blocks::BlocksClient,
constants::ConstantsClient,
events::EventsClient,
rpc::RuntimeVersion,
Expand Down Expand Up @@ -43,6 +44,11 @@ pub trait OfflineClientT<T: Config>: Clone + Send + Sync + 'static {
fn constants(&self) -> ConstantsClient<T, Self> {
ConstantsClient::new(self.clone())
}

/// Work with blocks.
fn blocks(&self) -> BlocksClient<T, Self> {
BlocksClient::new(self.clone())
}
}

/// A client that is capable of performing offline-only operations.
Expand Down
6 changes: 6 additions & 0 deletions subxt/src/client/online_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use super::{
OfflineClientT,
};
use crate::{
blocks::BlocksClient,
constants::ConstantsClient,
error::Error,
events::EventsClient,
Expand Down Expand Up @@ -203,6 +204,11 @@ impl<T: Config> OnlineClient<T> {
pub fn constants(&self) -> ConstantsClient<T, Self> {
<Self as OfflineClientT<T>>::constants(self)
}

/// Work with blocks.
pub fn blocks(&self) -> BlocksClient<T, Self> {
<Self as OfflineClientT<T>>::blocks(self)
}
}

impl<T: Config> OfflineClientT<T> for OnlineClient<T> {
Expand Down
3 changes: 1 addition & 2 deletions subxt/src/events/event_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::{
client::OnlineClientT,
error::Error,
events::EventsClient,
rpc::Subscription,
Config,
};
use derivative::Derivative;
Expand Down Expand Up @@ -40,7 +39,7 @@ pub type FinalizedEventSub<Header> = BoxStream<'static, Result<Header, Error>>;
/// A Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type EventSub<Item> = Subscription<Item>;
pub type EventSub<Item> = BoxStream<'static, Result<Item, Error>>;

/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block.
#[derive(Derivative)]
Expand Down
86 changes: 7 additions & 79 deletions subxt/src/events/events_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,10 @@ use crate::{
Config,
};
use derivative::Derivative;
use futures::{
future::Either,
stream,
Stream,
StreamExt,
};
use sp_core::{
storage::StorageKey,
twox_128,
};
use sp_runtime::traits::Header;
use std::future::Future;

/// A client for working with events.
Expand Down Expand Up @@ -96,7 +89,10 @@ where
) -> impl Future<
Output = Result<EventSubscription<T, Client, EventSub<T::Header>>, Error>,
> + Send
+ 'static {
+ 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
async move { subscribe(client).await }
}
Expand Down Expand Up @@ -157,8 +153,8 @@ where
T: Config,
Client: OnlineClientT<T>,
{
let block_subscription = client.rpc().subscribe_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
let block_subscription = client.blocks().subscribe_headers().await?;
Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}

/// Subscribe to events from finalized blocks.
Expand All @@ -169,78 +165,10 @@ where
T: Config,
Client: OnlineClientT<T>,
{
// fetch the last finalised block details immediately, so that we'll get
// events for each block after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_number = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());

let sub = client.rpc().subscribe_finalized_blocks().await?;

// Fill in any gaps between the block above and the finalized blocks reported.
let block_subscription = subscribe_to_block_headers_filling_in_gaps(
client.clone(),
last_finalized_block_number,
sub,
);

let block_subscription = client.blocks().subscribe_finalized_headers().await?;
Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}

/// Note: This is exposed for testing but is not considered stable and may change
/// without notice in a patch release.
#[doc(hidden)]
pub fn subscribe_to_block_headers_filling_in_gaps<T, Client, S, E>(
client: Client,
mut last_block_num: Option<u64>,
sub: S,
) -> impl Stream<Item = Result<T::Header, Error>> + Send
where
T: Config,
Client: OnlineClientT<T> + Send + Sync,
S: Stream<Item = Result<T::Header, E>> + Send,
E: Into<Error> + Send + 'static,
{
sub.flat_map(move |s| {
let client = client.clone();

// Get the header, or return a stream containing just the error. Our EventSubscription
// stream will return `None` as soon as it hits an error like this.
let header = match s {
Ok(header) => header,
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
};

// We want all previous details up to, but not including this current block num.
let end_block_num = (*header.number()).into();

// This is one after the last block we returned details for last time.
let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);

// Iterate over all of the previous blocks we need headers for, ignoring the current block
// (which we already have the header info for):
let previous_headers = stream::iter(start_block_num..end_block_num)
.then(move |n| {
let client = client.clone();
async move {
let hash = client.rpc().block_hash(Some(n.into())).await?;
let header = client.rpc().header(hash).await?;
Ok::<_, Error>(header)
}
})
.filter_map(|h| async { h.transpose() });

// On the next iteration, we'll get details starting just after this end block.
last_block_num = Some(end_block_num);

// Return a combination of any previous headers plus the new header.
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
})
}

// The storage key needed to access events.
fn system_events_key() -> StorageKey {
let mut storage_key = twox_128(b"System").to_vec();
Expand Down
5 changes: 1 addition & 4 deletions subxt/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ pub use event_subscription::{
EventSubscription,
FinalizedEventSub,
};
pub use events_client::{
subscribe_to_block_headers_filling_in_gaps,
EventsClient,
};
pub use events_client::EventsClient;
pub use events_type::{
EventDetails,
Events,
Expand Down
3 changes: 2 additions & 1 deletion subxt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@

pub use subxt_macro::subxt;

pub mod blocks;
pub mod client;
pub mod config;
pub mod constants;
Expand All @@ -148,7 +149,7 @@ pub mod tx;
pub mod utils;

// Expose a few of the most common types at root,
// but leave most types behind their respoctive modules.
// but leave most types behind their respective modules.
pub use crate::{
client::{
OfflineClient,
Expand Down
Loading