Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

some suggestions #12532

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 56 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions utils/frame/remote-externalities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
codec = { package = "parity-scale-codec", version = "3.0.0" }
env_logger = "0.9"
jsonrpsee = { version = "0.15.1", features = ["ws-client", "macros"] }
log = "0.4.17"
serde = "1.0.136"
Expand All @@ -24,12 +23,12 @@ sp-core = { version = "6.0.0", path = "../../../primitives/core" }
sp-io = { version = "6.0.0", path = "../../../primitives/io" }
sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" }
sp-version = { version = "5.0.0", path = "../../../primitives/version" }
async-std = { version = "1.0.0", features = ["tokio1"] }
tokio = { version = "1.17.0", features = ["macros", "rt-multi-thread"] }

[dev-dependencies]
tokio = { version = "1.17.0", features = ["macros", "rt-multi-thread"] }
frame-support = { version = "4.0.0-dev", path = "../../../frame/support" }
pallet-elections-phragmen = { version = "5.0.0-dev", path = "../../../frame/elections-phragmen" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }

[features]
remote-test = ["frame-support"]
40 changes: 21 additions & 19 deletions utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use sp_core::{
};
pub use sp_io::TestExternalities;
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
traits::{Block as BlockT},
StateVersion,
};
use std::{
Expand Down Expand Up @@ -156,6 +156,9 @@ impl Transport {
log::info!(target: LOG_TARGET, "initializing remote client to {:?}", uri);
WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.request_timeout(std::time::Duration::from_secs(5 * 10))
.connection_timeout(std::time::Duration::from_secs(60))
.max_notifs_per_subscription(1024)
.build(&uri)
.await
.map_err(|e| {
Expand Down Expand Up @@ -412,7 +415,7 @@ impl<B: BlockT + DeserializeOwned> Builder<B> {
) -> Result<Vec<KeyValue>, &'static str> {
let now = std::time::Instant::now();
let keys = self.rpc_get_keys_paged(prefix, at).await?;
let uri = Arc::new(self.as_online().transport.uri.clone().unwrap());
let client = self.as_online().transport.remote_client.clone().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually tried this, but even the local tests would then stall. Maybe it was because of lagging this #[tokio::test(flavor = "multi_thread")]?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it worked for the tests in remote externalities but didn't try it in try-runtime

let thread_chunk_size = (keys.len() / self.as_online().threads).max(1);

log::info!(
Expand All @@ -428,10 +431,10 @@ impl<B: BlockT + DeserializeOwned> Builder<B> {
let keys_chunked: Vec<Vec<StorageKey>> =
keys.chunks(thread_chunk_size).map(|s| s.into()).collect::<Vec<_>>();
for thread_keys in keys_chunked {
let uri = Arc::clone(&uri);
let thread_client = client.clone();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the client is now shared between all threads? Is that maybe slowing done anything?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or can you send parallel requests from multiple threads?

Copy link
Member Author

@niklasad1 niklasad1 Oct 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kian introduced this make it faster i.e, to make parallel requests from multiple threads but I think tokio::spawn should be sufficient here i.e, to use the tokio threadpool instead of spawning threads for each request. I guess we need to bench it to know why

But I removed that a new RPC client is created in each thread that seems needless as the RPC client is already multi-threaded, so I I Arc:ed instead and the request/call is just done a separate task/thread.

let handle = std::thread::spawn(move || {
use async_std::task::block_on;
let thread_client = block_on(Transport::build_ws_client(&*uri)).unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you actually tested this in try-runtime-cli?

Copy link
Member Author

@niklasad1 niklasad1 Oct 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tested it right now by RUST_LOG=remote-ext=debug,try-runtime-cli=debug ./target/release/substrate try-runtime offchain-worker live --uri wss://rpc.polkadot.io:443 --at 0x76b3e40351c4ea941f57233d0476259cfd21df418be01d456c05c55694c96f36 -p Proxy

it worked fine but I 'm just too lazy to run it on all pallets ^^


let mut thread_key_values = Vec::with_capacity(thread_keys.len());
for chunk_keys in thread_keys.chunks(DEFAULT_VALUE_DOWNLOAD_BATCH) {
let batch = chunk_keys
Expand All @@ -441,7 +444,7 @@ impl<B: BlockT + DeserializeOwned> Builder<B> {
.collect::<Vec<_>>();

let values =
block_on(thread_client.batch_request::<Option<StorageData>>(batch))
rt.block_on(thread_client.batch_request::<Option<StorageData>>(batch))
.map_err(|e| {
log::error!(
target: LOG_TARGET,
Expand Down Expand Up @@ -871,16 +874,15 @@ impl<B: BlockT + DeserializeOwned> Builder<B> {

#[cfg(test)]
mod test_prelude {
use tracing_subscriber::EnvFilter;

pub(crate) use super::*;
pub(crate) use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper, H256 as Hash};

pub(crate) type Block = RawBlock<ExtrinsicWrapper<Hash>>;

pub(crate) fn init_logger() {
let _ = env_logger::Builder::from_default_env()
.format_module_path(true)
.format_level(true)
// .filter_module(LOG_TARGET, log::LevelFilter::Debug)
let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env())
.with_level(true)
.try_init();
}
}
Expand Down Expand Up @@ -954,7 +956,7 @@ mod remote_tests {
todo!();
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn snapshot_block_hash_works() {
const CACHE: &'static str = "snapshot_block_hash_works";
init_logger();
Expand All @@ -981,7 +983,7 @@ mod remote_tests {
assert_eq!(block_hash, cached_block_hash);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn offline_else_online_works() {
const CACHE: &'static str = "offline_else_online_works_data";
init_logger();
Expand Down Expand Up @@ -1026,7 +1028,7 @@ mod remote_tests {
std::fs::remove_file(to_delete[0].path()).unwrap();
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn can_build_one_small_pallet() {
init_logger();
Builder::<Block>::new()
Expand All @@ -1041,7 +1043,7 @@ mod remote_tests {
.execute_with(|| {});
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn can_build_few_pallet() {
init_logger();
Builder::<Block>::new()
Expand Down Expand Up @@ -1076,7 +1078,7 @@ mod remote_tests {
.execute_with(|| {});
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn can_create_snapshot() {
const CACHE: &'static str = "can_create_snapshot";
init_logger();
Expand Down Expand Up @@ -1109,7 +1111,7 @@ mod remote_tests {
std::fs::remove_file(to_delete.path()).unwrap();
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn can_create_child_snapshot() {
const CACHE: &'static str = "can_create_child_snapshot";
init_logger();
Expand Down Expand Up @@ -1141,7 +1143,7 @@ mod remote_tests {
std::fs::remove_file(to_delete.path()).unwrap();
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
// #[ignore = "only works if a local node is present."]
async fn can_fetch_all_local() {
init_logger();
Expand All @@ -1156,7 +1158,7 @@ mod remote_tests {
.execute_with(|| {});
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
// #[ignore = "slow af."]
async fn can_fetch_all_remote() {
init_logger();
Expand Down