From e13407a1b48da3b8496ec8b80e723fb6cd197060 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 7 Oct 2024 00:28:51 +0300 Subject: [PATCH] Optimize cluster memory allocations --- crates/subspace-farmer/src/cluster/plotter.rs | 12 +++++++----- crates/subspace-farmer/src/plotter.rs | 3 ++- crates/subspace-farmer/src/plotter/cpu.rs | 3 ++- crates/subspace-farmer/src/plotter/gpu.rs | 3 ++- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/plotter.rs b/crates/subspace-farmer/src/cluster/plotter.rs index 32bda46ce1..1b112e4033 100644 --- a/crates/subspace-farmer/src/cluster/plotter.rs +++ b/crates/subspace-farmer/src/cluster/plotter.rs @@ -15,6 +15,7 @@ use anyhow::anyhow; use async_trait::async_trait; use backoff::backoff::Backoff; use backoff::ExponentialBackoff; +use bytes::Bytes; use derive_more::Display; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::mpsc; @@ -93,7 +94,7 @@ enum ClusterSectorPlottingProgress { time: Duration, }, /// Sector chunk after finished plotting - SectorChunk(Result, String>), + SectorChunk(Result), /// Plotting failed Error { /// Error message @@ -518,8 +519,8 @@ async fn process_response_notification( progress_sender: &mut PS, retry_backoff_policy: &mut ExponentialBackoff, response: ClusterSectorPlottingProgress, - sector_sender: &mut mpsc::Sender, String>>, - maybe_sector_receiver: &mut Option, String>>>, + sector_sender: &mut mpsc::Sender>, + maybe_sector_receiver: &mut Option>>, ) -> ResponseProcessingResult where PS: Sink + Unpin + Send + 'static, @@ -931,10 +932,11 @@ async fn send_publish_progress( match maybe_sector_chunk { Ok(sector_chunk) => { // Slice large chunks into smaller ones before publishing - for sector_chunk in sector_chunk.chunks(approximate_max_message_size) { + for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size) + { if let Err(error) = response_sender .send(ClusterSectorPlottingProgress::SectorChunk(Ok( - sector_chunk.to_vec() + sector_chunk.slice_ref(small_sector_chunk) ))) .await { diff --git a/crates/subspace-farmer/src/plotter.rs b/crates/subspace-farmer/src/plotter.rs index e1c6f5e1f0..e99e3d93fa 100644 --- a/crates/subspace-farmer/src/plotter.rs +++ b/crates/subspace-farmer/src/plotter.rs @@ -11,6 +11,7 @@ pub mod gpu; pub mod pool; use async_trait::async_trait; +use bytes::Bytes; use futures::channel::mpsc; use futures::Stream; use std::fmt; @@ -39,7 +40,7 @@ pub enum SectorPlottingProgress { /// How much time it took to plot a sector time: Duration, /// Stream of all plotted sector bytes - sector: Pin, String>> + Send + Sync>>, + sector: Pin> + Send + Sync>>, }, /// Plotting failed Error { diff --git a/crates/subspace-farmer/src/plotter/cpu.rs b/crates/subspace-farmer/src/plotter/cpu.rs index d1b860f97c..358d301f7a 100644 --- a/crates/subspace-farmer/src/plotter/cpu.rs +++ b/crates/subspace-farmer/src/plotter/cpu.rs @@ -8,6 +8,7 @@ use crate::thread_pool_manager::PlottingThreadPoolManager; use crate::utils::AsyncJoinOnDrop; use async_lock::Mutex as AsyncMutex; use async_trait::async_trait; +use bytes::Bytes; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::mpsc; use futures::stream::FuturesUnordered; @@ -455,7 +456,7 @@ where SectorPlottingProgress::Finished { plotted_sector, time: start.elapsed(), - sector: Box::pin(stream::once(async move { Ok(sector) })), + sector: Box::pin(stream::once(async move { Ok(Bytes::from(sector)) })), }, ) .await; diff --git a/crates/subspace-farmer/src/plotter/gpu.rs b/crates/subspace-farmer/src/plotter/gpu.rs index 8b0cf2ccb9..fb3ecdacf2 100644 --- a/crates/subspace-farmer/src/plotter/gpu.rs +++ b/crates/subspace-farmer/src/plotter/gpu.rs @@ -11,6 +11,7 @@ use crate::plotter::{Plotter, SectorPlottingProgress}; use crate::utils::AsyncJoinOnDrop; use async_lock::Mutex as AsyncMutex; use async_trait::async_trait; +use bytes::Bytes; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::mpsc; use futures::stream::FuturesUnordered; @@ -441,7 +442,7 @@ where SectorPlottingProgress::Finished { plotted_sector, time: start.elapsed(), - sector: Box::pin(stream::once(async move { Ok(sector) })), + sector: Box::pin(stream::once(async move { Ok(Bytes::from(sector)) })), }, ) .await;