Skip to content

Commit

Permalink
Support Storage Chains (#292)
Browse files Browse the repository at this point in the history
* last few tweaks for releases

* add version next to project name

* fix prepare binary

* update dependencies; fix metadata

* add block_indexed_body

* begin support for storage chains

* finish support for storage chains

* add test for concat()
  • Loading branch information
insipx authored Jun 24, 2021
1 parent b061311 commit 70ea921
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 27 deletions.
4 changes: 2 additions & 2 deletions bin/polkadot-archive/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion substrate-archive-backend/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl ReadOnlyDb for SecondaryRocksDb {
NUM_COLUMNS,
other_col_budget,
);
Self::open(db_config, &path)
Self::open(db_config, path)
}
}

Expand Down
12 changes: 10 additions & 2 deletions substrate-archive-backend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use sc_client_api::{
ExecutionStrategy,
};
use sc_executor::{NativeExecutionDispatch, NativeExecutor, WasmExecutionMethod};
use sc_service::{ChainSpec, ClientConfig, LocalCallExecutor};
use sc_service::{ChainSpec, ClientConfig, LocalCallExecutor, TransactionStorageMode};
use sp_api::ConstructRuntimeApi;
use sp_core::traits::SpawnNamed;
use sp_runtime::traits::{BlakeTwo256, Block as BlockT};
Expand Down Expand Up @@ -86,6 +86,9 @@ pub struct RuntimeConfig {
/// If both are in use, the `wasm_runtime_overrides` takes precedence.
#[serde(skip)]
code_substitutes: HashMap<String, Vec<u8>>,
/// Method of storing and retrieving transactions(extrinsics).
#[serde(skip, default = "default_storage_mode")]
pub storage_mode: TransactionStorageMode,
}

impl RuntimeConfig {
Expand All @@ -103,6 +106,7 @@ impl Default for RuntimeConfig {
wasm_pages: None,
wasm_runtime_overrides: None,
code_substitutes: Default::default(),
storage_mode: TransactionStorageMode::BlockBody,
}
}
}
Expand All @@ -112,6 +116,10 @@ fn default_block_workers() -> usize {
num_cpus::get()
}

const fn default_storage_mode() -> TransactionStorageMode {
TransactionStorageMode::BlockBody
}

impl<B> TryFrom<RuntimeConfig> for ClientConfig<B>
where
B: BlockT,
Expand Down Expand Up @@ -154,7 +162,7 @@ where
Dispatch: NativeExecutionDispatch + 'static,
<Runtime::RuntimeApi as sp_api::ApiExt<Block>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
{
let backend = Arc::new(ReadOnlyBackend::new(db, true));
let backend = Arc::new(ReadOnlyBackend::new(db, true, config.storage_mode));

let executor = NativeExecutor::<Dispatch>::new(config.exec_method.into(), config.wasm_pages, config.block_workers);
let executor =
Expand Down
7 changes: 5 additions & 2 deletions substrate-archive-backend/src/read_only_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use hash_db::Prefix;
use kvdb::DBValue;

use sc_client_api::backend::StateBackend;
use sc_service::TransactionStorageMode;
use sp_blockchain::{Backend as _, HeaderBackend as _};
use sp_runtime::{
generic::{BlockId, SignedBlock},
Expand All @@ -46,9 +47,11 @@ pub use self::state_backend::TrieState;
use self::state_backend::{DbState, StateVault};
use crate::{database::ReadOnlyDb, error::Result, util::columns};

/// Backend supporting a generic database that may only read.
pub struct ReadOnlyBackend<Block: BlockT, D: ReadOnlyDb> {
db: Arc<D>,
storage: Arc<StateVault<Block, D>>,
storage_mode: TransactionStorageMode,
}

impl<Block, D> ReadOnlyBackend<Block, D>
Expand All @@ -57,9 +60,9 @@ where
Block::Header: HeaderT,
D: ReadOnlyDb + 'static,
{
pub fn new(db: Arc<D>, prefix_keys: bool) -> Self {
pub fn new(db: Arc<D>, prefix_keys: bool, storage_mode: TransactionStorageMode) -> Self {
let vault = Arc::new(StateVault::new(db.clone(), prefix_keys));
Self { db, storage: vault }
Self { db, storage: vault, storage_mode }
}

/// get a reference to the backing database
Expand Down
116 changes: 104 additions & 12 deletions substrate-archive-backend/src/read_only_backend/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
use std::sync::Arc;

use codec::Decode;
use codec::{Decode, Encode};

use sc_service::TransactionStorageMode;
use sp_blockchain::{
Backend as BlockchainBackend, BlockStatus, Cache, CachedHeaderMetadata, Error as BlockchainError, HeaderBackend,
HeaderMetadata, Info,
Expand All @@ -33,22 +34,59 @@ use sp_runtime::{
use crate::{
database::ReadOnlyDb,
read_only_backend::ReadOnlyBackend,
util::{self, columns},
util::{self, columns, read_db},
};

type ChainResult<T> = Result<T, BlockchainError>;

#[derive(Debug, Encode, Decode)]
struct ExtrinsicHeader {
hash: sp_core::H256,
data: Vec<u8>,
}

impl<Block: BlockT, D: ReadOnlyDb> BlockchainBackend<Block> for ReadOnlyBackend<Block, D> {
fn body(&self, id: BlockId<Block>) -> ChainResult<Option<Vec<<Block as BlockT>::Extrinsic>>> {
let res = util::read_db::<Block, D>(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)
.map_err(|e| BlockchainError::Backend(e.to_string()))?;

match res {
Some(body) => match Decode::decode(&mut &body[..]) {
let body = match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)
.map_err(|e| BlockchainError::Backend(e.to_string()))?
{
Some(body) => body,
None => return Ok(None),
};
match self.storage_mode {
TransactionStorageMode::BlockBody => match Decode::decode(&mut &body[..]) {
Ok(body) => Ok(Some(body)),
Err(_) => Err(BlockchainError::Backend("Could not decode extrinsics".into())),
Err(err) => return Err(BlockchainError::Backend(format!("Error decoding body: {}", err))),
},
TransactionStorageMode::StorageChain => match Vec::<ExtrinsicHeader>::decode(&mut &body[..]) {
Ok(index) => {
let extrinsics: ChainResult<Vec<Block::Extrinsic>> = index
.into_iter()
.map(|ExtrinsicHeader { hash, data }| {
let decode_result = if hash != Default::default() {
match self.db.get(columns::TRANSACTION, hash.as_ref()) {
Some(t) => {
let input = [&data[..], &t[..]].concat();
Block::Extrinsic::decode(&mut input.as_slice())
}
None => {
return Err(BlockchainError::Backend(format!(
"Missing indexed transaction {:?}",
hash
)))
}
}
} else {
Block::Extrinsic::decode(&mut data.as_ref())
};
decode_result
.map_err(|err| BlockchainError::Backend(format!("Error decoding extrinsic: {}", err)))
})
.collect();
Ok(Some(extrinsics?))
}
Err(err) => return Err(BlockchainError::Backend(format!("Error decoding body list: {}", err))),
},
None => Ok(None),
}
}

Expand Down Expand Up @@ -92,8 +130,38 @@ impl<Block: BlockT, D: ReadOnlyDb> BlockchainBackend<Block> for ReadOnlyBackend<
Ok(self.db.get(columns::TRANSACTION, hash.as_ref()))
}

fn block_indexed_body(&self, _id: BlockId<Block>) -> ChainResult<Option<Vec<Vec<u8>>>> {
unimplemented!()
fn block_indexed_body(&self, id: BlockId<Block>) -> ChainResult<Option<Vec<Vec<u8>>>> {
match self.storage_mode {
TransactionStorageMode::BlockBody => Ok(None),
TransactionStorageMode::StorageChain => {
let body = match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)
.map_err(|e| BlockchainError::Backend(e.to_string()))?
{
Some(body) => body,
None => return Ok(None),
};
match Vec::<ExtrinsicHeader>::decode(&mut &body[..]) {
Ok(index) => {
let mut transactions = Vec::new();
for ExtrinsicHeader { hash, .. } in index.into_iter() {
if hash != Default::default() {
match self.db.get(columns::TRANSACTION, hash.as_ref()) {
Some(t) => transactions.push(t),
None => {
return Err(BlockchainError::Backend(format!(
"Missing indexed transaction {:?}",
hash
)))
}
}
}
}
Ok(Some(transactions))
}
Err(err) => return Err(BlockchainError::Backend(format!("Error decoding body list: {}", err))),
}
}
}
}
}

Expand Down Expand Up @@ -150,4 +218,28 @@ impl<Block: BlockT, D: ReadOnlyDb> HeaderMetadata<Block> for ReadOnlyBackend<Blo
}

#[cfg(test)]
mod tests {}
mod tests {
use codec::Input;

#[test]
fn concat_imitates_join_input() {
let buf1 = [1, 2, 3, 4];
let buf2 = [5, 6, 7, 8];
let mut test = [0, 0, 0];
let joined = [buf1.as_ref(), buf2.as_ref()].concat();
let mut joined = joined.as_slice();
assert_eq!(joined.remaining_len().unwrap(), Some(8));

joined.read(&mut test).unwrap();
assert_eq!(test, [1, 2, 3]);
assert_eq!(joined.remaining_len().unwrap(), Some(5));

joined.read(&mut test).unwrap();
assert_eq!(test, [4, 5, 6]);
assert_eq!(joined.remaining_len().unwrap(), Some(2));

joined.read(&mut test[0..2]).unwrap();
assert_eq!(test, [7, 8, 6]);
assert_eq!(joined.remaining_len().unwrap(), Some(0));
}
}
2 changes: 1 addition & 1 deletion substrate-archive-backend/src/runtime_version_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<B: BlockT, D: ReadOnlyDb + 'static> RuntimeVersionCache<B, D> {
let version = decode_version(self.exec.read_runtime_version(&code, &mut ext)?.as_slice())?;
log::debug!("Registered a new runtime version: {:?}", version);
self.versions.rcu(|cache| {
let mut cache = HashMap::clone(&cache);
let mut cache = HashMap::clone(cache);
cache.insert(code_hash, version.clone());
cache
});
Expand Down
8 changes: 4 additions & 4 deletions substrate-archive/src/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ where
logger::init(self.config.log.clone())?;
log::debug!("Archive Config: {:?}", self.config);

// config chain
// configure chain
const CHAIN_DATA_DB: &str = "CHAIN_DATA_DB";
let chain_path = self
.config
Expand All @@ -379,15 +379,15 @@ where
)?;
let db = Arc::new(DB::open_database(chain_path, self.config.chain.cache_size, db_path)?);

// config runtime
// configure runtime
self.config.runtime.wasm_runtime_overrides = self.config.wasm_tracing.as_ref().and_then(|c| c.folder.clone());
if let Some(spec) = self.config.chain.spec {
self.config.runtime.set_code_substitutes(spec.as_ref());
}

// configure substrate client and backend
let client = Arc::new(runtime_api::<B, R, D, DB>(db.clone(), self.config.runtime)?);
let backend = Arc::new(ReadOnlyBackend::new(db, true));
let backend = Arc::new(ReadOnlyBackend::new(db.clone(), true, self.config.runtime.storage_mode));
let client = Arc::new(runtime_api::<B, R, D, DB>(db, self.config.runtime)?);
Self::startup_info(&*client, &*backend)?;

// config postgres database
Expand Down
6 changes: 3 additions & 3 deletions substrate-archive/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ where

fn execute(self) -> Result<BlockChanges<Block>, ArchiveError> {
let BlockPrep { block, state, hash, parent_hash, number } =
Self::prepare_block(self.block, &self.backend, &self.id)?;
Self::prepare_block(self.block, self.backend, &self.id)?;

self.api.execute_block(&self.id, block)?;
let storage_changes =
Expand All @@ -191,10 +191,10 @@ where

fn execute_with_tracing(self, targets: &str) -> Result<(BlockChanges<Block>, Traces), ArchiveError> {
let BlockExecutor { block, backend, id, api } = self;
let BlockPrep { block, state, hash, parent_hash, number } = Self::prepare_block(block, &backend, &id)?;
let BlockPrep { block, state, hash, parent_hash, number } = Self::prepare_block(block, backend, &id)?;

let span_events = Arc::new(Mutex::new(SpansAndEvents { spans: Vec::new(), events: Vec::new() }));
let handler = TraceHandler::new(&targets, span_events);
let handler = TraceHandler::new(targets, span_events);
let dispatcher_span = tracing::debug_span!(
target: "state_tracing",
"execute_block",
Expand Down

0 comments on commit 70ea921

Please sign in to comment.