From 4410c98ddcff62fb9994451e3a5cc62d643fb06b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 24 Jul 2023 19:33:13 +0300 Subject: [PATCH 1/4] chainHead: Iterate over key,values and key,hashes Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 4 +- .../src/chain_head/chain_head_storage.rs | 151 +++++++++++++----- 2 files changed, 111 insertions(+), 44 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index bb6a6bcbdfed1..7acaaa07835f0 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -300,9 +300,7 @@ where let items = items .into_iter() .map(|query| { - if query.queue_type != StorageQueryType::Value && - query.queue_type != StorageQueryType::Hash - { + if query.queue_type == StorageQueryType::ClosestDescendantMerkleValue { // Note: remove this once all types are implemented. let _ = sink.reject(ChainHeadRpcError::InvalidParam( "Storage query type not supported".into(), diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index 9b5bf2a1180fb..863c62b338644 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -33,6 +33,18 @@ use super::{ hex_string, ErrorEvent, }; +/// The maximum number of items the `chainHead_storage` can return +/// before paginations is required. +const MAX_ITER_ITEMS: usize = 10; + +/// The query type of an interation. +enum IterQueryType { + /// Iterating over (key, value) pairs. + Value, + /// Iterating over (key, hash) pairs. + Hash, +} + /// Generates the events of the `chainHead_storage` method. pub struct ChainHeadStorage { /// Substrate client. @@ -58,7 +70,10 @@ fn is_key_queryable(key: &[u8]) -> bool { } /// The result of making a query call. -type QueryResult = Result, ChainHeadStorageEvent>; +type QueryResult = Result>, ChainHeadStorageEvent>; + +/// The result of iterating over keys. +type QueryIterResult = Result>, ChainHeadStorageEvent>; impl ChainHeadStorage where @@ -72,7 +87,7 @@ where hash: Block::Hash, key: &StorageKey, child_key: Option<&ChildInfo>, - ) -> Option { + ) -> QueryResult { let result = if let Some(child_key) = child_key { self.client.child_storage(hash, child_key, key) } else { @@ -81,17 +96,15 @@ where result .map(|opt| { - opt.map(|storage_data| { - QueryResult::Ok(StorageResult:: { - key: hex_string(&key.0), - result: StorageResultType::Value(hex_string(&storage_data.0)), - }) - }) + QueryResult::Ok(opt.map(|storage_data| StorageResult:: { + key: hex_string(&key.0), + result: StorageResultType::Value(hex_string(&storage_data.0)), + })) }) .unwrap_or_else(|err| { - Some(QueryResult::Err(ChainHeadStorageEvent::::Error(ErrorEvent { + QueryResult::Err(ChainHeadStorageEvent::::Error(ErrorEvent { error: err.to_string(), - }))) + })) }) } @@ -101,7 +114,7 @@ where hash: Block::Hash, key: &StorageKey, child_key: Option<&ChildInfo>, - ) -> Option { + ) -> QueryResult { let result = if let Some(child_key) = child_key { self.client.child_storage_hash(hash, child_key, key) } else { @@ -110,36 +123,55 @@ where result .map(|opt| { - opt.map(|storage_data| { - QueryResult::Ok(StorageResult:: { - key: hex_string(&key.0), - result: StorageResultType::Hash(hex_string(&storage_data.as_ref())), - }) - }) + QueryResult::Ok(opt.map(|storage_data| StorageResult:: { + key: hex_string(&key.0), + result: StorageResultType::Hash(hex_string(&storage_data.as_ref())), + })) }) .unwrap_or_else(|err| { - Some(QueryResult::Err(ChainHeadStorageEvent::::Error(ErrorEvent { + QueryResult::Err(ChainHeadStorageEvent::::Error(ErrorEvent { error: err.to_string(), - }))) + })) }) } - /// Make the storage query. - fn query_storage( + /// Handle iterating over (key, value) or (key, hash) pairs. + fn query_storage_iter( &self, hash: Block::Hash, - query: &StorageQuery, + key: &StorageKey, child_key: Option<&ChildInfo>, - ) -> Option { - if !is_key_queryable(&query.key.0) { - return None + ty: IterQueryType, + ) -> QueryIterResult { + let mut keys_iter = if let Some(child_key) = child_key { + self.client.child_storage_keys(hash, child_key.to_owned(), Some(key), None) + } else { + self.client.storage_keys(hash, Some(key), None) } + .map_err(|err| { + ChainHeadStorageEvent::::Error(ErrorEvent { error: err.to_string() }) + })?; + + let mut ret = Vec::with_capacity(MAX_ITER_ITEMS); + let mut maximum_iters = MAX_ITER_ITEMS; + while let Some(key) = keys_iter.next() { + if maximum_iters > 0 { + maximum_iters -= 1; + } + + let result = match ty { + IterQueryType::Value => self.query_storage_value(hash, &key, child_key), + IterQueryType::Hash => self.query_storage_hash(hash, &key, child_key), + }?; - match query.queue_type { - StorageQueryType::Value => self.query_storage_value(hash, &query.key, child_key), - StorageQueryType::Hash => self.query_storage_hash(hash, &query.key, child_key), - _ => None, + let Some(result) = result else { + continue + }; + + ret.push(result); } + + QueryIterResult::Ok(ret) } /// Generate the block events for the `chainHead_storage` method. @@ -159,19 +191,56 @@ where let mut storage_results = Vec::with_capacity(items.len()); for item in items { - let Some(result) = self.query_storage(hash, &item, child_key.as_ref()) else { - continue - }; - - match result { - QueryResult::Ok(storage_result) => storage_results.push(storage_result), - QueryResult::Err(event) => { - let _ = sink.send(&event); - // If an error is encountered for any of the query items - // do not produce any other events. - return - }, + if !is_key_queryable(&item.key.0) { + continue } + + match item.queue_type { + StorageQueryType::Value => { + match self.query_storage_value(hash, &item.key, child_key.as_ref()) { + Ok(Some(value)) => storage_results.push(value), + Ok(None) => continue, + Err(err) => { + let _ = sink.send(&err); + return + }, + } + }, + StorageQueryType::Hash => + match self.query_storage_hash(hash, &item.key, child_key.as_ref()) { + Ok(Some(value)) => storage_results.push(value), + Ok(None) => continue, + Err(err) => { + let _ = sink.send(&err); + return + }, + }, + StorageQueryType::DescendantsValues => match self.query_storage_iter( + hash, + &item.key, + child_key.as_ref(), + IterQueryType::Value, + ) { + Ok(values) => storage_results.extend(values), + Err(err) => { + let _ = sink.send(&err); + return + }, + }, + StorageQueryType::DescendantsHashes => match self.query_storage_iter( + hash, + &item.key, + child_key.as_ref(), + IterQueryType::Hash, + ) { + Ok(values) => storage_results.extend(values), + Err(err) => { + let _ = sink.send(&err); + return + }, + }, + _ => continue, + }; } if !storage_results.is_empty() { From a71413e9f2ecb379172a9d1cb185f9b05f0e3c6f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 24 Jul 2023 23:22:02 +0300 Subject: [PATCH 2/4] chainHead/tests: Multi query with iteration over keys Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/tests.rs | 91 +++++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 4a0a14750717f..8cc47be7054af 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -606,7 +606,6 @@ async fn get_storage_hash() { let child_info = hex_string(&CHILD_STORAGE_KEY); let genesis_hash = format!("{:?}", client.genesis_hash()); let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE)); - println!("Expe: {:?}", expected_hash); let mut sub = api .subscribe( "chainHead_unstable_storage", @@ -625,6 +624,96 @@ async fn get_storage_hash() { assert_matches!(event, ChainHeadStorageEvent::Done); } +#[tokio::test] +async fn get_storage_multi_query_iter() { + let (mut client, api, mut block_sub, sub_id, _) = setup_api().await; + let key = hex_string(&KEY); + + // Import a new block with storage changes. + let mut builder = client.new_block(Default::default()).unwrap(); + builder.push_storage_change(KEY.to_vec(), Some(VALUE.to_vec())).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // Valid call with storage at the key. + let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE)); + let expected_value = hex_string(&VALUE); + let mut sub = api + .subscribe( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &block_hash, + vec![ + StorageQuery { + key: key.clone(), + queue_type: StorageQueryType::DescendantsHashes + }, + StorageQuery { + key: key.clone(), + queue_type: StorageQueryType::DescendantsValues + } + ] + ], + ) + .await + .unwrap(); + let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadStorageEvent::::Items(res) if res.items.len() == 2 && + res.items[0].key == key && + res.items[1].key == key && + res.items[0].result == StorageResultType::Hash(expected_hash) && + res.items[1].result == StorageResultType::Value(expected_value)); + let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadStorageEvent::Done); + + // Child value set in `setup_api`. + let child_info = hex_string(&CHILD_STORAGE_KEY); + let genesis_hash = format!("{:?}", client.genesis_hash()); + let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE)); + let expected_value = hex_string(&CHILD_VALUE); + let mut sub = api + .subscribe( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &genesis_hash, + vec![ + StorageQuery { + key: key.clone(), + queue_type: StorageQueryType::DescendantsHashes + }, + StorageQuery { + key: key.clone(), + queue_type: StorageQueryType::DescendantsValues + } + ], + &child_info + ], + ) + .await + .unwrap(); + let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadStorageEvent::::Items(res) if res.items.len() == 2 && + res.items[0].key == key && + res.items[1].key == key && + res.items[0].result == StorageResultType::Hash(expected_hash) && + res.items[1].result == StorageResultType::Value(expected_value)); + let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadStorageEvent::Done); +} + #[tokio::test] async fn get_storage_value() { let (mut client, api, mut block_sub, sub_id, block) = setup_api().await; From 5dd936b0bec096b06d09d431cdef7229d6b6e2d4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 25 Jul 2023 12:19:03 +0300 Subject: [PATCH 3/4] chainHead/events: Fix typo in StorageQuery Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 4 +-- .../src/chain_head/chain_head_storage.rs | 2 +- client/rpc-spec-v2/src/chain_head/event.rs | 12 +++---- client/rpc-spec-v2/src/chain_head/tests.rs | 36 +++++++++---------- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 7acaaa07835f0..7f34bde68862c 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -300,7 +300,7 @@ where let items = items .into_iter() .map(|query| { - if query.queue_type == StorageQueryType::ClosestDescendantMerkleValue { + if query.query_type == StorageQueryType::ClosestDescendantMerkleValue { // Note: remove this once all types are implemented. let _ = sink.reject(ChainHeadRpcError::InvalidParam( "Storage query type not supported".into(), @@ -310,7 +310,7 @@ where Ok(StorageQuery { key: StorageKey(parse_hex_param(&mut sink, query.key)?), - queue_type: query.queue_type, + query_type: query.query_type, }) }) .collect::, _>>()?; diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index 863c62b338644..b7be33cae5ecc 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -195,7 +195,7 @@ where continue } - match item.queue_type { + match item.query_type { StorageQueryType::Value => { match self.query_storage_value(hash, &item.key, child_key.as_ref()) { Ok(Some(value)) => storage_results.push(value), diff --git a/client/rpc-spec-v2/src/chain_head/event.rs b/client/rpc-spec-v2/src/chain_head/event.rs index a141eee195ed6..0199edee42342 100644 --- a/client/rpc-spec-v2/src/chain_head/event.rs +++ b/client/rpc-spec-v2/src/chain_head/event.rs @@ -249,7 +249,7 @@ pub struct StorageQuery { pub key: Key, /// The type of the storage query. #[serde(rename = "type")] - pub queue_type: StorageQueryType, + pub query_type: StorageQueryType, } /// The type of the storage query. @@ -558,7 +558,7 @@ mod tests { #[test] fn chain_head_storage_query() { // Item with Value. - let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Value }; + let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Value }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","type":"value"}"#; @@ -568,7 +568,7 @@ mod tests { assert_eq!(dec, item); // Item with Hash. - let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Hash }; + let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Hash }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","type":"hash"}"#; @@ -578,7 +578,7 @@ mod tests { assert_eq!(dec, item); // Item with DescendantsValues. - let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsValues }; + let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsValues }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","type":"descendants-values"}"#; @@ -588,7 +588,7 @@ mod tests { assert_eq!(dec, item); // Item with DescendantsHashes. - let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsHashes }; + let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsHashes }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","type":"descendants-hashes"}"#; @@ -599,7 +599,7 @@ mod tests { // Item with Merkle. let item = - StorageQuery { key: "0x1", queue_type: StorageQueryType::ClosestDescendantMerkleValue }; + StorageQuery { key: "0x1", query_type: StorageQueryType::ClosestDescendantMerkleValue }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","type":"closest-descendant-merkle-value"}"#; diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 8cc47be7054af..c4cc17dac648d 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -527,7 +527,7 @@ async fn get_storage_hash() { rpc_params![ "invalid_sub_id", &invalid_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }] ], ) .await @@ -542,7 +542,7 @@ async fn get_storage_hash() { rpc_params![ &sub_id, &invalid_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }] ], ) .await @@ -558,7 +558,7 @@ async fn get_storage_hash() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }] ], ) .await @@ -592,7 +592,7 @@ async fn get_storage_hash() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }] ], ) .await @@ -612,7 +612,7 @@ async fn get_storage_hash() { rpc_params![ &sub_id, &genesis_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }], + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }], &child_info ], ) @@ -658,11 +658,11 @@ async fn get_storage_multi_query_iter() { vec![ StorageQuery { key: key.clone(), - queue_type: StorageQueryType::DescendantsHashes + query_type: StorageQueryType::DescendantsHashes }, StorageQuery { key: key.clone(), - queue_type: StorageQueryType::DescendantsValues + query_type: StorageQueryType::DescendantsValues } ] ], @@ -692,11 +692,11 @@ async fn get_storage_multi_query_iter() { vec![ StorageQuery { key: key.clone(), - queue_type: StorageQueryType::DescendantsHashes + query_type: StorageQueryType::DescendantsHashes }, StorageQuery { key: key.clone(), - queue_type: StorageQueryType::DescendantsValues + query_type: StorageQueryType::DescendantsValues } ], &child_info @@ -728,7 +728,7 @@ async fn get_storage_value() { rpc_params![ "invalid_sub_id", &invalid_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }] ], ) .await @@ -743,7 +743,7 @@ async fn get_storage_value() { rpc_params![ &sub_id, &invalid_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }] ], ) .await @@ -759,7 +759,7 @@ async fn get_storage_value() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }] ], ) .await @@ -793,7 +793,7 @@ async fn get_storage_value() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }] ], ) .await @@ -813,7 +813,7 @@ async fn get_storage_value() { rpc_params![ &sub_id, &genesis_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }], + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }], &child_info ], ) @@ -841,7 +841,7 @@ async fn get_storage_wrong_key() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: prefixed_key, queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: prefixed_key, query_type: StorageQueryType::Value }] ], ) .await @@ -859,7 +859,7 @@ async fn get_storage_wrong_key() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: prefixed_key, queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: prefixed_key, query_type: StorageQueryType::Value }] ], ) .await @@ -877,7 +877,7 @@ async fn get_storage_wrong_key() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }], + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }], &prefixed_key ], ) @@ -896,7 +896,7 @@ async fn get_storage_wrong_key() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key, queue_type: StorageQueryType::Value }], + vec![StorageQuery { key, query_type: StorageQueryType::Value }], &prefixed_key ], ) From f37438977635e17be08f221c666821e142443332 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 25 Jul 2023 12:20:30 +0300 Subject: [PATCH 4/4] chainHead: Take 10 from key iterator Signed-off-by: Alexandru Vasile --- .../src/chain_head/chain_head_storage.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index b7be33cae5ecc..310e48901876c 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -143,7 +143,7 @@ where child_key: Option<&ChildInfo>, ty: IterQueryType, ) -> QueryIterResult { - let mut keys_iter = if let Some(child_key) = child_key { + let keys_iter = if let Some(child_key) = child_key { self.client.child_storage_keys(hash, child_key.to_owned(), Some(key), None) } else { self.client.storage_keys(hash, Some(key), None) @@ -153,22 +153,16 @@ where })?; let mut ret = Vec::with_capacity(MAX_ITER_ITEMS); - let mut maximum_iters = MAX_ITER_ITEMS; + let mut keys_iter = keys_iter.take(MAX_ITER_ITEMS); while let Some(key) = keys_iter.next() { - if maximum_iters > 0 { - maximum_iters -= 1; - } - let result = match ty { IterQueryType::Value => self.query_storage_value(hash, &key, child_key), IterQueryType::Hash => self.query_storage_hash(hash, &key, child_key), }?; - let Some(result) = result else { - continue - }; - - ret.push(result); + if let Some(result) = result { + ret.push(result); + } } QueryIterResult::Ok(ret)