From 25993e877897bdd97b8dcdea6eb123c0ba78baa4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 8 Aug 2023 21:13:52 +0300 Subject: [PATCH] chainHead: Produce method responses on `chainHead_follow` (#14692) * chainHead/api: Make storage/body/call pure RPC methods Signed-off-by: Alexandru Vasile * chainHead: Add mpsc channel between RPC methods Signed-off-by: Alexandru Vasile * chainHead/subscriptions: Extract mpsc::Sender via BlockGuard Signed-off-by: Alexandru Vasile * chainHead/subscriptions: Generate and provide the method operation ID Signed-off-by: Alexandru Vasile * chainHead: Generate `chainHead_body` response Signed-off-by: Alexandru Vasile * chainHead: Generate `chainHead_call` response Signed-off-by: Alexandru Vasile * chainHead: Generate `chainHead_storage` responses Signed-off-by: Alexandru Vasile * chainHead: Propagate responses of methods to chainHead_follow Signed-off-by: Alexandru Vasile * chainHead/tests: Adjust `chainHead_body` responses Signed-off-by: Alexandru Vasile * chainHead/tests: Adjust `chainHead_call` responses Signed-off-by: Alexandru Vasile * chainHead/tests: Adjust `chainHead_call` responses Signed-off-by: Alexandru Vasile * chainHead/tests: Ensure unique operation IDs across methods Signed-off-by: Alexandru Vasile * chainHead/events: Remove old method events Signed-off-by: Alexandru Vasile * chainHead: Return `InvalidBlock` error if pinning fails Signed-off-by: Alexandru Vasile * chainHead: Wrap subscription IDs Signed-off-by: Alexandru Vasile * chainHead/tests: Ensure separate operation IDs across subscriptions Signed-off-by: Alexandru Vasile --------- Signed-off-by: Alexandru Vasile Co-authored-by: parity-processbot <> --- client/rpc-spec-v2/Cargo.toml | 2 +- client/rpc-spec-v2/src/chain_head/api.rs | 30 +- .../rpc-spec-v2/src/chain_head/chain_head.rs | 225 ++++--- .../src/chain_head/chain_head_follow.rs | 14 +- .../src/chain_head/chain_head_storage.rs | 78 ++- client/rpc-spec-v2/src/chain_head/event.rs | 168 ------ client/rpc-spec-v2/src/chain_head/mod.rs | 4 +- .../src/chain_head/subscription/inner.rs | 84 ++- .../src/chain_head/subscription/mod.rs | 7 +- client/rpc-spec-v2/src/chain_head/tests.rs | 547 +++++++++++++----- 10 files changed, 664 insertions(+), 495 deletions(-) diff --git a/client/rpc-spec-v2/Cargo.toml b/client/rpc-spec-v2/Cargo.toml index 4f5c11212a9b2..b1ab2a8799744 100644 --- a/client/rpc-spec-v2/Cargo.toml +++ b/client/rpc-spec-v2/Cargo.toml @@ -24,6 +24,7 @@ sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } sp-version = { version = "22.0.0", path = "../../primitives/version" } sc-client-api = { version = "4.0.0-dev", path = "../api" } +sc-utils = { version = "4.0.0-dev", path = "../utils" } codec = { package = "parity-scale-codec", version = "3.6.1" } thiserror = "1.0" serde = "1.0" @@ -44,6 +45,5 @@ sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/comm sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" } sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } sc-service = { version = "0.10.0-dev", features = ["test-helpers"], path = "../service" } -sc-utils = { version = "4.0.0-dev", path = "../utils" } assert_matches = "1.3.0" pretty_assertions = "1.2.1" diff --git a/client/rpc-spec-v2/src/chain_head/api.rs b/client/rpc-spec-v2/src/chain_head/api.rs index 8905e03687747..c002b75efe037 100644 --- a/client/rpc-spec-v2/src/chain_head/api.rs +++ b/client/rpc-spec-v2/src/chain_head/api.rs @@ -19,7 +19,7 @@ #![allow(non_snake_case)] //! API trait of the chain head. -use crate::chain_head::event::{ChainHeadEvent, FollowEvent, StorageQuery}; +use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; #[rpc(client, server)] @@ -47,12 +47,12 @@ pub trait ChainHeadApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[subscription( - name = "chainHead_unstable_body", - unsubscribe = "chainHead_unstable_stopBody", - item = ChainHeadEvent, - )] - fn chain_head_unstable_body(&self, follow_subscription: String, hash: Hash); + #[method(name = "chainHead_unstable_body", blocking)] + fn chain_head_unstable_body( + &self, + follow_subscription: String, + hash: Hash, + ) -> RpcResult; /// Retrieves the header of a pinned block. /// @@ -86,36 +86,28 @@ pub trait ChainHeadApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[subscription( - name = "chainHead_unstable_storage", - unsubscribe = "chainHead_unstable_stopStorage", - item = ChainHeadEvent, - )] + #[method(name = "chainHead_unstable_storage", blocking)] fn chain_head_unstable_storage( &self, follow_subscription: String, hash: Hash, items: Vec>, child_trie: Option, - ); + ) -> RpcResult; /// Call into the Runtime API at a specified block's state. /// /// # Unstable /// /// This method is unstable and subject to change in the future. - #[subscription( - name = "chainHead_unstable_call", - unsubscribe = "chainHead_unstable_stopCall", - item = ChainHeadEvent, - )] + #[method(name = "chainHead_unstable_call", blocking)] fn chain_head_unstable_call( &self, follow_subscription: String, hash: Hash, function: String, call_parameters: String, - ); + ) -> RpcResult; /// Unpin a block reported by the `follow` method. /// diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index a2c9afc034906..16881b05fd7b9 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -18,12 +18,16 @@ //! API implementation for `chainHead`. +use super::{ + chain_head_storage::ChainHeadStorage, + event::{MethodResponseStarted, OperationBodyDone, OperationCallDone}, +}; use crate::{ chain_head::{ api::ChainHeadApiServer, chain_head_follow::ChainHeadFollower, error::Error as ChainHeadRpcError, - event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent}, + event::{FollowEvent, MethodResponse, OperationError, StorageQuery, StorageQueryType}, hex_string, subscription::{SubscriptionManagement, SubscriptionManagementError}, }, @@ -47,11 +51,6 @@ use sp_core::{traits::CallContext, Bytes}; use sp_runtime::traits::Block as BlockT; use std::{marker::PhantomData, sync::Arc, time::Duration}; -use super::{ - chain_head_storage::ChainHeadStorage, - event::{ChainHeadStorageEvent, StorageQuery, StorageQueryType}, -}; - pub(crate) const LOG_TARGET: &str = "rpc-spec-v2"; /// An API for chain head RPC calls. @@ -81,7 +80,6 @@ impl, Block: BlockT, Client> ChainHead { max_pinned_duration: Duration, ) -> Self { let genesis_hash = hex_string(&genesis_hash.as_ref()); - Self { client, backend: backend.clone(), @@ -121,11 +119,8 @@ impl, Block: BlockT, Client> ChainHead { /// Parse hex-encoded string parameter as raw bytes. /// -/// If the parsing fails, the subscription is rejected. -fn parse_hex_param( - sink: &mut SubscriptionSink, - param: String, -) -> Result, SubscriptionEmptyError> { +/// If the parsing fails, returns an error propagated to the RPC method. +fn parse_hex_param(param: String) -> Result, ChainHeadRpcError> { // Methods can accept empty parameters. if param.is_empty() { return Ok(Default::default()) @@ -133,10 +128,7 @@ fn parse_hex_param( match array_bytes::hex2bytes(¶m) { Ok(bytes) => Ok(bytes), - Err(_) => { - let _ = sink.reject(ChainHeadRpcError::InvalidParam(param)); - Err(SubscriptionEmptyError) - }, + Err(_) => Err(ChainHeadRpcError::InvalidParam(param)), } } @@ -168,7 +160,7 @@ where }, }; // Keep track of the subscription. - let Some(rx_stop) = self.subscriptions.insert_subscription(sub_id.clone(), with_runtime) + let Some(sub_data) = self.subscriptions.insert_subscription(sub_id.clone(), with_runtime) else { // Inserting the subscription can only fail if the JsonRPSee // generated a duplicate subscription ID. @@ -190,7 +182,7 @@ where sub_id.clone(), ); - chain_head_follow.generate_events(sink, rx_stop).await; + chain_head_follow.generate_events(sink, sub_data).await; subscriptions.remove_subscription(&sub_id); debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id); @@ -202,59 +194,57 @@ where fn chain_head_unstable_body( &self, - mut sink: SubscriptionSink, follow_subscription: String, hash: Block::Hash, - ) -> SubscriptionResult { - let client = self.client.clone(); - let subscriptions = self.subscriptions.clone(); - - let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + ) -> RpcResult { + let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadEvent::::Disjoint); - return Ok(()) + return Ok(MethodResponse::LimitReached) }, Err(SubscriptionManagementError::BlockHashAbsent) => { // Block is not part of the subscription. - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return Ok(()) - }, - Err(error) => { - let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { - error: error.to_string(), - })); - return Ok(()) + return Err(ChainHeadRpcError::InvalidBlock.into()) }, + Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - let fut = async move { - let _block_guard = block_guard; - let event = match client.block(hash) { - Ok(Some(signed_block)) => { - let extrinsics = signed_block.block.extrinsics(); - let result = hex_string(&extrinsics.encode()); - ChainHeadEvent::Done(ChainHeadResult { result }) - }, - Ok(None) => { - // The block's body was pruned. This subscription ID has become invalid. - debug!( - target: LOG_TARGET, - "[body][id={:?}] Stopping subscription because hash={:?} was pruned", - &follow_subscription, - hash - ); - subscriptions.remove_subscription(&follow_subscription); - ChainHeadEvent::::Disjoint - }, - Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }), - }; - let _ = sink.send(&event); + let event = match self.client.block(hash) { + Ok(Some(signed_block)) => { + let extrinsics = signed_block + .block + .extrinsics() + .iter() + .map(|extrinsic| hex_string(&extrinsic.encode())) + .collect(); + FollowEvent::::OperationBodyDone(OperationBodyDone { + operation_id: block_guard.operation_id(), + value: extrinsics, + }) + }, + Ok(None) => { + // The block's body was pruned. This subscription ID has become invalid. + debug!( + target: LOG_TARGET, + "[body][id={:?}] Stopping subscription because hash={:?} was pruned", + &follow_subscription, + hash + ); + self.subscriptions.remove_subscription(&follow_subscription); + return Err(ChainHeadRpcError::InvalidBlock.into()) + }, + Err(error) => FollowEvent::::OperationError(OperationError { + operation_id: block_guard.operation_id(), + error: error.to_string(), + }), }; - self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); - Ok(()) + let _ = block_guard.response_sender().unbounded_send(event); + Ok(MethodResponse::Started(MethodResponseStarted { + operation_id: block_guard.operation_id(), + discarded_items: None, + })) } fn chain_head_unstable_header( @@ -288,128 +278,113 @@ where fn chain_head_unstable_storage( &self, - mut sink: SubscriptionSink, follow_subscription: String, hash: Block::Hash, items: Vec>, child_trie: Option, - ) -> SubscriptionResult { + ) -> RpcResult { // Gain control over parameter parsing and returned error. let items = items .into_iter() .map(|query| { if query.query_type == StorageQueryType::ClosestDescendantMerkleValue { // Note: remove this once all types are implemented. - let _ = sink.reject(ChainHeadRpcError::InvalidParam( + return Err(ChainHeadRpcError::InvalidParam( "Storage query type not supported".into(), - )); - return Err(SubscriptionEmptyError) + )) } Ok(StorageQuery { - key: StorageKey(parse_hex_param(&mut sink, query.key)?), + key: StorageKey(parse_hex_param(query.key)?), query_type: query.query_type, }) }) .collect::, _>>()?; let child_trie = child_trie - .map(|child_trie| parse_hex_param(&mut sink, child_trie)) + .map(|child_trie| parse_hex_param(child_trie)) .transpose()? .map(ChildInfo::new_default_from_vec); - let client = self.client.clone(); - let subscriptions = self.subscriptions.clone(); - - let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadStorageEvent::Disjoint); - return Ok(()) + return Ok(MethodResponse::LimitReached) }, Err(SubscriptionManagementError::BlockHashAbsent) => { // Block is not part of the subscription. - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return Ok(()) - }, - Err(error) => { - let _ = sink - .send(&ChainHeadStorageEvent::Error(ErrorEvent { error: error.to_string() })); - return Ok(()) + return Err(ChainHeadRpcError::InvalidBlock.into()) }, + Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - let storage_client = ChainHeadStorage::::new(client); - + let storage_client = ChainHeadStorage::::new(self.client.clone()); + let operation_id = block_guard.operation_id(); let fut = async move { - let _block_guard = block_guard; - - storage_client.generate_events(sink, hash, items, child_trie); + storage_client.generate_events(block_guard, hash, items, child_trie); }; - self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); - Ok(()) + self.executor + .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + Ok(MethodResponse::Started(MethodResponseStarted { + operation_id, + discarded_items: Some(0), + })) } fn chain_head_unstable_call( &self, - mut sink: SubscriptionSink, follow_subscription: String, hash: Block::Hash, function: String, call_parameters: String, - ) -> SubscriptionResult { - let call_parameters = Bytes::from(parse_hex_param(&mut sink, call_parameters)?); + ) -> RpcResult { + let call_parameters = Bytes::from(parse_hex_param(call_parameters)?); - let client = self.client.clone(); - let subscriptions = self.subscriptions.clone(); - - let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadEvent::::Disjoint); - return Ok(()) + return Ok(MethodResponse::LimitReached) }, Err(SubscriptionManagementError::BlockHashAbsent) => { // Block is not part of the subscription. - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return Ok(()) - }, - Err(error) => { - let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { - error: error.to_string(), - })); - return Ok(()) + return Err(ChainHeadRpcError::InvalidBlock.into()) }, + Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - let fut = async move { - // Reject subscription if with_runtime is false. - if !block_guard.has_runtime() { - let _ = sink.reject(ChainHeadRpcError::InvalidParam( - "The runtime updates flag must be set".into(), - )); - return - } - - let res = client - .executor() - .call(hash, &function, &call_parameters, CallContext::Offchain) - .map(|result| { - let result = hex_string(&result); - ChainHeadEvent::Done(ChainHeadResult { result }) - }) - .unwrap_or_else(|error| { - ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }) - }); + // Reject subscription if with_runtime is false. + if !block_guard.has_runtime() { + return Err(ChainHeadRpcError::InvalidParam( + "The runtime updates flag must be set".to_string(), + ) + .into()) + } - let _ = sink.send(&res); - }; + let event = self + .client + .executor() + .call(hash, &function, &call_parameters, CallContext::Offchain) + .map(|result| { + FollowEvent::::OperationCallDone(OperationCallDone { + operation_id: block_guard.operation_id(), + output: hex_string(&result), + }) + }) + .unwrap_or_else(|error| { + FollowEvent::::OperationError(OperationError { + operation_id: block_guard.operation_id(), + error: error.to_string(), + }) + }); - self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); - Ok(()) + let _ = block_guard.response_sender().unbounded_send(event); + Ok(MethodResponse::Started(MethodResponseStarted { + operation_id: block_guard.operation_id(), + discarded_items: None, + })) } fn chain_head_unstable_unpin( diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 799978be532ae..0fa995ce73a09 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -24,7 +24,7 @@ use crate::chain_head::{ BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent, RuntimeVersionEvent, }, - subscription::{SubscriptionManagement, SubscriptionManagementError}, + subscription::{InsertedSubscriptionData, SubscriptionManagement, SubscriptionManagementError}, }; use futures::{ channel::oneshot, @@ -80,6 +80,8 @@ enum NotificationType { NewBlock(BlockImportNotification), /// The finalized block notification obtained from `finality_notification_stream`. Finalized(FinalityNotification), + /// The response of `chainHead` method calls. + MethodResponse(FollowEvent), } /// The initial blocks that should be reported or ignored by the chainHead. @@ -515,6 +517,7 @@ where self.handle_import_blocks(notification, &startup_point), NotificationType::Finalized(notification) => self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point), + NotificationType::MethodResponse(notification) => Ok(vec![notification]), }; let events = match events { @@ -572,7 +575,7 @@ where pub async fn generate_events( &mut self, mut sink: SubscriptionSink, - rx_stop: oneshot::Receiver<()>, + sub_data: InsertedSubscriptionData, ) { // Register for the new block and finalized notifications. let stream_import = self @@ -585,6 +588,10 @@ where .finality_notification_stream() .map(|notification| NotificationType::Finalized(notification)); + let stream_responses = sub_data + .response_receiver + .map(|response| NotificationType::MethodResponse(response)); + let startup_point = StartupPoint::from(self.client.info()); let (initial_events, pruned_forks) = match self.generate_init_events(&startup_point) { Ok(blocks) => blocks, @@ -602,9 +609,10 @@ where let initial = NotificationType::InitialEvents(initial_events); let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized); + let merged = tokio_stream::StreamExt::merge(merged, stream_responses); let stream = stream::once(futures::future::ready(initial)).chain(merged); - self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, rx_stop) + self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop) .await; } } diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index df1600628ded4..393e4489c8c07 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -20,17 +20,21 @@ use std::{marker::PhantomData, sync::Arc}; -use jsonrpsee::SubscriptionSink; use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider}; +use sc_utils::mpsc::TracingUnboundedSender; use sp_api::BlockT; use sp_core::storage::well_known_keys; +use crate::chain_head::event::OperationStorageItems; + use super::{ event::{ - ChainHeadStorageEvent, ItemsEvent, StorageQuery, StorageQueryType, StorageResult, + OperationError, OperationId, StorageQuery, StorageQueryType, StorageResult, StorageResultType, }, - hex_string, ErrorEvent, + hex_string, + subscription::BlockGuard, + FollowEvent, }; /// The maximum number of items the `chainHead_storage` can return @@ -70,10 +74,10 @@ fn is_key_queryable(key: &[u8]) -> bool { } /// The result of making a query call. -type QueryResult = Result, ChainHeadStorageEvent>; +type QueryResult = Result, String>; /// The result of iterating over keys. -type QueryIterResult = Result, ChainHeadStorageEvent>; +type QueryIterResult = Result, String>; impl ChainHeadStorage where @@ -101,11 +105,7 @@ where result: StorageResultType::Value(hex_string(&storage_data.0)), })) }) - .unwrap_or_else(|err| { - QueryResult::Err(ChainHeadStorageEvent::Error(ErrorEvent { - error: err.to_string(), - })) - }) + .unwrap_or_else(|error| QueryResult::Err(error.to_string())) } /// Fetch the hash of a value from storage. @@ -128,11 +128,7 @@ where result: StorageResultType::Hash(hex_string(&storage_data.as_ref())), })) }) - .unwrap_or_else(|err| { - QueryResult::Err(ChainHeadStorageEvent::Error(ErrorEvent { - error: err.to_string(), - })) - }) + .unwrap_or_else(|error| QueryResult::Err(error.to_string())) } /// Handle iterating over (key, value) or (key, hash) pairs. @@ -148,7 +144,7 @@ where } else { self.client.storage_keys(hash, Some(key), None) } - .map_err(|err| ChainHeadStorageEvent::Error(ErrorEvent { error: err.to_string() }))?; + .map_err(|error| error.to_string())?; let mut ret = Vec::with_capacity(MAX_ITER_ITEMS); let mut keys_iter = keys_iter.take(MAX_ITER_ITEMS); @@ -169,14 +165,31 @@ where /// Generate the block events for the `chainHead_storage` method. pub fn generate_events( &self, - mut sink: SubscriptionSink, + block_guard: BlockGuard, hash: Block::Hash, items: Vec>, child_key: Option, ) { + /// Build and send the opaque error back to the `chainHead_follow` method. + fn send_error( + sender: &TracingUnboundedSender>, + operation_id: String, + error: String, + ) { + let _ = + sender.unbounded_send(FollowEvent::::OperationError(OperationError { + operation_id, + error, + })); + } + + let sender = block_guard.response_sender(); + if let Some(child_key) = child_key.as_ref() { if !is_key_queryable(child_key.storage_key()) { - let _ = sink.send(&ChainHeadStorageEvent::Done); + let _ = sender.unbounded_send(FollowEvent::::OperationStorageDone( + OperationId { operation_id: block_guard.operation_id() }, + )); return } } @@ -192,8 +205,8 @@ where match self.query_storage_value(hash, &item.key, child_key.as_ref()) { Ok(Some(value)) => storage_results.push(value), Ok(None) => continue, - Err(err) => { - let _ = sink.send(&err); + Err(error) => { + send_error::(&sender, block_guard.operation_id(), error); return }, } @@ -202,8 +215,8 @@ where match self.query_storage_hash(hash, &item.key, child_key.as_ref()) { Ok(Some(value)) => storage_results.push(value), Ok(None) => continue, - Err(err) => { - let _ = sink.send(&err); + Err(error) => { + send_error::(&sender, block_guard.operation_id(), error); return }, }, @@ -214,8 +227,8 @@ where IterQueryType::Value, ) { Ok(values) => storage_results.extend(values), - Err(err) => { - let _ = sink.send(&err); + Err(error) => { + send_error::(&sender, block_guard.operation_id(), error); return }, }, @@ -226,8 +239,8 @@ where IterQueryType::Hash, ) { Ok(values) => storage_results.extend(values), - Err(err) => { - let _ = sink.send(&err); + Err(error) => { + send_error::(&sender, block_guard.operation_id(), error); return }, }, @@ -236,10 +249,17 @@ where } if !storage_results.is_empty() { - let event = ChainHeadStorageEvent::Items(ItemsEvent { items: storage_results }); - let _ = sink.send(&event); + let _ = sender.unbounded_send(FollowEvent::::OperationStorageItems( + OperationStorageItems { + operation_id: block_guard.operation_id(), + items: storage_results, + }, + )); } - let _ = sink.send(&ChainHeadStorageEvent::Done); + let _ = + sender.unbounded_send(FollowEvent::::OperationStorageDone(OperationId { + operation_id: block_guard.operation_id(), + })); } } diff --git a/client/rpc-spec-v2/src/chain_head/event.rs b/client/rpc-spec-v2/src/chain_head/event.rs index 971a0a9f46b95..65bc8b247c880 100644 --- a/client/rpc-spec-v2/src/chain_head/event.rs +++ b/client/rpc-spec-v2/src/chain_head/event.rs @@ -276,31 +276,6 @@ pub enum FollowEvent { Stop, } -/// The result of a chain head method. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ChainHeadResult { - /// Result of the method. - pub result: T, -} - -/// The event generated by the body / call / storage methods. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -#[serde(tag = "event")] -pub enum ChainHeadEvent { - /// The request completed successfully. - Done(ChainHeadResult), - /// The resources requested are inaccessible. - /// - /// Resubmitting the request later might succeed. - Inaccessible(ErrorEvent), - /// An error occurred. This is definitive. - Error(ErrorEvent), - /// The provided subscription ID is stale or invalid. - Disjoint, -} - /// The storage item received as paramter. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -351,35 +326,6 @@ pub enum StorageResultType { ClosestDescendantMerkleValue(String), } -/// The event generated by storage method. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] -#[serde(tag = "event")] -pub enum ChainHeadStorageEvent { - /// The request produced multiple result items. - Items(ItemsEvent), - /// The request produced multiple result items. - WaitForContinue, - /// The request completed successfully and all the results were provided. - Done, - /// The resources requested are inaccessible. - /// - /// Resubmitting the request later might succeed. - Inaccessible, - /// An error occurred. This is definitive. - Error(ErrorEvent), - /// The provided subscription ID is stale or invalid. - Disjoint, -} - -/// The request produced multiple result items. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ItemsEvent { - /// The resulting items. - pub items: Vec, -} - /// The method respose of `chainHead_body`, `chainHead_call` and `chainHead_storage`. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -714,56 +660,6 @@ mod tests { assert_eq!(event_dec, event); } - #[test] - fn chain_head_done_event() { - let event: ChainHeadEvent = - ChainHeadEvent::Done(ChainHeadResult { result: "A".into() }); - - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"done","result":"A"}"#; - assert_eq!(ser, exp); - - let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(event_dec, event); - } - - #[test] - fn chain_head_inaccessible_event() { - let event: ChainHeadEvent = - ChainHeadEvent::Inaccessible(ErrorEvent { error: "A".into() }); - - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"inaccessible","error":"A"}"#; - assert_eq!(ser, exp); - - let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(event_dec, event); - } - - #[test] - fn chain_head_error_event() { - let event: ChainHeadEvent = ChainHeadEvent::Error(ErrorEvent { error: "A".into() }); - - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"error","error":"A"}"#; - assert_eq!(ser, exp); - - let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(event_dec, event); - } - - #[test] - fn chain_head_disjoint_event() { - let event: ChainHeadEvent = ChainHeadEvent::Disjoint; - - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"disjoint"}"#; - assert_eq!(ser, exp); - - let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(event_dec, event); - } - #[test] fn chain_head_storage_query() { // Item with Value. @@ -855,68 +751,4 @@ mod tests { let dec: StorageResult = serde_json::from_str(exp).unwrap(); assert_eq!(dec, item); } - - #[test] - fn chain_head_storage_event() { - // Event with Items. - let event = ChainHeadStorageEvent::Items(ItemsEvent { - items: vec![ - StorageResult { - key: "0x1".into(), - result: StorageResultType::Value("first".into()), - }, - StorageResult { - key: "0x2".into(), - result: StorageResultType::Hash("second".into()), - }, - ], - }); - // Encode - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"items","items":[{"key":"0x1","value":"first"},{"key":"0x2","hash":"second"}]}"#; - assert_eq!(ser, exp); - // Decode - let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(dec, event); - - // Event with WaitForContinue. - let event = ChainHeadStorageEvent::WaitForContinue; - // Encode - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"wait-for-continue"}"#; - assert_eq!(ser, exp); - // Decode - let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(dec, event); - - // Event with Done. - let event = ChainHeadStorageEvent::Done; - // Encode - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"done"}"#; - assert_eq!(ser, exp); - // Decode - let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(dec, event); - - // Event with Inaccessible. - let event = ChainHeadStorageEvent::Inaccessible; - // Encode - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"inaccessible"}"#; - assert_eq!(ser, exp); - // Decode - let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(dec, event); - - // Event with Inaccessible. - let event = ChainHeadStorageEvent::Error(ErrorEvent { error: "reason".into() }); - // Encode - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"error","error":"reason"}"#; - assert_eq!(ser, exp); - // Decode - let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(dec, event); - } } diff --git a/client/rpc-spec-v2/src/chain_head/mod.rs b/client/rpc-spec-v2/src/chain_head/mod.rs index 604f565ce7585..f0fa898f9f7e1 100644 --- a/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/client/rpc-spec-v2/src/chain_head/mod.rs @@ -39,8 +39,8 @@ mod subscription; pub use api::ChainHeadApiServer; pub use chain_head::ChainHead; pub use event::{ - BestBlockChanged, ChainHeadEvent, ChainHeadResult, ErrorEvent, Finalized, FollowEvent, - Initialized, NewBlock, RuntimeEvent, RuntimeVersionEvent, + BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent, + RuntimeVersionEvent, }; use sp_core::hexdisplay::{AsBytesRef, HexDisplay}; diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index be8b8f46a2844..c0c2701c5e145 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -18,6 +18,7 @@ use futures::channel::oneshot; use sc_client_api::Backend; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap}, @@ -25,7 +26,10 @@ use std::{ time::{Duration, Instant}, }; -use crate::chain_head::subscription::SubscriptionManagementError; +use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent}; + +/// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings. +const QUEUE_SIZE_WARNING: usize = 512; /// The state machine of a block of a single subscription ID. /// @@ -116,6 +120,12 @@ struct SubscriptionState { with_runtime: bool, /// Signals the "Stop" event. tx_stop: Option>, + /// The sender of message responses to the `chainHead_follow` events. + /// + /// This object is cloned between methods. + response_sender: TracingUnboundedSender>, + /// The next operation ID. + next_operation_id: usize, /// Track the block hashes available for this subscription. /// /// This implementation assumes: @@ -227,6 +237,13 @@ impl SubscriptionState { } timestamp } + + /// Generate the next operation ID for this subscription. + fn next_operation_id(&mut self) -> usize { + let op_id = self.next_operation_id; + self.next_operation_id = self.next_operation_id.wrapping_add(1); + op_id + } } /// Keeps a specific block pinned while the handle is alive. @@ -235,6 +252,8 @@ impl SubscriptionState { pub struct BlockGuard> { hash: Block::Hash, with_runtime: bool, + response_sender: TracingUnboundedSender>, + operation_id: String, backend: Arc, } @@ -251,19 +270,37 @@ impl> BlockGuard { fn new( hash: Block::Hash, with_runtime: bool, + response_sender: TracingUnboundedSender>, + operation_id: usize, backend: Arc, ) -> Result { backend .pin_block(hash) .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; - Ok(Self { hash, with_runtime, backend }) + Ok(Self { + hash, + with_runtime, + response_sender, + operation_id: operation_id.to_string(), + backend, + }) } /// The `with_runtime` flag of the subscription. pub fn has_runtime(&self) -> bool { self.with_runtime } + + /// Send message responses from the `chainHead` methods to `chainHead_follow`. + pub fn response_sender(&self) -> TracingUnboundedSender> { + self.response_sender.clone() + } + + /// The operation ID of this method. + pub fn operation_id(&self) -> String { + self.operation_id.clone() + } } impl> Drop for BlockGuard { @@ -272,6 +309,15 @@ impl> Drop for BlockGuard { } } +/// The data propagated back to the `chainHead_follow` method after +/// the subscription is successfully inserted. +pub struct InsertedSubscriptionData { + /// Signal that the subscription must stop. + pub rx_stop: oneshot::Receiver<()>, + /// Receive message responses from the `chainHead` methods. + pub response_receiver: TracingUnboundedReceiver>, +} + pub struct SubscriptionsInner> { /// Reference count the block hashes across all subscriptions. /// @@ -311,16 +357,21 @@ impl> SubscriptionsInner { &mut self, sub_id: String, with_runtime: bool, - ) -> Option> { + ) -> Option> { if let Entry::Vacant(entry) = self.subs.entry(sub_id) { let (tx_stop, rx_stop) = oneshot::channel(); + let (response_sender, response_receiver) = + tracing_unbounded("chain-head-method-responses", QUEUE_SIZE_WARNING); let state = SubscriptionState:: { with_runtime, tx_stop: Some(tx_stop), + response_sender, + next_operation_id: 0, blocks: Default::default(), }; entry.insert(state); - Some(rx_stop) + + Some(InsertedSubscriptionData { rx_stop, response_receiver }) } else { None } @@ -491,7 +542,7 @@ impl> SubscriptionsInner { sub_id: &str, hash: Block::Hash, ) -> Result, SubscriptionManagementError> { - let Some(sub) = self.subs.get(sub_id) else { + let Some(sub) = self.subs.get_mut(sub_id) else { return Err(SubscriptionManagementError::SubscriptionAbsent) }; @@ -499,7 +550,14 @@ impl> SubscriptionsInner { return Err(SubscriptionManagementError::BlockHashAbsent) } - BlockGuard::new(hash, sub.with_runtime, self.backend.clone()) + let operation_id = sub.next_operation_id(); + BlockGuard::new( + hash, + sub.with_runtime, + sub.response_sender.clone(), + operation_id, + self.backend.clone(), + ) } } @@ -604,9 +662,13 @@ mod tests { #[test] fn sub_state_register_twice() { + let (response_sender, _response_receiver) = + tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING); let mut sub_state = SubscriptionState:: { with_runtime: false, tx_stop: None, + response_sender, + next_operation_id: 0, blocks: Default::default(), }; @@ -629,9 +691,13 @@ mod tests { #[test] fn sub_state_register_unregister() { + let (response_sender, _response_receiver) = + tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING); let mut sub_state = SubscriptionState:: { with_runtime: false, tx_stop: None, + response_sender, + next_operation_id: 0, blocks: Default::default(), }; @@ -921,17 +987,17 @@ mod tests { let id = "abc".to_string(); - let mut rx_stop = subs.insert_subscription(id.clone(), true).unwrap(); + let mut sub_data = subs.insert_subscription(id.clone(), true).unwrap(); // Check the stop signal was not received. - let res = rx_stop.try_recv().unwrap(); + let res = sub_data.rx_stop.try_recv().unwrap(); assert!(res.is_none()); let sub = subs.subs.get_mut(&id).unwrap(); sub.stop(); // Check the signal was received. - let res = rx_stop.try_recv().unwrap(); + let res = sub_data.rx_stop.try_recv().unwrap(); assert!(res.is_some()); } } diff --git a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index 86e55acc4c176..3aece6575ef66 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -16,7 +16,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use futures::channel::oneshot; use parking_lot::RwLock; use sc_client_api::Backend; use sp_runtime::traits::Block as BlockT; @@ -25,9 +24,9 @@ use std::{sync::Arc, time::Duration}; mod error; mod inner; +use self::inner::SubscriptionsInner; pub use error::SubscriptionManagementError; -pub use inner::BlockGuard; -use inner::SubscriptionsInner; +pub use inner::{BlockGuard, InsertedSubscriptionData}; /// Manage block pinning / unpinning for subscription IDs. pub struct SubscriptionManagement> { @@ -61,7 +60,7 @@ impl> SubscriptionManagement { &self, sub_id: String, runtime_updates: bool, - ) -> Option> { + ) -> Option> { let mut inner = self.inner.write(); inner.insert_subscription(sub_id, runtime_updates) } diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index fc2f1e85b42a3..6c3c343a10b53 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -1,5 +1,5 @@ use crate::chain_head::{ - event::{ChainHeadStorageEvent, StorageQuery, StorageQueryType, StorageResultType}, + event::{MethodResponse, StorageQuery, StorageQueryType, StorageResultType}, test_utils::ChainHeadMockClient, }; @@ -25,7 +25,7 @@ use sp_core::{ Blake2Hasher, Hasher, }; use sp_version::RuntimeVersion; -use std::{sync::Arc, time::Duration}; +use std::{collections::HashSet, sync::Arc, time::Duration}; use substrate_test_runtime::Transfer; use substrate_test_runtime_client::{ prelude::*, runtime, runtime::RuntimeApi, Backend, BlockBuilderExt, Client, @@ -330,29 +330,34 @@ async fn get_body() { let block_hash = format!("{:?}", block.header.hash()); let invalid_hash = hex_string(&INVALID_HASH); - // Subscription ID is stale the disjoint event is emitted. - let mut sub = api - .subscribe("chainHead_unstable_body", ["invalid_sub_id", &invalid_hash]) + // Subscription ID is invalid. + let response: MethodResponse = api + .call("chainHead_unstable_body", ["invalid_sub_id", &invalid_hash]) .await .unwrap(); - let event: ChainHeadEvent = get_next_event(&mut sub).await; - assert_eq!(event, ChainHeadEvent::::Disjoint); + assert_matches!(response, MethodResponse::LimitReached); - // Valid subscription ID with invalid block hash will error. + // Block hash is invalid. let err = api - .subscribe("chainHead_unstable_body", [&sub_id, &invalid_hash]) + .call::<_, serde_json::Value>("chainHead_unstable_body", [&sub_id, &invalid_hash]) .await .unwrap_err(); assert_matches!(err, Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" ); - // Obtain valid the body (list of extrinsics). - let mut sub = api.subscribe("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); - let event: ChainHeadEvent = get_next_event(&mut sub).await; - // Block contains no extrinsics. - assert_matches!(event, - ChainHeadEvent::Done(done) if done.result == "0x00" + // Valid call. + let response: MethodResponse = + api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + // Response propagated to `chainHead_follow`. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value.is_empty() ); // Import a block with extrinsics. @@ -378,35 +383,41 @@ async fn get_body() { FollowEvent::BestBlockChanged(_) ); - let mut sub = api.subscribe("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); - let event: ChainHeadEvent = get_next_event(&mut sub).await; - // Hex encoded scale encoded string for the vector of extrinsics. - let expected = hex_string(&block.extrinsics.encode()); - assert_matches!(event, - ChainHeadEvent::Done(done) if done.result == expected + // Valid call to a block with extrinsics. + let response: MethodResponse = + api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + // Response propagated to `chainHead_follow`. + let expected_tx = hex_string(&block.extrinsics[0].encode()); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value == vec![expected_tx] ); } #[tokio::test] async fn call_runtime() { - let (_client, api, _sub, sub_id, block) = setup_api().await; + let (_client, api, mut block_sub, sub_id, block) = setup_api().await; let block_hash = format!("{:?}", block.header.hash()); let invalid_hash = hex_string(&INVALID_HASH); - // Subscription ID is stale the disjoint event is emitted. - let mut sub = api - .subscribe( + // Subscription ID is invalid. + let response: MethodResponse = api + .call( "chainHead_unstable_call", ["invalid_sub_id", &block_hash, "BabeApi_current_epoch", "0x00"], ) .await .unwrap(); - let event: ChainHeadEvent = get_next_event(&mut sub).await; - assert_eq!(event, ChainHeadEvent::::Disjoint); + assert_matches!(response, MethodResponse::LimitReached); - // Valid subscription ID with invalid block hash will error. + // Block hash is invalid. let err = api - .subscribe( + .call::<_, serde_json::Value>( "chainHead_unstable_call", [&sub_id, &invalid_hash, "BabeApi_current_epoch", "0x00"], ) @@ -418,8 +429,9 @@ async fn call_runtime() { // Pass an invalid parameters that cannot be decode. let err = api - .subscribe( + .call::<_, serde_json::Value>( "chainHead_unstable_call", + // 0x0 is invalid. [&sub_id, &block_hash, "BabeApi_current_epoch", "0x0"], ) .await @@ -428,34 +440,43 @@ async fn call_runtime() { Error::Call(CallError::Custom(ref err)) if err.code() == 2003 && err.message().contains("Invalid parameter") ); + // Valid call. let alice_id = AccountKeyring::Alice.to_account_id(); // Hex encoded scale encoded bytes representing the call parameters. let call_parameters = hex_string(&alice_id.encode()); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_call", [&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters], ) .await .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // Response propagated to `chainHead_follow`. assert_matches!( - get_next_event::>(&mut sub).await, - ChainHeadEvent::Done(done) if done.result == "0x0000000000000000" + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000" ); // The `current_epoch` takes no parameters and not draining the input buffer // will cause the execution to fail. - let mut sub = api - .subscribe( - "chainHead_unstable_call", - [&sub_id, &block_hash, "BabeApi_current_epoch", "0x00"], - ) + let response: MethodResponse = api + .call("chainHead_unstable_call", [&sub_id, &block_hash, "BabeApi_current_epoch", "0x00"]) .await .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + // Error propagated to `chainHead_follow`. assert_matches!( - get_next_event::>(&mut sub).await, - ChainHeadEvent::Error(event) if event.error.contains("Execution failed") + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationError(error) if error.operation_id == operation_id && error.error.contains("Execution failed") ); } @@ -501,7 +522,7 @@ async fn call_runtime_without_flag() { let alice_id = AccountKeyring::Alice.to_account_id(); let call_parameters = hex_string(&alice_id.encode()); let err = api - .subscribe( + .call::<_, serde_json::Value>( "chainHead_unstable_call", [&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters], ) @@ -520,9 +541,9 @@ async fn get_storage_hash() { let invalid_hash = hex_string(&INVALID_HASH); let key = hex_string(&KEY); - // Subscription ID is stale the disjoint event is emitted. - let mut sub = api - .subscribe( + // Subscription ID is invalid. + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ "invalid_sub_id", @@ -532,12 +553,11 @@ async fn get_storage_hash() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_eq!(event, ChainHeadStorageEvent::Disjoint); + assert_matches!(response, MethodResponse::LimitReached); - // Valid subscription ID with invalid block hash will error. + // Block hash is invalid. let err = api - .subscribe( + .call::<_, serde_json::Value>( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -552,8 +572,8 @@ async fn get_storage_hash() { ); // Valid call without storage at the key. - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -563,9 +583,15 @@ async fn get_storage_hash() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; // The `Done` event is generated directly since the key does not have any value associated. - assert_matches!(event, ChainHeadStorageEvent::Done); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Import a new block with storage changes. let mut builder = client.new_block(Default::default()).unwrap(); @@ -585,9 +611,8 @@ async fn get_storage_hash() { ); // Valid call with storage at the key. - let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE)); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -597,17 +622,30 @@ async fn get_storage_hash() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE)); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Child value set in `setup_api`. let child_info = hex_string(&CHILD_STORAGE_KEY); let genesis_hash = format!("{:?}", client.genesis_hash()); - let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE)); - let mut sub = api - .subscribe( + + // Valid call with storage at the key. + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -618,10 +656,22 @@ async fn get_storage_hash() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE)); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); } #[tokio::test] @@ -647,10 +697,8 @@ async fn get_storage_multi_query_iter() { ); // Valid call with storage at the key. - let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE)); - let expected_value = hex_string(&VALUE); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -669,22 +717,34 @@ async fn get_storage_multi_query_iter() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 2 && - res.items[0].key == key && - res.items[1].key == key && - res.items[0].result == StorageResultType::Hash(expected_hash) && - res.items[1].result == StorageResultType::Value(expected_value)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE)); + let expected_value = hex_string(&VALUE); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 2 && + res.items[0].key == key && + res.items[1].key == key && + res.items[0].result == StorageResultType::Hash(expected_hash) && + res.items[1].result == StorageResultType::Value(expected_value) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Child value set in `setup_api`. let child_info = hex_string(&CHILD_STORAGE_KEY); let genesis_hash = format!("{:?}", client.genesis_hash()); let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE)); let expected_value = hex_string(&CHILD_VALUE); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -704,14 +764,24 @@ async fn get_storage_multi_query_iter() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 2 && - res.items[0].key == key && - res.items[1].key == key && - res.items[0].result == StorageResultType::Hash(expected_hash) && - res.items[1].result == StorageResultType::Value(expected_value)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 2 && + res.items[0].key == key && + res.items[1].key == key && + res.items[0].result == StorageResultType::Hash(expected_hash) && + res.items[1].result == StorageResultType::Value(expected_value) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); } #[tokio::test] @@ -721,9 +791,9 @@ async fn get_storage_value() { let invalid_hash = hex_string(&INVALID_HASH); let key = hex_string(&KEY); - // Subscription ID is stale the disjoint event is emitted. - let mut sub = api - .subscribe( + // Subscription ID is invalid. + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ "invalid_sub_id", @@ -733,12 +803,11 @@ async fn get_storage_value() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_eq!(event, ChainHeadStorageEvent::Disjoint); + assert_matches!(response, MethodResponse::LimitReached); - // Valid subscription ID with invalid block hash will error. + // Block hash is invalid. let err = api - .subscribe( + .call::<_, serde_json::Value>( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -753,8 +822,8 @@ async fn get_storage_value() { ); // Valid call without storage at the key. - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -764,9 +833,15 @@ async fn get_storage_value() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; // The `Done` event is generated directly since the key does not have any value associated. - assert_matches!(event, ChainHeadStorageEvent::Done); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Import a new block with storage changes. let mut builder = client.new_block(Default::default()).unwrap(); @@ -786,9 +861,8 @@ async fn get_storage_value() { ); // Valid call with storage at the key. - let expected_value = hex_string(&VALUE); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -798,17 +872,29 @@ async fn get_storage_value() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let expected_value = hex_string(&VALUE); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Child value set in `setup_api`. - let child_info = hex_string(b"child"); + let child_info = hex_string(&CHILD_STORAGE_KEY); let genesis_hash = format!("{:?}", client.genesis_hash()); - let expected_value = hex_string(&CHILD_VALUE); - let mut sub = api - .subscribe( + + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -819,15 +905,28 @@ async fn get_storage_value() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let expected_value = hex_string(&CHILD_VALUE); + + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); } #[tokio::test] -async fn get_storage_wrong_key() { - let (mut _client, api, mut _block_sub, sub_id, block) = setup_api().await; +async fn get_storage_non_queryable_key() { + let (mut _client, api, mut block_sub, sub_id, block) = setup_api().await; let block_hash = format!("{:?}", block.header.hash()); let key = hex_string(&KEY); @@ -835,8 +934,9 @@ async fn get_storage_wrong_key() { let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec(); prefixed_key.extend_from_slice(&KEY); let prefixed_key = hex_string(&prefixed_key); - let mut sub = api - .subscribe( + + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -846,15 +946,22 @@ async fn get_storage_wrong_key() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The `Done` event is generated directly since the key is not queryable. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX. let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec(); prefixed_key.extend_from_slice(&KEY); let prefixed_key = hex_string(&prefixed_key); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -864,15 +971,22 @@ async fn get_storage_wrong_key() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The `Done` event is generated directly since the key is not queryable. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Child key is prefixed by CHILD_STORAGE_KEY_PREFIX. let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec(); - prefixed_key.extend_from_slice(b"child"); + prefixed_key.extend_from_slice(CHILD_STORAGE_KEY); let prefixed_key = hex_string(&prefixed_key); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -883,15 +997,22 @@ async fn get_storage_wrong_key() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The `Done` event is generated directly since the key is not queryable. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Child key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX. let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec(); - prefixed_key.extend_from_slice(b"child"); + prefixed_key.extend_from_slice(CHILD_STORAGE_KEY); let prefixed_key = hex_string(&prefixed_key); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -902,8 +1023,164 @@ async fn get_storage_wrong_key() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The `Done` event is generated directly since the key is not queryable. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); +} + +#[tokio::test] +async fn unique_operation_ids() { + let (mut _client, api, mut block_sub, sub_id, block) = setup_api().await; + let block_hash = format!("{:?}", block.header.hash()); + + let mut op_ids = HashSet::new(); + + // Ensure that operation IDs are unique for multiple method calls. + for _ in 0..5 { + // Valid `chainHead_unstable_body` call. + let response: MethodResponse = + api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value.is_empty() + ); + // Ensure uniqueness. + assert!(op_ids.insert(operation_id)); + + // Valid `chainHead_unstable_storage` call. + let key = hex_string(&KEY); + let response: MethodResponse = api + .call( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &block_hash, + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }] + ], + ) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The `Done` event is generated directly since the key does not have any value associated. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); + // Ensure uniqueness. + assert!(op_ids.insert(operation_id)); + + // Valid `chainHead_unstable_call` call. + let alice_id = AccountKeyring::Alice.to_account_id(); + let call_parameters = hex_string(&alice_id.encode()); + let response: MethodResponse = api + .call( + "chainHead_unstable_call", + [&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters], + ) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // Response propagated to `chainHead_follow`. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000" + ); + // Ensure uniqueness. + assert!(op_ids.insert(operation_id)); + } +} + +#[tokio::test] +async fn separate_operation_ids_for_subscriptions() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), + ) + .into_rpc(); + + // Create two separate subscriptions. + let mut sub_first = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id_first = sub_first.subscription_id(); + let sub_id_first = serde_json::to_string(&sub_id_first).unwrap(); + + let mut sub_second = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id_second = sub_second.subscription_id(); + let sub_id_second = serde_json::to_string(&sub_id_second).unwrap(); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + let block_hash = format!("{:?}", block.header.hash()); + + // Ensure the imported block is propagated and pinned. + assert_matches!( + get_next_event::>(&mut sub_first).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub_first).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub_first).await, + FollowEvent::BestBlockChanged(_) + ); + + assert_matches!( + get_next_event::>(&mut sub_second).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub_second).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub_second).await, + FollowEvent::BestBlockChanged(_) + ); + + // Each `chainHead_follow` subscription receives a separate operation ID. + let response: MethodResponse = + api.call("chainHead_unstable_body", [&sub_id_first, &block_hash]).await.unwrap(); + let operation_id: String = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + assert_eq!(operation_id, "0"); + + let response: MethodResponse = api + .call("chainHead_unstable_body", [&sub_id_second, &block_hash]) + .await + .unwrap(); + let operation_id_second: String = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The second subscription does not increment the operation ID of the first one. + assert_eq!(operation_id_second, "0"); } #[tokio::test]