diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index bfc075668e66..2395c7b7fc69 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -24,7 +24,8 @@ use jsonrpsee::core::Error as RpcError; use sc_transaction_pool_api::TransactionStatus; use sp_core::storage::StorageKey; use sp_runtime::Perbill; -use tokio::sync::mpsc; +use std::sync::Arc; +use tokio::sync::{mpsc, Mutex}; use EPM::{signed::SubmissionIndicesOf, SignedSubmissionOf}; /// Ensure that now is the signed phase. @@ -170,6 +171,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { let mut subscription = heads_subscription().await?; let (tx, mut rx) = mpsc::unbounded_channel::(); + let submit_lock = Arc::new(Mutex::new(())); loop { let at = tokio::select! { @@ -201,9 +203,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { // Spawn task and non-recoverable errors are sent back to the main task // such as if the connection has been closed. tokio::spawn( - send_and_watch_extrinsic(rpc.clone(), tx.clone(), at, signer.clone(), config.clone()) + send_and_watch_extrinsic(rpc.clone(), tx.clone(), at, signer.clone(), config.clone(), submit_lock.clone()) ); - } /// Construct extrinsic at given block and watch it. @@ -213,6 +214,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { at: Header, signer: Signer, config: MonitorConfig, + submit_lock: Arc>, ) { async fn flatten( @@ -255,6 +257,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { return; } + let _lock = submit_lock.lock().await; + let mut ext = match crate::create_election_ext::(rpc.clone(), Some(hash), vec![]).await { Ok(ext) => ext, Err(err) => { @@ -302,6 +306,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { let rpc1 = rpc.clone(); let rpc2 = rpc.clone(); + let rpc3 = rpc.clone(); let latest_head = match get_latest_head::(&rpc, &config.listen).await { Ok(hash) => hash, @@ -325,10 +330,16 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { ensure_signed_phase::(&rpc2, latest_head).await }); + let account = signer.account.clone(); + let no_prev_sol_fut = tokio::spawn(async move { + ensure_no_previous_solution::(&rpc3, latest_head, &account).await + }); + // Run the calls in parallel and return once all has completed or any failed. if let Err(err) = tokio::try_join!( flatten(ensure_strategy_met_fut), flatten(ensure_signed_phase_fut), + flatten(no_prev_sol_fut), ) { log::debug!(target: LOG_TARGET, "Skipping to submit at block {}; {}", at.number, err); return; diff --git a/utils/staking-miner/src/rpc.rs b/utils/staking-miner/src/rpc.rs index 8929afcbe656..1da24b4aae34 100644 --- a/utils/staking-miner/src/rpc.rs +++ b/utils/staking-miner/src/rpc.rs @@ -131,6 +131,7 @@ impl SharedRpcClient { .connection_timeout(connection_timeout) .max_request_body_size(u32::MAX) .request_timeout(request_timeout) + .max_concurrent_requests(u32::MAX as usize) .build(uri) .await?; Ok(Self(Arc::new(client)))