From 744d52b5e8679901cfb633e95ed4a1f05938fc61 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sun, 16 Jun 2024 02:13:33 +0300 Subject: [PATCH] Add back concurrency limit for piece requests to avoid too high load on RPC server --- .../subspace-farmer/src/node_client/node_rpc_client.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/subspace-farmer/src/node_client/node_rpc_client.rs b/crates/subspace-farmer/src/node_client/node_rpc_client.rs index 793042202f..eedd7e33d7 100644 --- a/crates/subspace-farmer/src/node_client/node_rpc_client.rs +++ b/crates/subspace-farmer/src/node_client/node_rpc_client.rs @@ -1,6 +1,6 @@ use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt}; use crate::utils::AsyncJoinOnDrop; -use async_lock::RwLock as AsyncRwLock; +use async_lock::{RwLock as AsyncRwLock, Semaphore}; use async_trait::async_trait; use futures::{Stream, StreamExt}; use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT}; @@ -15,10 +15,15 @@ use subspace_rpc_primitives::{ }; use tracing::{info, trace, warn}; +/// TODO: Node is having a hard time responding for many piece requests, specifically this results +/// in subscriptions become broken on the node: https://github.com/paritytech/jsonrpsee/issues/1409 +const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10; + /// `WsClient` wrapper. #[derive(Debug, Clone)] pub struct NodeRpcClient { client: Arc, + piece_request_semaphore: Arc, segment_headers: Arc>>, _background_task: Arc>, } @@ -32,6 +37,7 @@ impl NodeRpcClient { .build(url) .await?, ); + let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS)); let mut segment_headers = Vec::::new(); let mut archived_segments_notifications = Box::pin( @@ -123,6 +129,7 @@ impl NodeRpcClient { let node_client = Self { client, + piece_request_semaphore, segment_headers, _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)), }; @@ -234,6 +241,7 @@ impl NodeClient for NodeRpcClient { } async fn piece(&self, piece_index: PieceIndex) -> Result, RpcError> { + let _permit = self.piece_request_semaphore.acquire().await; let client = Arc::clone(&self.client); // Spawn a separate task to improve concurrency due to slow-ish JSON decoding that causes // issues for jsonrpsee