Skip to content

Commit

Permalink
Add back concurrency limit for piece requests to avoid too high load …
Browse files Browse the repository at this point in the history
…on RPC server
  • Loading branch information
nazar-pc committed Jun 16, 2024
1 parent c03d6ab commit 744d52b
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion crates/subspace-farmer/src/node_client/node_rpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt};
use crate::utils::AsyncJoinOnDrop;
use async_lock::RwLock as AsyncRwLock;
use async_lock::{RwLock as AsyncRwLock, Semaphore};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
Expand All @@ -15,10 +15,15 @@ use subspace_rpc_primitives::{
};
use tracing::{info, trace, warn};

/// TODO: Node is having a hard time responding for many piece requests, specifically this results
/// in subscriptions become broken on the node: https://github.com/paritytech/jsonrpsee/issues/1409
const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;

/// `WsClient` wrapper.
#[derive(Debug, Clone)]
pub struct NodeRpcClient {
client: Arc<WsClient>,
piece_request_semaphore: Arc<Semaphore>,
segment_headers: Arc<AsyncRwLock<Vec<SegmentHeader>>>,
_background_task: Arc<AsyncJoinOnDrop<()>>,
}
Expand All @@ -32,6 +37,7 @@ impl NodeRpcClient {
.build(url)
.await?,
);
let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));

let mut segment_headers = Vec::<SegmentHeader>::new();
let mut archived_segments_notifications = Box::pin(
Expand Down Expand Up @@ -123,6 +129,7 @@ impl NodeRpcClient {

let node_client = Self {
client,
piece_request_semaphore,
segment_headers,
_background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
};
Expand Down Expand Up @@ -234,6 +241,7 @@ impl NodeClient for NodeRpcClient {
}

async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, RpcError> {
let _permit = self.piece_request_semaphore.acquire().await;
let client = Arc::clone(&self.client);
// Spawn a separate task to improve concurrency due to slow-ish JSON decoding that causes
// issues for jsonrpsee
Expand Down

0 comments on commit 744d52b

Please sign in to comment.