Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
chainHead_storage: Backport queries for value types (#14551)
Browse files Browse the repository at this point in the history
* chainHead/events: Add storage params and events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Check storage events serialization / deserialization

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/error: Add error for invalid WaitForContinue storage call

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/storage: Use new items params

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust storage tests to the new API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/events: Generalize StorageQuery by provided key

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chain_head: Add dedicated ChainHeadStorage client for queries

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/storage: Implement queries for hashes of values

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Check storage queries for hashes of values

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Improve API documentation wrt multiple entries

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/event: Rename StorageQueue ty to queue_ty

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chianHead: Add helper to encode chainHead results as hex str

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update client/rpc-spec-v2/src/chain_head/error.rs

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>

* chainHead: Change the `queryResult` to a plain `Result`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Stop producing events after the first error

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Change child_key to child_trie API param

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
  • Loading branch information
2 people authored and Ank4n committed Jul 22, 2023
1 parent c3d7615 commit 35203d8
Show file tree
Hide file tree
Showing 7 changed files with 703 additions and 110 deletions.
8 changes: 4 additions & 4 deletions client/rpc-spec-v2/src/chain_head/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#![allow(non_snake_case)]

//! API trait of the chain head.
use crate::chain_head::event::{ChainHeadEvent, FollowEvent, NetworkConfig};
use crate::chain_head::event::{ChainHeadEvent, FollowEvent, NetworkConfig, StorageQuery};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};

#[rpc(client, server)]
Expand Down Expand Up @@ -86,7 +86,7 @@ pub trait ChainHeadApi<Hash> {
#[method(name = "chainHead_unstable_genesisHash", blocking)]
fn chain_head_unstable_genesis_hash(&self) -> RpcResult<String>;

/// Return a storage entry at a specific block's state.
/// Returns storage entries at a specific block's state.
///
/// # Unstable
///
Expand All @@ -100,8 +100,8 @@ pub trait ChainHeadApi<Hash> {
&self,
follow_subscription: String,
hash: Hash,
key: String,
child_key: Option<String>,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
network_config: Option<NetworkConfig>,
);

Expand Down
99 changes: 40 additions & 59 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
chain_head_follow::ChainHeadFollower,
error::Error as ChainHeadRpcError,
event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent, NetworkConfig},
hex_string,
subscription::{SubscriptionManagement, SubscriptionManagementError},
},
SubscriptionTaskExecutor,
Expand All @@ -42,10 +43,15 @@ use sc_client_api::{
};
use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, traits::CallContext, Bytes};
use sp_core::{traits::CallContext, Bytes};
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc, time::Duration};

use super::{
chain_head_storage::ChainHeadStorage,
event::{ChainHeadStorageEvent, StorageQuery, StorageQueryType},
};

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";

/// An API for chain head RPC calls.
Expand Down Expand Up @@ -74,7 +80,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
max_pinned_blocks: usize,
max_pinned_duration: Duration,
) -> Self {
let genesis_hash = format!("0x{:?}", HexDisplay::from(&genesis_hash.as_ref()));
let genesis_hash = hex_string(&genesis_hash.as_ref());

Self {
client,
Expand Down Expand Up @@ -229,7 +235,7 @@ where
let event = match client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block.block.extrinsics();
let result = format!("0x{:?}", HexDisplay::from(&extrinsics.encode()));
let result = hex_string(&extrinsics.encode());
ChainHeadEvent::Done(ChainHeadResult { result })
},
Ok(None) => {
Expand Down Expand Up @@ -272,7 +278,7 @@ where

self.client
.header(hash)
.map(|opt_header| opt_header.map(|h| format!("0x{:?}", HexDisplay::from(&h.encode()))))
.map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
.map_err(ChainHeadRpcError::FetchBlockHeader)
.map_err(Into::into)
}
Expand All @@ -286,14 +292,33 @@ where
mut sink: SubscriptionSink,
follow_subscription: String,
hash: Block::Hash,
key: String,
child_key: Option<String>,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
_network_config: Option<NetworkConfig>,
) -> SubscriptionResult {
let key = StorageKey(parse_hex_param(&mut sink, key)?);
// Gain control over parameter parsing and returned error.
let items = items
.into_iter()
.map(|query| {
if query.queue_type != StorageQueryType::Value &&
query.queue_type != StorageQueryType::Hash
{
// Note: remove this once all types are implemented.
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
"Storage query type not supported".into(),
));
return Err(SubscriptionEmptyError)
}

let child_key = child_key
.map(|child_key| parse_hex_param(&mut sink, child_key))
Ok(StorageQuery {
key: StorageKey(parse_hex_param(&mut sink, query.key)?),
queue_type: query.queue_type,
})
})
.collect::<Result<Vec<_>, _>>()?;

let child_trie = child_trie
.map(|child_trie| parse_hex_param(&mut sink, child_trie))
.transpose()?
.map(ChildInfo::new_default_from_vec);

Expand All @@ -304,7 +329,7 @@ where
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
let _ = sink.send(&ChainHeadStorageEvent::<String>::Disjoint);
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
Expand All @@ -313,63 +338,19 @@ where
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
let _ = sink.send(&ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};

let storage_client = ChainHeadStorage::<Client, Block, BE>::new(client);

let fut = async move {
let _block_guard = block_guard;
// The child key is provided, use the key to query the child trie.
if let Some(child_key) = child_key {
// The child key must not be prefixed with ":child_storage:" nor
// ":child_storage:default:".
if well_known_keys::is_default_child_storage_key(child_key.storage_key()) ||
well_known_keys::is_child_storage_key(child_key.storage_key())
{
let _ = sink
.send(&ChainHeadEvent::Done(ChainHeadResult { result: None::<String> }));
return
}

let res = client
.child_storage(hash, &child_key, &key)
.map(|result| {
let result =
result.map(|storage| format!("0x{:?}", HexDisplay::from(&storage.0)));
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
});
let _ = sink.send(&res);
return
}

// The main key must not be prefixed with b":child_storage:" nor
// b":child_storage:default:".
if well_known_keys::is_default_child_storage_key(&key.0) ||
well_known_keys::is_child_storage_key(&key.0)
{
let _ =
sink.send(&ChainHeadEvent::Done(ChainHeadResult { result: None::<String> }));
return
}

// Main root trie storage query.
let res = client
.storage(hash, &key)
.map(|result| {
let result =
result.map(|storage| format!("0x{:?}", HexDisplay::from(&storage.0)));
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
});
let _ = sink.send(&res);
storage_client.generate_events(sink, hash, items, child_trie);
};

self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Expand Down Expand Up @@ -423,7 +404,7 @@ where
.executor()
.call(hash, &function, &call_parameters, CallContext::Offchain)
.map(|result| {
let result = format!("0x{:?}", HexDisplay::from(&result));
let result = hex_string(&result);
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
Expand Down
184 changes: 184 additions & 0 deletions client/rpc-spec-v2/src/chain_head/chain_head_storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Implementation of the `chainHead_storage` method.

use std::{marker::PhantomData, sync::Arc};

use jsonrpsee::SubscriptionSink;
use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
use sp_api::BlockT;
use sp_core::storage::well_known_keys;

use super::{
event::{
ChainHeadStorageEvent, ItemsEvent, StorageQuery, StorageQueryType, StorageResult,
StorageResultType,
},
hex_string, ErrorEvent,
};

/// Generates the events of the `chainHead_storage` method.
pub struct ChainHeadStorage<Client, Block, BE> {
/// Substrate client.
client: Arc<Client>,
_phantom: PhantomData<(Block, BE)>,
}

impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> {
/// Constructs a new [`ChainHeadStorage`].
pub fn new(client: Arc<Client>) -> Self {
Self { client, _phantom: PhantomData }
}
}

/// Checks if the provided key (main or child key) is valid
/// for queries.
///
/// Keys that are identical to `:child_storage:` or `:child_storage:default:`
/// are not queryable.
fn is_key_queryable(key: &[u8]) -> bool {
!well_known_keys::is_default_child_storage_key(key) &&
!well_known_keys::is_child_storage_key(key)
}

/// The result of making a query call.
type QueryResult = Result<StorageResult<String>, ChainHeadStorageEvent<String>>;

impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
where
Block: BlockT + 'static,
BE: Backend<Block> + 'static,
Client: StorageProvider<Block, BE> + 'static,
{
/// Fetch the value from storage.
fn query_storage_value(
&self,
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
let result = if let Some(child_key) = child_key {
self.client.child_storage(hash, child_key, key)
} else {
self.client.storage(hash, key)
};

result
.map(|opt| {
opt.map(|storage_data| {
QueryResult::Ok(StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Value(hex_string(&storage_data.0)),
})
})
})
.unwrap_or_else(|err| {
Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: err.to_string(),
})))
})
}

/// Fetch the hash of a value from storage.
fn query_storage_hash(
&self,
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
let result = if let Some(child_key) = child_key {
self.client.child_storage_hash(hash, child_key, key)
} else {
self.client.storage_hash(hash, key)
};

result
.map(|opt| {
opt.map(|storage_data| {
QueryResult::Ok(StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
})
})
})
.unwrap_or_else(|err| {
Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: err.to_string(),
})))
})
}

/// Make the storage query.
fn query_storage(
&self,
hash: Block::Hash,
query: &StorageQuery<StorageKey>,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
if !is_key_queryable(&query.key.0) {
return None
}

match query.queue_type {
StorageQueryType::Value => self.query_storage_value(hash, &query.key, child_key),
StorageQueryType::Hash => self.query_storage_hash(hash, &query.key, child_key),
_ => None,
}
}

/// Generate the block events for the `chainHead_storage` method.
pub fn generate_events(
&self,
mut sink: SubscriptionSink,
hash: Block::Hash,
items: Vec<StorageQuery<StorageKey>>,
child_key: Option<ChildInfo>,
) {
if let Some(child_key) = child_key.as_ref() {
if !is_key_queryable(child_key.storage_key()) {
let _ = sink.send(&ChainHeadStorageEvent::<String>::Done);
return
}
}

let mut storage_results = Vec::with_capacity(items.len());
for item in items {
let Some(result) = self.query_storage(hash, &item, child_key.as_ref()) else {
continue
};

match result {
QueryResult::Ok(storage_result) => storage_results.push(storage_result),
QueryResult::Err(event) => {
let _ = sink.send(&event);
// If an error is encountered for any of the query items
// do not produce any other events.
return
},
}
}

if !storage_results.is_empty() {
let event = ChainHeadStorageEvent::Items(ItemsEvent { items: storage_results });
let _ = sink.send(&event);
}

let _ = sink.send(&ChainHeadStorageEvent::<String>::Done);
}
}
Loading

0 comments on commit 35203d8

Please sign in to comment.