From e22311aadfc49e888d490c675f6cdf396dcaa232 Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Fri, 24 Jan 2025 17:48:23 +0100 Subject: [PATCH 01/13] refactor: add the ability to store large checkpoints as multipart uploads --- crates/iota-data-ingestion-core/src/lib.rs | 2 +- crates/iota-data-ingestion-core/src/util.rs | 16 ++- .../iota-data-ingestion/src/workers/blob.rs | 106 ++++++++++++++++-- .../iota-data-ingestion/config/config.yaml | 4 + 4 files changed, 117 insertions(+), 11 deletions(-) diff --git a/crates/iota-data-ingestion-core/src/lib.rs b/crates/iota-data-ingestion-core/src/lib.rs index f110488f758..a90941dd971 100644 --- a/crates/iota-data-ingestion-core/src/lib.rs +++ b/crates/iota-data-ingestion-core/src/lib.rs @@ -20,7 +20,7 @@ use iota_types::{ pub use metrics::DataIngestionMetrics; pub use progress_store::{FileProgressStore, ProgressStore, ShimProgressStore}; pub use reader::ReaderOptions; -pub use util::create_remote_store_client; +pub use util::{create_remote_store_client, create_remote_store_client_with_ops}; pub use worker_pool::WorkerPool; #[async_trait] diff --git a/crates/iota-data-ingestion-core/src/util.rs b/crates/iota-data-ingestion-core/src/util.rs index 62434dfd6e6..8ad3c0a9b23 100644 --- a/crates/iota-data-ingestion-core/src/util.rs +++ b/crates/iota-data-ingestion-core/src/util.rs @@ -13,15 +13,25 @@ use url::Url; pub fn create_remote_store_client( url: String, remote_store_options: Vec<(String, String)>, - timeout_secs: u64, + request_timeout: u64, ) -> Result> { let retry_config = RetryConfig { max_retries: 0, - retry_timeout: Duration::from_secs(timeout_secs + 1), + retry_timeout: Duration::from_secs(request_timeout + 1), ..Default::default() }; + + create_remote_store_client_with_ops(url, remote_store_options, request_timeout, retry_config) +} + +pub fn create_remote_store_client_with_ops( + url: String, + remote_store_options: Vec<(String, String)>, + request_timeout: u64, + retry_config: RetryConfig, +) -> Result> { let client_options = ClientOptions::new() - .with_timeout(Duration::from_secs(timeout_secs)) + .with_timeout(Duration::from_secs(request_timeout)) .with_allow_http(true); if remote_store_options.is_empty() { let http_store = object_store::http::HttpBuilder::new() diff --git a/crates/iota-data-ingestion/src/workers/blob.rs b/crates/iota-data-ingestion/src/workers/blob.rs index 9afa7ece94d..b9ca853de4b 100644 --- a/crates/iota-data-ingestion/src/workers/blob.rs +++ b/crates/iota-data-ingestion/src/workers/blob.rs @@ -2,19 +2,25 @@ // Modifications Copyright (c) 2024 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 +use std::time::Duration; + use anyhow::Result; use async_trait::async_trait; use bytes::Bytes; -use iota_data_ingestion_core::{Worker, create_remote_store_client}; +use iota_data_ingestion_core::{Worker, create_remote_store_client_with_ops}; use iota_storage::blob::{Blob, BlobEncoding}; use iota_types::full_checkpoint_content::CheckpointData; -use object_store::{ObjectStore, path::Path}; +use object_store::{MultipartUpload, ObjectStore, RetryConfig, path::Path}; use serde::{Deserialize, Serialize}; +const CHUNK_SIZE: usize = 50 * 1024 * 1024; // 50 MB +const PARALLEL_CHUNKS_UPLOAD: usize = 10; + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct BlobTaskConfig { pub url: String, pub remote_store_options: Vec<(String, String)>, + pub timeout_secs: u64, } pub struct BlobWorker { @@ -24,10 +30,91 @@ pub struct BlobWorker { impl BlobWorker { pub fn new(config: BlobTaskConfig) -> Self { Self { - remote_store: 
create_remote_store_client(config.url, config.remote_store_options, 10) - .expect("failed to create remote store client"), + remote_store: create_remote_store_client_with_ops( + config.url, + config.remote_store_options, + config.timeout_secs, + RetryConfig { + max_retries: 10, + retry_timeout: Duration::from_secs(config.timeout_secs + 1), + ..Default::default() + }, + ) + .expect("failed to create remote store client"), } } + + /// Uploads a Checkpoint blob to the Remote Store. + /// + /// If the blob size exceeds the configured `CHUNK_SIZE`, + /// it uploads the blob in parts using multipart upload. + /// Otherwise, it uploads the blob directly. + async fn upload_blob(&self, bytes: Vec, chk_seq_num: u64, location: Path) -> Result<()> { + if bytes.len() > CHUNK_SIZE { + return self + .upload_blob_multipart(bytes, chk_seq_num, location) + .await; + } + + self.remote_store + .put(&location, Bytes::from(bytes).into()) + .await?; + + Ok(()) + } + + /// Uploads a large Checkpoint blob to the Remote Store using multipart + /// upload. + /// + /// This function divides the input `bytes` into chunks of size `CHUNK_SIZE` + /// and uploads each chunk individually. + /// Finally, it completes the multipart upload by assembling all the + /// uploaded parts. + async fn upload_blob_multipart( + &self, + bytes: Vec, + chk_seq_num: u64, + location: Path, + ) -> Result<()> { + let mut multipart = self.remote_store.put_multipart(&location).await?; + + let chunks = bytes.chunks(CHUNK_SIZE); + let total_chunks = chunks.len(); + + let mut parts_futures = vec![]; + for (chunk_id, chunk) in chunks.enumerate() { + tracing::info!( + "Preparing checkpoint {chk_seq_num} chunk {}/{total_chunks}", + chunk_id + 1 + ); + + parts_futures.push(multipart.put_part(Bytes::copy_from_slice(chunk).into())); + } + + // send chunks in parallel to the remote store + for (chunk_id, chunk) in parts_futures.chunks_mut(PARALLEL_CHUNKS_UPLOAD).enumerate() { + tracing::info!( + "Sending checkpoint {chk_seq_num} chunks {}-{} of {total_chunks}", + chunk_id * PARALLEL_CHUNKS_UPLOAD + 1, + (chunk_id + 1) * PARALLEL_CHUNKS_UPLOAD.min(total_chunks) + ); + let start_time = std::time::Instant::now(); + futures::future::try_join_all(chunk).await?; + tracing::info!( + "multipart checkpoint {chk_seq_num} sent in {:?}", + start_time.elapsed() + ); + } + + let start_time = std::time::Instant::now(); + multipart.complete().await?; + tracing::info!( + "multipart checkpoint {chk_seq_num} uploaded in {:?}", + start_time.elapsed() + ); + + Ok(()) + } } #[async_trait] @@ -38,9 +125,14 @@ impl Worker for BlobWorker { "{}.chk", checkpoint.checkpoint_summary.sequence_number )); - self.remote_store - .put(&location, Bytes::from(bytes).into()) - .await?; + + self.upload_blob( + bytes, + checkpoint.checkpoint_summary.sequence_number, + location, + ) + .await?; + Ok(()) } } diff --git a/dev-tools/iota-data-ingestion/config/config.yaml b/dev-tools/iota-data-ingestion/config/config.yaml index bb913f53ebd..8ccf59d2148 100644 --- a/dev-tools/iota-data-ingestion/config/config.yaml +++ b/dev-tools/iota-data-ingestion/config/config.yaml @@ -24,6 +24,10 @@ tasks: blob: # S3 bucket where checkpoints will be stored url: "s3://checkpoints" + # S3 client timeout, a large timout is needed to accommodate + # checkpoints which could have the size of several GBs + # + timeout_secs: 60 # AWS S3 client config options remote_store_options: - ["access_key_id", "test"] From a33e8907bc25a5e7da9a0b1b83adb6a7b7d67dce Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Mon, 27 Jan 2025 
10:29:11 +0100 Subject: [PATCH 02/13] fixup! refactor: add the ability to store large checkpoints as multipart uploads --- dev-tools/iota-data-ingestion/config/config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-tools/iota-data-ingestion/config/config.yaml b/dev-tools/iota-data-ingestion/config/config.yaml index 8ccf59d2148..812f2d90087 100644 --- a/dev-tools/iota-data-ingestion/config/config.yaml +++ b/dev-tools/iota-data-ingestion/config/config.yaml @@ -24,7 +24,7 @@ tasks: blob: # S3 bucket where checkpoints will be stored url: "s3://checkpoints" - # S3 client timeout, a large timout is needed to accommodate + # S3 client timeout, a large timeout is needed to accommodate # checkpoints which could have the size of several GBs # timeout_secs: 60 From 1a18b55f5b046c8962e924ebf0e63ca5fa9e25db Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Mon, 27 Jan 2025 15:18:39 +0100 Subject: [PATCH 03/13] fixup! fixup! refactor: add the ability to store large checkpoints as multipart uploads --- crates/iota-data-ingestion-core/src/util.rs | 15 +++++++---- .../iota-data-ingestion/src/workers/blob.rs | 26 ++++++++++--------- .../iota-data-ingestion/config/config.yaml | 2 +- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/crates/iota-data-ingestion-core/src/util.rs b/crates/iota-data-ingestion-core/src/util.rs index 8ad3c0a9b23..86c4586c5f9 100644 --- a/crates/iota-data-ingestion-core/src/util.rs +++ b/crates/iota-data-ingestion-core/src/util.rs @@ -13,25 +13,30 @@ use url::Url; pub fn create_remote_store_client( url: String, remote_store_options: Vec<(String, String)>, - request_timeout: u64, + request_timeout_secs: u64, ) -> Result<Box<dyn ObjectStore>> { let retry_config = RetryConfig { max_retries: 0, - retry_timeout: Duration::from_secs(request_timeout + 1), + retry_timeout: Duration::from_secs(request_timeout_secs + 1), ..Default::default() }; - create_remote_store_client_with_ops(url, remote_store_options, request_timeout, retry_config) + create_remote_store_client_with_ops( + url, + remote_store_options, + request_timeout_secs, + retry_config, + ) } pub fn create_remote_store_client_with_ops( url: String, remote_store_options: Vec<(String, String)>, - request_timeout: u64, + request_timeout_secs: u64, retry_config: RetryConfig, ) -> Result<Box<dyn ObjectStore>> { let client_options = ClientOptions::new() - .with_timeout(Duration::from_secs(request_timeout)) + .with_timeout(Duration::from_secs(request_timeout_secs)) .with_allow_http(true); if remote_store_options.is_empty() { let http_store = object_store::http::HttpBuilder::new() diff --git a/crates/iota-data-ingestion/src/workers/blob.rs b/crates/iota-data-ingestion/src/workers/blob.rs index b9ca853de4b..d91680cd849 100644 --- a/crates/iota-data-ingestion/src/workers/blob.rs +++ b/crates/iota-data-ingestion/src/workers/blob.rs @@ -20,7 +20,7 @@ pub struct BlobTaskConfig { pub url: String, pub remote_store_options: Vec<(String, String)>, - pub timeout_secs: u64, + pub request_timeout_secs: u64, } pub struct BlobWorker { @@ -33,10 +33,10 @@ impl BlobWorker { remote_store: create_remote_store_client_with_ops( config.url, config.remote_store_options, - config.timeout_secs, + config.request_timeout_secs, RetryConfig { max_retries: 10, - retry_timeout: Duration::from_secs(config.timeout_secs + 1), + retry_timeout: Duration::from_secs(config.request_timeout_secs + 1), ..Default::default() }, ) @@ -83,8 +83,8 @@ impl BlobWorker { let mut parts_futures = vec![]; for (chunk_id, chunk) in chunks.enumerate() 
{ - tracing::info!( - "Preparing checkpoint {chk_seq_num} chunk {}/{total_chunks}", + tracing::trace!( + "preparing checkpoint {chk_seq_num} chunk {}/{total_chunks}", chunk_id + 1 ); @@ -92,16 +92,18 @@ impl BlobWorker { } // send chunks in parallel to the remote store - for (chunk_id, chunk) in parts_futures.chunks_mut(PARALLEL_CHUNKS_UPLOAD).enumerate() { + for (chunks_group_id, chunks_group) in + parts_futures.chunks_mut(PARALLEL_CHUNKS_UPLOAD).enumerate() + { + let first_chunk_id = chunks_group_id * PARALLEL_CHUNKS_UPLOAD + 1; + let last_chunk_id = (chunks_group_id + 1) * PARALLEL_CHUNKS_UPLOAD.min(total_chunks); tracing::info!( - "Sending checkpoint {chk_seq_num} chunks {}-{} of {total_chunks}", - chunk_id * PARALLEL_CHUNKS_UPLOAD + 1, - (chunk_id + 1) * PARALLEL_CHUNKS_UPLOAD.min(total_chunks) + "sending checkpoint {chk_seq_num} chunks {first_chunk_id}-{last_chunk_id} of {total_chunks}", ); let start_time = std::time::Instant::now(); - futures::future::try_join_all(chunk).await?; + futures::future::try_join_all(chunks_group).await?; tracing::info!( - "multipart checkpoint {chk_seq_num} sent in {:?}", + "checkpoint {chk_seq_num} chunk parts {first_chunk_id}-{last_chunk_id} of {total_chunks} were sent in {:?}", start_time.elapsed() ); } @@ -109,7 +111,7 @@ impl BlobWorker { let start_time = std::time::Instant::now(); multipart.complete().await?; tracing::info!( - "multipart checkpoint {chk_seq_num} uploaded in {:?}", + "checkpoint {chk_seq_num} multipart uploaded in {:?}", start_time.elapsed() ); diff --git a/dev-tools/iota-data-ingestion/config/config.yaml b/dev-tools/iota-data-ingestion/config/config.yaml index 812f2d90087..fd28c69aea5 100644 --- a/dev-tools/iota-data-ingestion/config/config.yaml +++ b/dev-tools/iota-data-ingestion/config/config.yaml @@ -27,7 +27,7 @@ tasks: # S3 client timeout, a large timeout is needed to accommodate # checkpoints which could have the size of several GBs # - timeout_secs: 60 + request_timeout_secs: 60 # AWS S3 client config options remote_store_options: - ["access_key_id", "test"] From 4068573f0513fda5ddb468fe2a36e3b06bb94c0e Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Tue, 28 Jan 2025 14:42:33 +0100 Subject: [PATCH 04/13] fixup! fixup! fixup! 
refactor: add the ability to store large checkpoints as multipart uploads --- .../iota-data-ingestion/src/workers/blob.rs | 54 ++++++++--------- docker/iota-data-ingestion/config/config.yaml | 60 +++++++++++++++++++ 2 files changed, 87 insertions(+), 27 deletions(-) create mode 100644 docker/iota-data-ingestion/config/config.yaml diff --git a/crates/iota-data-ingestion/src/workers/blob.rs b/crates/iota-data-ingestion/src/workers/blob.rs index d91680cd849..557f4fd5264 100644 --- a/crates/iota-data-ingestion/src/workers/blob.rs +++ b/crates/iota-data-ingestion/src/workers/blob.rs @@ -11,25 +11,41 @@ use iota_data_ingestion_core::{Worker, create_remote_store_client_with_ops}; use iota_storage::blob::{Blob, BlobEncoding}; use iota_types::full_checkpoint_content::CheckpointData; use object_store::{MultipartUpload, ObjectStore, RetryConfig, path::Path}; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; -const CHUNK_SIZE: usize = 50 * 1024 * 1024; // 50 MB -const PARALLEL_CHUNKS_UPLOAD: usize = 10; +/// Minimum allowed chunk size to be uploaded to remote store +const MIN_CHUNK_SIZE_MB: u64 = 5 * 1024 * 1024; // 5 MB #[derive(Serialize, Deserialize, Clone, Debug)] pub struct BlobTaskConfig { pub url: String, pub remote_store_options: Vec<(String, String)>, pub request_timeout_secs: u64, + #[serde(deserialize_with = "deserialize_chunk")] + pub checkpoint_chunk_size_mb: u64, +} + +fn deserialize_chunk<'de, D>(deserializer: D) -> Result<u64, D::Error> +where + D: Deserializer<'de>, +{ + let checkpoint_chunk_size = u64::deserialize(deserializer)? * 1024 * 1024; + if checkpoint_chunk_size < MIN_CHUNK_SIZE_MB { + return Err(serde::de::Error::custom("Chunk size must be at least 5 MB")); + } + Ok(checkpoint_chunk_size) } pub struct BlobWorker { remote_store: Box<dyn ObjectStore>, + checkpoint_chunk_size_mb: u64, } impl BlobWorker { pub fn new(config: BlobTaskConfig) -> Self { + println!("{}", config.checkpoint_chunk_size_mb); Self { + checkpoint_chunk_size_mb: config.checkpoint_chunk_size_mb, remote_store: create_remote_store_client_with_ops( config.url, config.remote_store_options, @@ -50,7 +66,7 @@ impl BlobWorker { /// Uploads a Checkpoint blob to the Remote Store. /// /// If the blob size exceeds the configured `CHUNK_SIZE`, /// it uploads the blob in parts using multipart upload. /// Otherwise, it uploads the blob directly. 
async fn upload_blob(&self, bytes: Vec, chk_seq_num: u64, location: Path) -> Result<()> { - if bytes.len() > CHUNK_SIZE { + if bytes.len() > self.checkpoint_chunk_size_mb as usize { return self .upload_blob_multipart(bytes, chk_seq_num, location) .await; @@ -77,33 +93,17 @@ impl BlobWorker { location: Path, ) -> Result<()> { let mut multipart = self.remote_store.put_multipart(&location).await?; - - let chunks = bytes.chunks(CHUNK_SIZE); + let chunks = bytes.chunks(self.checkpoint_chunk_size_mb as usize); let total_chunks = chunks.len(); - let mut parts_futures = vec![]; for (chunk_id, chunk) in chunks.enumerate() { - tracing::trace!( - "preparing checkpoint {chk_seq_num} chunk {}/{total_chunks}", - chunk_id + 1 - ); - - parts_futures.push(multipart.put_part(Bytes::copy_from_slice(chunk).into())); - } - - // send chunks in parallel to the remote store - for (chunks_group_id, chunks_group) in - parts_futures.chunks_mut(PARALLEL_CHUNKS_UPLOAD).enumerate() - { - let first_chunk_id = chunks_group_id * PARALLEL_CHUNKS_UPLOAD + 1; - let last_chunk_id = (chunks_group_id + 1) * PARALLEL_CHUNKS_UPLOAD.min(total_chunks); - tracing::info!( - "sending checkpoint {chk_seq_num} chunks {first_chunk_id}-{last_chunk_id} of {total_chunks}", - ); let start_time = std::time::Instant::now(); - futures::future::try_join_all(chunks_group).await?; + multipart + .put_part(Bytes::copy_from_slice(chunk).into()) + .await?; tracing::info!( - "checkpoint {chk_seq_num} chunk parts {first_chunk_id}-{last_chunk_id} of {total_chunks} were sent in {:?}", + "uploaded checkpoint {chk_seq_num} chunk {}/{total_chunks} in {:?}", + chunk_id + 1, start_time.elapsed() ); } @@ -111,7 +111,7 @@ impl BlobWorker { let start_time = std::time::Instant::now(); multipart.complete().await?; tracing::info!( - "checkpoint {chk_seq_num} multipart uploaded in {:?}", + "checkpoint {chk_seq_num} multipart completion request finished in {:?}", start_time.elapsed() ); diff --git a/docker/iota-data-ingestion/config/config.yaml b/docker/iota-data-ingestion/config/config.yaml new file mode 100644 index 00000000000..8414ab9bd39 --- /dev/null +++ b/docker/iota-data-ingestion/config/config.yaml @@ -0,0 +1,60 @@ +# IndexerExecutor config +# +path: "./test-checkpoints" +# IOTA Node Rest API URL +remote_store_url: "http://localhost:9000/api/v1" + +# DynamoDbProgressStore config +# +progress_store: + aws_access_key_id: "test" + aws_secret_access_key: "test" + aws_region: "us-east-1" + # DynamoDB table name + table_name: "checkpoint-progress" + +# Workers Configs +# +tasks: + # Task unique name + - name: "local-blob-storage" + # Number of workers will process the checkpoints in parallel + concurrency: 1 + # Task type + blob: + # S3 bucket where checkpoints will be stored + url: "s3://checkpoints" + # S3 client timeout, a large timeout is needed to accommodate the checkpoints which could have the size of several GBs + # + request_timeout_secs: 180 + # Checkpoint upload chunk size (in MB) that determines the upload strategy: + # + # If checkpoint size < checkpoint_chunk_size_mb: + # - Uploads checkpoint using single PUT operation + # - Optimal for smaller checkpoints + # + # If checkpoint size >= checkpoint_chunk_size_mb: + # - Divides checkpoint into chunks of this size + # - Uploads chunks in parallel as multipart upload + # - Storage service concatenates parts on completion + # + # Example with 50MB chunk size: + # 200MB checkpoint: + # - Splits into 4 parts (50MB each) + # - Multipart upload of each part + # - Parts merged on remote storage + # + # 40MB 
checkpoint: + # - Single PUT upload + # - No chunking needed + # + # Minimum allowed chunk size is 5MB + # + checkpoint_chunk_size_mb: 50 + # AWS S3 client config options + remote_store_options: + - ["access_key_id", "test"] + - ["secret_access_key", "test"] + - ["region", "us-east-1"] + # Only needed if using Localstack for local development purposes, it's preferred to be removed + - ["endpoint_url", "http://localhost:4566"] From ab905b247d950d56c6fe58fd2f1fe677e94d8f64 Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Tue, 28 Jan 2025 16:42:24 +0100 Subject: [PATCH 05/13] fixup! fixup! fixup! fixup! refactor: add the ability to store large checkpoints as multipart uploads --- crates/iota-data-ingestion/src/workers/blob.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/iota-data-ingestion/src/workers/blob.rs b/crates/iota-data-ingestion/src/workers/blob.rs index 557f4fd5264..a0e709f38f8 100644 --- a/crates/iota-data-ingestion/src/workers/blob.rs +++ b/crates/iota-data-ingestion/src/workers/blob.rs @@ -43,7 +43,6 @@ pub struct BlobWorker { impl BlobWorker { pub fn new(config: BlobTaskConfig) -> Self { - println!("{}", config.checkpoint_chunk_size_mb); Self { checkpoint_chunk_size_mb: config.checkpoint_chunk_size_mb, remote_store: create_remote_store_client_with_ops( From 2205a876c34613503d75f90936c621de0e0068d8 Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Tue, 28 Jan 2025 18:05:30 +0100 Subject: [PATCH 06/13] refactor: upload parts concurrently --- .../iota-data-ingestion/src/workers/blob.rs | 37 +++++++++++++------ .../iota-data-ingestion/config/config.yaml | 31 ++++++++++++++-- docker/iota-data-ingestion/config/config.yaml | 4 +- 3 files changed, 54 insertions(+), 18 deletions(-) diff --git a/crates/iota-data-ingestion/src/workers/blob.rs b/crates/iota-data-ingestion/src/workers/blob.rs index a0e709f38f8..5686cf63f2c 100644 --- a/crates/iota-data-ingestion/src/workers/blob.rs +++ b/crates/iota-data-ingestion/src/workers/blob.rs @@ -7,10 +7,11 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; use bytes::Bytes; +use futures::{StreamExt, stream::FuturesUnordered}; use iota_data_ingestion_core::{Worker, create_remote_store_client_with_ops}; use iota_storage::blob::{Blob, BlobEncoding}; use iota_types::full_checkpoint_content::CheckpointData; -use object_store::{MultipartUpload, ObjectStore, RetryConfig, path::Path}; +use object_store::{BackoffConfig, MultipartUpload, ObjectStore, RetryConfig, path::Path}; use serde::{Deserialize, Deserializer, Serialize}; /// Minimum allowed chunk size to be uploaded to remote store @@ -52,7 +53,10 @@ impl BlobWorker { RetryConfig { max_retries: 10, retry_timeout: Duration::from_secs(config.request_timeout_secs + 1), - ..Default::default() + backoff: BackoffConfig { + init_backoff: Duration::from_secs(1), + ..Default::default() + }, }, ) .expect("failed to create remote store client"), @@ -95,16 +99,25 @@ impl BlobWorker { let chunks = bytes.chunks(self.checkpoint_chunk_size_mb as usize); let total_chunks = chunks.len(); - for (chunk_id, chunk) in chunks.enumerate() { - let start_time = std::time::Instant::now(); - multipart - .put_part(Bytes::copy_from_slice(chunk).into()) - .await?; - tracing::info!( - "uploaded checkpoint {chk_seq_num} chunk {}/{total_chunks} in {:?}", - chunk_id + 1, - start_time.elapsed() - ); + let mut parts_ordered_futures = chunks + .into_iter() + .map(|chunk| multipart.put_part(Bytes::copy_from_slice(chunk).into())) + .collect::>(); + + let mut uploaded_part = 0; + while let 
Some(part_result) = parts_ordered_futures.next().await { + match part_result { + Ok(()) => { + uploaded_part += 1; + tracing::info!( + "uploaded checkpoint {chk_seq_num} chunk {uploaded_part}/{total_chunks}" + ); + } + Err(err) => { + multipart.abort().await?; + tracing::error!("error uploading part: {err}"); + } + } } let start_time = std::time::Instant::now(); diff --git a/dev-tools/iota-data-ingestion/config/config.yaml b/dev-tools/iota-data-ingestion/config/config.yaml index fd28c69aea5..b87398e0b14 100644 --- a/dev-tools/iota-data-ingestion/config/config.yaml +++ b/dev-tools/iota-data-ingestion/config/config.yaml @@ -13,7 +13,7 @@ progress_store: # DynamoDB table name table_name: "checkpoint-progress" -# Wrokers Configs +# Workers Configs # tasks: # Task unique name @@ -24,10 +24,33 @@ tasks: blob: # S3 bucket where checkpoints will be stored url: "s3://checkpoints" - # S3 client timeout, a large timeout is needed to accommodate - # checkpoints which could have the size of several GBs + # S3 client timeout, a large timeout is needed to accommodate the checkpoints which could have the size of several GBs # - request_timeout_secs: 60 + request_timeout_secs: 180 + # Checkpoint upload chunk size (in MB) that determines the upload strategy: + # + # If checkpoint size < checkpoint_chunk_size_mb: + # - Uploads checkpoint using single PUT operation + # - Optimal for smaller checkpoints + # + # If checkpoint size >= checkpoint_chunk_size_mb: + # - Divides checkpoint into chunks of this size + # - Uploads chunks as multipart + # - Storage service concatenates parts on completion + # + # Example with 50MB chunk size: + # 200MB checkpoint: + # - Splits into 4 parts (50MB each) + # - Multipart upload of each part + # - Parts merged on remote storage + # + # 40MB checkpoint: + # - Single PUT upload + # - No chunking needed + # + # Minimum allowed chunk size is 5MB + # + checkpoint_chunk_size_mb: 100 # AWS S3 client config options remote_store_options: - ["access_key_id", "test"] diff --git a/docker/iota-data-ingestion/config/config.yaml b/docker/iota-data-ingestion/config/config.yaml index 8414ab9bd39..b87398e0b14 100644 --- a/docker/iota-data-ingestion/config/config.yaml +++ b/docker/iota-data-ingestion/config/config.yaml @@ -35,7 +35,7 @@ tasks: # # If checkpoint size >= checkpoint_chunk_size_mb: # - Divides checkpoint into chunks of this size - # - Uploads chunks in parallel as multipart upload + # - Uploads chunks as multipart # - Storage service concatenates parts on completion # # Example with 50MB chunk size: @@ -50,7 +50,7 @@ tasks: # # Minimum allowed chunk size is 5MB # - checkpoint_chunk_size_mb: 50 + checkpoint_chunk_size_mb: 100 # AWS S3 client config options remote_store_options: - ["access_key_id", "test"] From a5f7d89854d05adaf7e7532fe729f1b748507e50 Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Wed, 29 Jan 2025 12:08:19 +0100 Subject: [PATCH 07/13] fixup! 
refactor: upload parts concurrently --- .../iota-data-ingestion/src/workers/blob.rs | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/crates/iota-data-ingestion/src/workers/blob.rs b/crates/iota-data-ingestion/src/workers/blob.rs index 5686cf63f2c..a2ad72a3b64 100644 --- a/crates/iota-data-ingestion/src/workers/blob.rs +++ b/crates/iota-data-ingestion/src/workers/blob.rs @@ -4,10 +4,10 @@ use std::time::Duration; -use anyhow::Result; +use anyhow::{Result, bail}; use async_trait::async_trait; use bytes::Bytes; -use futures::{StreamExt, stream::FuturesUnordered}; +use futures::{StreamExt, stream}; use iota_data_ingestion_core::{Worker, create_remote_store_client_with_ops}; use iota_storage::blob::{Blob, BlobEncoding}; use iota_types::full_checkpoint_content::CheckpointData; @@ -16,6 +16,9 @@ use serde::{Deserialize, Deserializer, Serialize}; /// Minimum allowed chunk size to be uploaded to remote store const MIN_CHUNK_SIZE_MB: u64 = 5 * 1024 * 1024; // 5 MB +/// The maximum number of concurrent requests allowed when uploading checkpoint +/// chunk parts to remote store +const MAX_CONCURRENT_PARTS_UPLOAD: usize = 50; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct BlobTaskConfig { @@ -99,13 +102,18 @@ impl BlobWorker { let chunks = bytes.chunks(self.checkpoint_chunk_size_mb as usize); let total_chunks = chunks.len(); - let mut parts_ordered_futures = chunks + let parts_futures = chunks .into_iter() .map(|chunk| multipart.put_part(Bytes::copy_from_slice(chunk).into())) - .collect::>(); + .collect::>(); + + let stream_parts_futures = stream::iter(parts_futures); + + let mut buffered_uploaded_parts = + stream_parts_futures.buffer_unordered(MAX_CONCURRENT_PARTS_UPLOAD); let mut uploaded_part = 0; - while let Some(part_result) = parts_ordered_futures.next().await { + while let Some(part_result) = buffered_uploaded_parts.next().await { match part_result { Ok(()) => { uploaded_part += 1; @@ -114,8 +122,9 @@ impl BlobWorker { ); } Err(err) => { - multipart.abort().await?; tracing::error!("error uploading part: {err}"); + multipart.abort().await?; + bail!("checkpoint {chk_seq_num} multipart upload aborted"); } } } From 777681d6bcaee1d6eb20e9969aac7056bbc47eef Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Wed, 29 Jan 2025 12:10:31 +0100 Subject: [PATCH 08/13] doc: add fn documentation --- crates/iota-data-ingestion-core/src/util.rs | 129 ++++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/crates/iota-data-ingestion-core/src/util.rs b/crates/iota-data-ingestion-core/src/util.rs index 86c4586c5f9..b0190d530e4 100644 --- a/crates/iota-data-ingestion-core/src/util.rs +++ b/crates/iota-data-ingestion-core/src/util.rs @@ -10,6 +10,68 @@ use object_store::{ }; use url::Url; +/// Creates a remote store client *without* any retry mechanism. +/// +/// This function constructs a remote store client configured to *not* retry +/// failed requests. All requests will fail immediately if the underlying +/// operation encounters an error. This is a convenience wrapper around +/// `create_remote_store_client_with_ops` that sets the retry configuration +/// to disable retries. +/// +/// # Arguments +/// +/// * `url`: The URL of the remote store. The scheme of the URL determines the +/// storage provider: +/// * `http://` or `https://`: HTTP-based store. +/// * `gs://`: Google Cloud Storage. +/// * `s3://` or other AWS S3-compatible URL: Amazon S3. +/// * `remote_store_options`: A vector of key-value pairs representing +/// provider-specific options. 
+/// * For GCS: See [`object_store::gcp::GoogleConfigKey`] for valid keys. +/// * For S3: See [`object_store::aws::AmazonS3ConfigKey`] for valid keys. +/// * For HTTP: No options are currently supported. This parameter should be +/// empty. +/// * `request_timeout_secs`: The timeout duration (in seconds) for individual +/// requests. This timeout is used to set a slightly longer retry timeout +/// (request_timeout_secs + 1) internally, even though retries are disabled. +/// This is done to ensure that the overall operation doesn't hang +/// indefinitely. +/// +/// # Examples +/// +/// Creating an S3 client without retries: +/// +/// ```rust +/// use object_store::aws::AmazonS3ConfigKey; +/// +/// let url = "s3://my-bucket"; +/// let options = vec![( +/// AmazonS3ConfigKey::Region.to_string(), +/// "us-east-1".to_string(), +/// )]; +/// let client = create_remote_store_client(url.to_string(), options, 30)?; +/// ``` +/// +/// Creating a GCS client without retries: +/// +/// ```rust +/// use object_store::gcp::GoogleConfigKey; +/// +/// let url = "gs://my-bucket"; +/// let options = vec![( +/// GoogleConfigKey::ProjectId.to_string(), +/// "my-project".to_string(), +/// )]; +/// let client = create_remote_store_client(url.to_string(), options, 30)?; +/// ``` +/// +/// Creating an HTTP client without retries (no options supported): +/// +/// ```rust +/// let url = "http://example.bucket.com"; +/// let options = vec![]; // No options for HTTP +/// let client = create_remote_store_client(url.to_string(), options, 30)?; +/// ``` pub fn create_remote_store_client( url: String, remote_store_options: Vec<(String, String)>, @@ -29,6 +91,73 @@ pub fn create_remote_store_client( ) } +/// Creates a remote store client with configurable retry behavior and options. +/// +/// This function constructs a remote store client for various cloud storage +/// providers (HTTP, Google Cloud Storage, Amazon S3) based on the provided URL +/// and options. It allows configuring retry behavior through the `retry_config` +/// argument. +/// +/// # Arguments +/// +/// * `url`: The URL of the remote store. The scheme of the URL determines the +/// storage provider: +/// * `http://` or `https://`: HTTP-based store. +/// * `gs://`: Google Cloud Storage. +/// * `s3://` or other AWS S3-compatible URL: Amazon S3. +/// * `remote_store_options`: A vector of key-value pairs representing +/// provider-specific options. +/// * For GCS: See [`object_store::gcp::GoogleConfigKey`] for valid keys. +/// * For S3: See [`object_store::aws::AmazonS3ConfigKey`] for valid keys. +/// * For HTTP: No options are currently supported. This parameter should be +/// empty. +/// * `request_timeout_secs`: The timeout duration (in seconds) for individual +/// requests. +/// * `retry_config`: A [`RetryConfig`] struct defining the retry strategy. This +/// allows fine-grained control over the number of retries, backoff behavior, +/// and retry timeouts. See the documentation for +/// [`object_store::retry::RetryConfig`] for details. 
+/// +/// # Examples +/// +/// Creating an S3 client with specific options and a retry configuration: +/// +/// ```rust +/// use object_store::{aws::AmazonS3ConfigKey, retry::RetryConfig}; +/// +/// let url = "s3://my-bucket"; +/// let options = vec![( +/// AmazonS3ConfigKey::Region.to_string(), +/// "us-east-1".to_string(), +/// )]; +/// let retry_config = RetryConfig::default(); // Use default retry settings +/// let client = create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config)?; +/// ``` +/// +/// Creating a GCS client: +/// +/// ```rust +/// use object_store::{gcp::GoogleConfigKey, retry::RetryConfig}; +/// +/// let url = "gs://my-bucket"; +/// let options = vec![( +/// GoogleConfigKey::ProjectId.to_string(), +/// "my-project".to_string(), +/// )]; +/// let retry_config = RetryConfig::default(); +/// let client = create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config)?; +/// ``` +/// +/// Creating an HTTP client (no options supported): +/// +/// ```rust +/// use object_store::retry::RetryConfig; +/// +/// let url = "http://example.bucket.com"; +/// let options = vec![]; // No options for HTTP +/// let retry_config = RetryConfig::default(); +/// let client = create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config)?; +/// ``` pub fn create_remote_store_client_with_ops( url: String, remote_store_options: Vec<(String, String)>, From 2c8ce076368585f9cc265f52d40125b79804f7d0 Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Wed, 29 Jan 2025 12:25:26 +0100 Subject: [PATCH 09/13] fixup! fixup! refactor: upload parts concurrently --- crates/iota-data-ingestion/src/workers/blob.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/crates/iota-data-ingestion/src/workers/blob.rs b/crates/iota-data-ingestion/src/workers/blob.rs index a2ad72a3b64..b83d795eefc 100644 --- a/crates/iota-data-ingestion/src/workers/blob.rs +++ b/crates/iota-data-ingestion/src/workers/blob.rs @@ -7,7 +7,7 @@ use std::time::Duration; use anyhow::{Result, bail}; use async_trait::async_trait; use bytes::Bytes; -use futures::{StreamExt, stream}; +use futures::{StreamExt, TryFutureExt, stream}; use iota_data_ingestion_core::{Worker, create_remote_store_client_with_ops}; use iota_storage::blob::{Blob, BlobEncoding}; use iota_types::full_checkpoint_content::CheckpointData; @@ -104,21 +104,24 @@ impl BlobWorker { let parts_futures = chunks .into_iter() - .map(|chunk| multipart.put_part(Bytes::copy_from_slice(chunk).into())) + .enumerate() + .map(|(chunk_id, chunk)| { + multipart + .put_part(Bytes::copy_from_slice(chunk).into()) + .map_ok(move |_| chunk_id + 1) + }) .collect::>(); let stream_parts_futures = stream::iter(parts_futures); let mut buffered_uploaded_parts = - stream_parts_futures.buffer_unordered(MAX_CONCURRENT_PARTS_UPLOAD); + stream_parts_futures.buffered(MAX_CONCURRENT_PARTS_UPLOAD); - let mut uploaded_part = 0; while let Some(part_result) = buffered_uploaded_parts.next().await { match part_result { - Ok(()) => { - uploaded_part += 1; + Ok(uploaded_chunk_id) => { tracing::info!( - "uploaded checkpoint {chk_seq_num} chunk {uploaded_part}/{total_chunks}" + "uploaded checkpoint {chk_seq_num} chunk {uploaded_chunk_id}/{total_chunks}", ); } Err(err) => { From 1974930a2fc592d362a861f5c5c6ef8f50ea1587 Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Wed, 29 Jan 2025 13:34:59 +0100 Subject: [PATCH 10/13] fix: remove redundant file --- docker/iota-data-ingestion/config/config.yaml | 60 ------------------- 1 file 
changed, 60 deletions(-) delete mode 100644 docker/iota-data-ingestion/config/config.yaml diff --git a/docker/iota-data-ingestion/config/config.yaml b/docker/iota-data-ingestion/config/config.yaml deleted file mode 100644 index b87398e0b14..00000000000 --- a/docker/iota-data-ingestion/config/config.yaml +++ /dev/null @@ -1,60 +0,0 @@ -# IndexerExecutor config -# -path: "./test-checkpoints" -# IOTA Node Rest API URL -remote_store_url: "http://localhost:9000/api/v1" - -# DynamoDbProgressStore config -# -progress_store: - aws_access_key_id: "test" - aws_secret_access_key: "test" - aws_region: "us-east-1" - # DynamoDB table name - table_name: "checkpoint-progress" - -# Workers Configs -# -tasks: - # Task unique name - - name: "local-blob-storage" - # Number of workers will process the checkpoints in parallel - concurrency: 1 - # Task type - blob: - # S3 bucket where checkpoints will be stored - url: "s3://checkpoints" - # S3 client timeout, a large timeout is needed to accommodate the checkpoints which could have the size of several GBs - # - request_timeout_secs: 180 - # Checkpoint upload chunk size (in MB) that determines the upload strategy: - # - # If checkpoint size < checkpoint_chunk_size_mb: - # - Uploads checkpoint using single PUT operation - # - Optimal for smaller checkpoints - # - # If checkpoint size >= checkpoint_chunk_size_mb: - # - Divides checkpoint into chunks of this size - # - Uploads chunks as multipart - # - Storage service concatenates parts on completion - # - # Example with 50MB chunk size: - # 200MB checkpoint: - # - Splits into 4 parts (50MB each) - # - Multipart upload of each part - # - Parts merged on remote storage - # - # 40MB checkpoint: - # - Single PUT upload - # - No chunking needed - # - # Minimum allowed chunk size is 5MB - # - checkpoint_chunk_size_mb: 100 - # AWS S3 client config options - remote_store_options: - - ["access_key_id", "test"] - - ["secret_access_key", "test"] - - ["region", "us-east-1"] - # Only needed if using Localstack for local development purposes, it's preferred to be removed - - ["endpoint_url", "http://localhost:4566"] From f55f2c1d2adc57c6d7a88abcbd913885913c6dca Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Wed, 29 Jan 2025 13:43:36 +0100 Subject: [PATCH 11/13] fixup! 
doc: add fn documentation --- crates/iota-data-ingestion-core/src/util.rs | 50 ++++++++++++--------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/crates/iota-data-ingestion-core/src/util.rs b/crates/iota-data-ingestion-core/src/util.rs index b0190d530e4..3f84d2a7ba3 100644 --- a/crates/iota-data-ingestion-core/src/util.rs +++ b/crates/iota-data-ingestion-core/src/util.rs @@ -42,35 +42,39 @@ use url::Url; /// Creating an S3 client without retries: /// /// ```rust +/// # use iota_data_ingestion_core::create_remote_store_client; /// use object_store::aws::AmazonS3ConfigKey; /// /// let url = "s3://my-bucket"; /// let options = vec![( -/// AmazonS3ConfigKey::Region.to_string(), +/// AmazonS3ConfigKey::Region.as_ref().to_owned(), /// "us-east-1".to_string(), /// )]; -/// let client = create_remote_store_client(url.to_string(), options, 30)?; +/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap(); /// ``` /// /// Creating a GCS client without retries: /// -/// ```rust +/// ```text +/// # use iota_data_ingestion_core::create_remote_store_client; /// use object_store::gcp::GoogleConfigKey; /// /// let url = "gs://my-bucket"; /// let options = vec![( -/// GoogleConfigKey::ProjectId.to_string(), -/// "my-project".to_string(), +/// GoogleConfigKey::ServiceAccount.as_ref().to_owned(), +/// "my-service-account".to_string(), /// )]; -/// let client = create_remote_store_client(url.to_string(), options, 30)?; +/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap(); /// ``` /// /// Creating an HTTP client without retries (no options supported): /// -/// ```rust +/// ```text +/// # use iota_data_ingestion_core::create_remote_store_client; +/// /// let url = "http://example.bucket.com"; /// let options = vec![]; // No options for HTTP -/// let client = create_remote_store_client(url.to_string(), options, 30)?; +/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap(); /// ``` pub fn create_remote_store_client( url: String, @@ -122,41 +126,47 @@ pub fn create_remote_store_client( /// /// Creating an S3 client with specific options and a retry configuration: /// -/// ```rust -/// use object_store::{aws::AmazonS3ConfigKey, retry::RetryConfig}; +/// ```text +/// # use iota_data_ingestion_core::create_remote_store_client_with_ops; +/// use object_store::{RetryConfig, aws::AmazonS3ConfigKey}; /// /// let url = "s3://my-bucket"; /// let options = vec![( -/// AmazonS3ConfigKey::Region.to_string(), +/// AmazonS3ConfigKey::Region.as_ref().to_owned(), /// "us-east-1".to_string(), /// )]; /// let retry_config = RetryConfig::default(); // Use default retry settings -/// let client = create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config)?; +/// let client = +/// create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap(); /// ``` /// /// Creating a GCS client: /// -/// ```rust -/// use object_store::{gcp::GoogleConfigKey, retry::RetryConfig}; +/// ```text +/// # use iota_data_ingestion_core::create_remote_store_client_with_ops; +/// use object_store::{RetryConfig, gcp::GoogleConfigKey}; /// /// let url = "gs://my-bucket"; /// let options = vec![( -/// GoogleConfigKey::ProjectId.to_string(), -/// "my-project".to_string(), +/// GoogleConfigKey::ServiceAccount.as_ref().to_owned(), +/// "my-service-account".to_string(), /// )]; /// let retry_config = RetryConfig::default(); -/// let client = create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config)?; 
+/// let client = +/// create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap(); /// ``` /// /// Creating an HTTP client (no options supported): /// -/// ```rust -/// use object_store::retry::RetryConfig; +/// ```text +/// # use iota_data_ingestion_core::create_remote_store_client_with_ops; +/// use object_store::RetryConfig; /// /// let url = "http://example.bucket.com"; /// let options = vec![]; // No options for HTTP /// let retry_config = RetryConfig::default(); -/// let client = create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config)?; +/// let client = +/// create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap(); /// ``` pub fn create_remote_store_client_with_ops( url: String, From 6eb5a17403c9177bc289c8b3dc9a0a5c788ba3ed Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Wed, 29 Jan 2025 14:06:01 +0100 Subject: [PATCH 12/13] fixup! fixup! doc: add fn documentation --- crates/iota-data-ingestion-core/src/util.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iota-data-ingestion-core/src/util.rs b/crates/iota-data-ingestion-core/src/util.rs index 3f84d2a7ba3..19109cff9a7 100644 --- a/crates/iota-data-ingestion-core/src/util.rs +++ b/crates/iota-data-ingestion-core/src/util.rs @@ -120,7 +120,7 @@ pub fn create_remote_store_client( /// * `retry_config`: A [`RetryConfig`] struct defining the retry strategy. This /// allows fine-grained control over the number of retries, backoff behavior, /// and retry timeouts. See the documentation for -/// [`object_store::retry::RetryConfig`] for details. +/// [`object_store::RetryConfig`] for details. /// /// # Examples /// From 65a3e3ad725f7537d5a3d288928f49a2f45fae5e Mon Sep 17 00:00:00 2001 From: Sergiu Popescu Date: Wed, 29 Jan 2025 17:46:19 +0100 Subject: [PATCH 13/13] fixup! fixup! fixup! 
doc: add fn documentation --- .../iota-data-ingestion/src/workers/blob.rs | 23 ++++++----- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/crates/iota-data-ingestion/src/workers/blob.rs b/crates/iota-data-ingestion/src/workers/blob.rs index b83d795eefc..19a3a9e6af9 100644 --- a/crates/iota-data-ingestion/src/workers/blob.rs +++ b/crates/iota-data-ingestion/src/workers/blob.rs @@ -7,7 +7,7 @@ use std::time::Duration; use anyhow::{Result, bail}; use async_trait::async_trait; use bytes::Bytes; -use futures::{StreamExt, TryFutureExt, stream}; +use futures::{StreamExt, stream}; use iota_data_ingestion_core::{Worker, create_remote_store_client_with_ops}; use iota_storage::blob::{Blob, BlobEncoding}; use iota_types::full_checkpoint_content::CheckpointData; @@ -104,24 +104,19 @@ impl BlobWorker { let parts_futures = chunks .into_iter() - .enumerate() - .map(|(chunk_id, chunk)| { - multipart - .put_part(Bytes::copy_from_slice(chunk).into()) - .map_ok(move |_| chunk_id + 1) - }) + .map(|chunk| multipart.put_part(Bytes::copy_from_slice(chunk).into())) .collect::<Vec<_>>(); - let stream_parts_futures = stream::iter(parts_futures); + let mut buffered_uploaded_parts = stream::iter(parts_futures) + .buffer_unordered(MAX_CONCURRENT_PARTS_UPLOAD) + .enumerate(); - let mut buffered_uploaded_parts = - stream_parts_futures.buffered(MAX_CONCURRENT_PARTS_UPLOAD); - - while let Some(part_result) = buffered_uploaded_parts.next().await { + while let Some((uploaded_chunk_id, part_result)) = buffered_uploaded_parts.next().await { match part_result { - Ok(uploaded_chunk_id) => { + Ok(()) => { tracing::info!( - "uploaded checkpoint {chk_seq_num} chunk {uploaded_chunk_id}/{total_chunks}", + "uploaded checkpoint {chk_seq_num} chunk {}/{total_chunks}", + uploaded_chunk_id + 1 ); } Err(err) => {