Skip to content

Commit

Permalink
Stabilize chainHead methods (#1538)
Browse files Browse the repository at this point in the history
* Stabilize chainHead methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Rename fn snake case

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* docs: Fix documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv authored Apr 19, 2024
1 parent ac606cf commit c124e17
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 75 deletions.
2 changes: 1 addition & 1 deletion subxt/src/backend/unstable/follow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<Hash> FollowStream<Hash> {
let methods = methods.clone();
Box::pin(async move {
// Make the RPC call:
let stream = methods.chainhead_unstable_follow(true).await?;
let stream = methods.chainhead_v1_follow(true).await?;
// Extract the subscription ID:
let Some(sub_id) = stream.subscription_id().map(ToOwned::to_owned) else {
return Err(Error::Other(
Expand Down
2 changes: 1 addition & 1 deletion subxt/src/backend/unstable/follow_stream_unpin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
let methods = methods.clone();
let fut: UnpinFut = Box::pin(async move {
// We ignore any errors trying to unpin at the moment.
let _ = methods.chainhead_unstable_unpin(&sub_id, hash).await;
let _ = methods.chainhead_v1_unpin(&sub_id, hash).await;
});
fut
});
Expand Down
8 changes: 4 additions & 4 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl<T: Config> UnstableBackend<T> {

async move {
let res = methods
.chainhead_unstable_header(&sub_id, block_ref.hash())
.chainhead_v1_header(&sub_id, block_ref.hash())
.await
.transpose()?;

Expand Down Expand Up @@ -286,15 +286,15 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {

async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
let sub_id = get_subscription_id(&self.follow_handle).await?;
self.methods.chainhead_unstable_header(&sub_id, at).await
self.methods.chainhead_v1_header(&sub_id, at).await
}

async fn block_body(&self, at: T::Hash) -> Result<Option<Vec<Vec<u8>>>, Error> {
let sub_id = get_subscription_id(&self.follow_handle).await?;

// Subscribe to the body response and get our operationId back.
let follow_events = self.follow_handle.subscribe().events();
let status = self.methods.chainhead_unstable_body(&sub_id, at).await?;
let status = self.methods.chainhead_v1_body(&sub_id, at).await?;
let operation_id = match status {
MethodResponse::LimitReached => {
return Err(RpcError::request_rejected("limit reached").into())
Expand Down Expand Up @@ -645,7 +645,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
let call_parameters = call_parameters.unwrap_or(&[]);
let status = self
.methods
.chainhead_unstable_call(&sub_id, at, method, call_parameters)
.chainhead_v1_call(&sub_id, at, method, call_parameters)
.await?;
let operation_id = match status {
MethodResponse::LimitReached => {
Expand Down
79 changes: 35 additions & 44 deletions subxt/src/backend/unstable/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,118 +33,112 @@ impl<T: Config> UnstableRpcMethods<T> {
}
}

/// Subscribe to `chainHead_unstable_follow` to obtain all reported blocks by the chain.
/// Subscribe to `chainHead_v1_follow` to obtain all reported blocks by the chain.
///
/// The subscription ID can be used to make queries for the
/// block's body ([`chainhead_unstable_body`](UnstableRpcMethods::chainhead_unstable_follow)),
/// block's header ([`chainhead_unstable_header`](UnstableRpcMethods::chainhead_unstable_header)),
/// block's storage ([`chainhead_unstable_storage`](UnstableRpcMethods::chainhead_unstable_storage)) and submitting
/// runtime API calls at this block ([`chainhead_unstable_call`](UnstableRpcMethods::chainhead_unstable_call)).
/// block's body ([`chainHead_v1_body`](UnstableRpcMethods::chainhead_v1_follow)),
/// block's header ([`chainHead_v1_header`](UnstableRpcMethods::chainhead_v1_header)),
/// block's storage ([`chainHead_v1_storage`](UnstableRpcMethods::chainhead_v1_storage)) and submitting
/// runtime API calls at this block ([`chainHead_v1_call`](UnstableRpcMethods::chainhead_v1_call)).
///
/// # Note
///
/// When the user is no longer interested in a block, the user is responsible
/// for calling the [`chainhead_unstable_unpin`](UnstableRpcMethods::chainhead_unstable_unpin) method.
/// for calling the [`chainHead_v1_unpin`](UnstableRpcMethods::chainhead_v1_unpin) method.
/// Failure to do so will result in the subscription being stopped by generating the `Stop` event.
pub async fn chainhead_unstable_follow(
pub async fn chainhead_v1_follow(
&self,
with_runtime: bool,
) -> Result<FollowSubscription<T::Hash>, Error> {
let sub = self
.client
.subscribe(
"chainHead_unstable_follow",
"chainHead_v1_follow",
rpc_params![with_runtime],
"chainHead_unstable_unfollow",
"chainHead_v1_unfollow",
)
.await?;

Ok(FollowSubscription { sub, done: false })
}

/// Resumes a storage fetch started with chainHead_unstable_storage after it has generated an
/// Resumes a storage fetch started with chainHead_v1_storage after it has generated an
/// `operationWaitingForContinue` event.
///
/// Has no effect if the operationId is invalid or refers to an operation that has emitted a
/// `{"event": "operationInaccessible"` event, or if the followSubscription is invalid or stale.
pub async fn chainhead_unstable_continue(
pub async fn chainhead_v1_continue(
&self,
follow_subscription: &str,
operation_id: &str,
) -> Result<(), Error> {
self.client
.request(
"chainHead_unstable_continue",
"chainHead_v1_continue",
rpc_params![follow_subscription, operation_id],
)
.await?;

Ok(())
}

/// Stops an operation started with `chainHead_unstable_body`, `chainHead_unstable_call`, or
/// `chainHead_unstable_storage¦. If the operation was still in progress, this interrupts it.
/// Stops an operation started with `chainHead_v1_body`, `chainHead_v1_call`, or
/// `chainHead_v1_storage¦. If the operation was still in progress, this interrupts it.
/// If the operation was already finished, this call has no effect.
///
/// Has no effect if the `followSubscription` is invalid or stale.
pub async fn chainhead_unstable_stop_operation(
pub async fn chainhead_v1_stop_operation(
&self,
follow_subscription: &str,
operation_id: &str,
) -> Result<(), Error> {
self.client
.request(
"chainHead_unstable_stopOperation",
"chainHead_v1_stopOperation",
rpc_params![follow_subscription, operation_id],
)
.await?;

Ok(())
}

/// Call the `chainHead_unstable_body` method and return an operation ID to obtain the block's body.
/// Call the `chainHead_v1_body` method and return an operation ID to obtain the block's body.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](UnstableRpcMethods::chainhead_unstable_follow).
pub async fn chainhead_unstable_body(
/// [`chainHead_v1_follow`](UnstableRpcMethods::chainhead_v1_follow).
pub async fn chainhead_v1_body(
&self,
subscription_id: &str,
hash: T::Hash,
) -> Result<MethodResponse, Error> {
let response = self
.client
.request(
"chainHead_unstable_body",
rpc_params![subscription_id, hash],
)
.request("chainHead_v1_body", rpc_params![subscription_id, hash])
.await?;

Ok(response)
}

/// Get the block's header using the `chainHead_unstable_header` method.
/// Get the block's header using the `chainHead_v1_header` method.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](UnstableRpcMethods::chainhead_unstable_follow).
pub async fn chainhead_unstable_header(
/// [`chainHead_v1_follow`](UnstableRpcMethods::chainhead_v1_follow).
pub async fn chainhead_v1_header(
&self,
subscription_id: &str,
hash: T::Hash,
) -> Result<Option<T::Header>, Error> {
// header returned as hex encoded SCALE encoded bytes.
let header: Option<Bytes> = self
.client
.request(
"chainHead_unstable_header",
rpc_params![subscription_id, hash],
)
.request("chainHead_v1_header", rpc_params![subscription_id, hash])
.await?;

let header = header
Expand All @@ -153,16 +147,16 @@ impl<T: Config> UnstableRpcMethods<T> {
Ok(header)
}

/// Call the `chainhead_unstable_storage` method and return an operation ID to obtain the block's storage.
/// Call the `chainHead_v1_storage` method and return an operation ID to obtain the block's storage.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](UnstableRpcMethods::chainhead_unstable_follow).
pub async fn chainhead_unstable_storage(
/// [`chainHead_v1_follow`](UnstableRpcMethods::chainhead_v1_follow).
pub async fn chainhead_v1_storage(
&self,
subscription_id: &str,
hash: T::Hash,
Expand All @@ -180,24 +174,24 @@ impl<T: Config> UnstableRpcMethods<T> {
let response = self
.client
.request(
"chainHead_unstable_storage",
"chainHead_v1_storage",
rpc_params![subscription_id, hash, items, child_key.map(to_hex)],
)
.await?;

Ok(response)
}

/// Call the `chainhead_unstable_storage` method and return an operation ID to obtain the runtime API result.
/// Call the `chainHead_v1_storage` method and return an operation ID to obtain the runtime API result.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](UnstableRpcMethods::chainhead_unstable_follow).
pub async fn chainhead_unstable_call(
/// [`chainHead_v1_follow`](UnstableRpcMethods::chainhead_v1_follow).
pub async fn chainhead_v1_call(
&self,
subscription_id: &str,
hash: T::Hash,
Expand All @@ -207,7 +201,7 @@ impl<T: Config> UnstableRpcMethods<T> {
let response = self
.client
.request(
"chainHead_unstable_call",
"chainHead_v1_call",
rpc_params![subscription_id, hash, function, to_hex(call_parameters)],
)
.await?;
Expand All @@ -220,17 +214,14 @@ impl<T: Config> UnstableRpcMethods<T> {
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](UnstableRpcMethods::chainhead_unstable_follow).
pub async fn chainhead_unstable_unpin(
/// [`chainHead_v1_follow`](UnstableRpcMethods::chainhead_v1_follow).
pub async fn chainhead_v1_unpin(
&self,
subscription_id: &str,
hash: T::Hash,
) -> Result<(), Error> {
self.client
.request(
"chainHead_unstable_unpin",
rpc_params![subscription_id, hash],
)
.request("chainHead_v1_unpin", rpc_params![subscription_id, hash])
.await?;

Ok(())
Expand Down
8 changes: 2 additions & 6 deletions subxt/src/backend/unstable/storage_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<T: Config> StorageItems<T> {
// Subscribe to events and make the initial request to get an operation ID.
let follow_events = follow_handle.subscribe().events();
let status = methods
.chainhead_unstable_storage(&sub_id, at, queries, None)
.chainhead_v1_storage(&sub_id, at, queries, None)
.await?;
let operation_id: Arc<str> = match status {
MethodResponse::LimitReached => {
Expand All @@ -59,11 +59,7 @@ impl<T: Config> StorageItems<T> {
let operation_id = operation_id.clone();
let methods = methods.clone();

Box::pin(async move {
methods
.chainhead_unstable_continue(&sub_id, &operation_id)
.await
})
Box::pin(async move { methods.chainhead_v1_continue(&sub_id, &operation_id).await })
})
};

Expand Down
Loading

0 comments on commit c124e17

Please sign in to comment.