From 2cebf3c021f2997d35df321abff71473e1bd4d75 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 29 Mar 2023 16:43:50 +0200 Subject: [PATCH 1/2] staking miner: less aggresive submissions We have noticed that the staking-miner performs many concurrent RPC calls (more than 256). Probably because these batch request are getting bigger because the state is growing. So let's relax this and mine solutions sequentially i.e, mine solution one solution at the time and not in concurrently. --- utils/staking-miner/src/monitor.rs | 10 +++++++--- utils/staking-miner/src/rpc.rs | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index bfc075668e66..1f8450312b17 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -21,10 +21,11 @@ use crate::{ }; use codec::Encode; use jsonrpsee::core::Error as RpcError; +use std::sync::Arc; use sc_transaction_pool_api::TransactionStatus; use sp_core::storage::StorageKey; use sp_runtime::Perbill; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Mutex}; use EPM::{signed::SubmissionIndicesOf, SignedSubmissionOf}; /// Ensure that now is the signed phase. @@ -170,6 +171,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { let mut subscription = heads_subscription().await?; let (tx, mut rx) = mpsc::unbounded_channel::(); + let submit_lock = Arc::new(Mutex::new(())); loop { let at = tokio::select! { @@ -201,9 +203,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { // Spawn task and non-recoverable errors are sent back to the main task // such as if the connection has been closed. tokio::spawn( - send_and_watch_extrinsic(rpc.clone(), tx.clone(), at, signer.clone(), config.clone()) + send_and_watch_extrinsic(rpc.clone(), tx.clone(), at, signer.clone(), config.clone(), submit_lock.clone()) ); - } /// Construct extrinsic at given block and watch it. @@ -213,6 +214,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { at: Header, signer: Signer, config: MonitorConfig, + submit_lock: Arc>, ) { async fn flatten( @@ -255,6 +257,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { return; } + let _lock = submit_lock.lock().await; + let mut ext = match crate::create_election_ext::(rpc.clone(), Some(hash), vec![]).await { Ok(ext) => ext, Err(err) => { diff --git a/utils/staking-miner/src/rpc.rs b/utils/staking-miner/src/rpc.rs index 8929afcbe656..1da24b4aae34 100644 --- a/utils/staking-miner/src/rpc.rs +++ b/utils/staking-miner/src/rpc.rs @@ -131,6 +131,7 @@ impl SharedRpcClient { .connection_timeout(connection_timeout) .max_request_body_size(u32::MAX) .request_timeout(request_timeout) + .max_concurrent_requests(u32::MAX as usize) .build(uri) .await?; Ok(Self(Arc::new(client))) From e5ed20b5cbcde6824da1a8f9e751381032bb7bef Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 30 Mar 2023 00:12:18 +0200 Subject: [PATCH 2/2] add check if self hasn't submitted after mining --- utils/staking-miner/src/monitor.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index 1f8450312b17..2395c7b7fc69 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -21,10 +21,10 @@ use crate::{ }; use codec::Encode; use jsonrpsee::core::Error as RpcError; -use std::sync::Arc; use sc_transaction_pool_api::TransactionStatus; use sp_core::storage::StorageKey; use sp_runtime::Perbill; +use std::sync::Arc; use tokio::sync::{mpsc, Mutex}; use EPM::{signed::SubmissionIndicesOf, SignedSubmissionOf}; @@ -306,6 +306,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { let rpc1 = rpc.clone(); let rpc2 = rpc.clone(); + let rpc3 = rpc.clone(); let latest_head = match get_latest_head::(&rpc, &config.listen).await { Ok(hash) => hash, @@ -329,10 +330,16 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { ensure_signed_phase::(&rpc2, latest_head).await }); + let account = signer.account.clone(); + let no_prev_sol_fut = tokio::spawn(async move { + ensure_no_previous_solution::(&rpc3, latest_head, &account).await + }); + // Run the calls in parallel and return once all has completed or any failed. if let Err(err) = tokio::try_join!( flatten(ensure_strategy_met_fut), flatten(ensure_signed_phase_fut), + flatten(no_prev_sol_fut), ) { log::debug!(target: LOG_TARGET, "Skipping to submit at block {}; {}", at.number, err); return;