Skip to content

Commit

Permalink
Optimize cluster memory allocations
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Oct 6, 2024
1 parent c82252d commit e13407a
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 8 deletions.
12 changes: 7 additions & 5 deletions crates/subspace-farmer/src/cluster/plotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use anyhow::anyhow;
use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use bytes::Bytes;
use derive_more::Display;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
Expand Down Expand Up @@ -93,7 +94,7 @@ enum ClusterSectorPlottingProgress {
time: Duration,
},
/// Sector chunk after finished plotting
SectorChunk(Result<Vec<u8>, String>),
SectorChunk(Result<Bytes, String>),
/// Plotting failed
Error {
/// Error message
Expand Down Expand Up @@ -518,8 +519,8 @@ async fn process_response_notification<PS>(
progress_sender: &mut PS,
retry_backoff_policy: &mut ExponentialBackoff,
response: ClusterSectorPlottingProgress,
sector_sender: &mut mpsc::Sender<Result<Vec<u8>, String>>,
maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Vec<u8>, String>>>,
sector_sender: &mut mpsc::Sender<Result<Bytes, String>>,
maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Bytes, String>>>,
) -> ResponseProcessingResult
where
PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
Expand Down Expand Up @@ -931,10 +932,11 @@ async fn send_publish_progress(
match maybe_sector_chunk {
Ok(sector_chunk) => {
// Slice large chunks into smaller ones before publishing
for sector_chunk in sector_chunk.chunks(approximate_max_message_size) {
for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size)
{
if let Err(error) = response_sender
.send(ClusterSectorPlottingProgress::SectorChunk(Ok(
sector_chunk.to_vec()
sector_chunk.slice_ref(small_sector_chunk)
)))
.await
{
Expand Down
3 changes: 2 additions & 1 deletion crates/subspace-farmer/src/plotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod gpu;
pub mod pool;

use async_trait::async_trait;
use bytes::Bytes;
use futures::channel::mpsc;
use futures::Stream;
use std::fmt;
Expand Down Expand Up @@ -39,7 +40,7 @@ pub enum SectorPlottingProgress {
/// How much time it took to plot a sector
time: Duration,
/// Stream of all plotted sector bytes
sector: Pin<Box<dyn Stream<Item = Result<Vec<u8>, String>> + Send + Sync>>,
sector: Pin<Box<dyn Stream<Item = Result<Bytes, String>> + Send + Sync>>,
},
/// Plotting failed
Error {
Expand Down
3 changes: 2 additions & 1 deletion crates/subspace-farmer/src/plotter/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::thread_pool_manager::PlottingThreadPoolManager;
use crate::utils::AsyncJoinOnDrop;
use async_lock::Mutex as AsyncMutex;
use async_trait::async_trait;
use bytes::Bytes;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
Expand Down Expand Up @@ -455,7 +456,7 @@ where
SectorPlottingProgress::Finished {
plotted_sector,
time: start.elapsed(),
sector: Box::pin(stream::once(async move { Ok(sector) })),
sector: Box::pin(stream::once(async move { Ok(Bytes::from(sector)) })),
},
)
.await;
Expand Down
3 changes: 2 additions & 1 deletion crates/subspace-farmer/src/plotter/gpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::plotter::{Plotter, SectorPlottingProgress};
use crate::utils::AsyncJoinOnDrop;
use async_lock::Mutex as AsyncMutex;
use async_trait::async_trait;
use bytes::Bytes;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
Expand Down Expand Up @@ -441,7 +442,7 @@ where
SectorPlottingProgress::Finished {
plotted_sector,
time: start.elapsed(),
sector: Box::pin(stream::once(async move { Ok(sector) })),
sector: Box::pin(stream::once(async move { Ok(Bytes::from(sector)) })),
},
)
.await;
Expand Down

0 comments on commit e13407a

Please sign in to comment.