refactor: store large checkpoints as multipart upload #5026

Merged: 14 commits, Jan 30, 2025
2 changes: 1 addition & 1 deletion crates/iota-data-ingestion-core/src/lib.rs
@@ -20,7 +20,7 @@ use iota_types::{
pub use metrics::DataIngestionMetrics;
pub use progress_store::{FileProgressStore, ProgressStore, ShimProgressStore};
pub use reader::ReaderOptions;
pub use util::create_remote_store_client;
pub use util::{create_remote_store_client, create_remote_store_client_with_ops};
pub use worker_pool::WorkerPool;

#[async_trait]
160 changes: 157 additions & 3 deletions crates/iota-data-ingestion-core/src/util.rs
@@ -10,18 +10,172 @@ use object_store::{
};
use url::Url;

/// Creates a remote store client *without* any retry mechanism.
///
/// This function constructs a remote store client configured to *not* retry
/// failed requests. All requests will fail immediately if the underlying
/// operation encounters an error. This is a convenience wrapper around
/// `create_remote_store_client_with_ops` that sets the retry configuration
/// to disable retries.
///
/// # Arguments
///
/// * `url`: The URL of the remote store. The scheme of the URL determines the
/// storage provider:
/// * `http://` or `https://`: HTTP-based store.
/// * `gs://`: Google Cloud Storage.
/// * `s3://` or other AWS S3-compatible URL: Amazon S3.
/// * `remote_store_options`: A vector of key-value pairs representing
/// provider-specific options.
/// * For GCS: See [`object_store::gcp::GoogleConfigKey`] for valid keys.
/// * For S3: See [`object_store::aws::AmazonS3ConfigKey`] for valid keys.
/// * For HTTP: No options are currently supported. This parameter should be
/// empty.
/// * `request_timeout_secs`: The timeout duration (in seconds) for individual
/// requests. This timeout is used to set a slightly longer retry timeout
/// (request_timeout_secs + 1) internally, even though retries are disabled.
/// This is done to ensure that the overall operation doesn't hang
/// indefinitely.
///
/// # Examples
///
/// Creating an S3 client without retries:
///
/// ```rust
/// # use iota_data_ingestion_core::create_remote_store_client;
/// use object_store::aws::AmazonS3ConfigKey;
///
/// let url = "s3://my-bucket";
/// let options = vec![(
/// AmazonS3ConfigKey::Region.as_ref().to_owned(),
/// "us-east-1".to_string(),
/// )];
/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap();
/// ```
///
/// Creating a GCS client without retries:
///
/// ```text
/// # use iota_data_ingestion_core::create_remote_store_client;
/// use object_store::gcp::GoogleConfigKey;
///
/// let url = "gs://my-bucket";
/// let options = vec![(
/// GoogleConfigKey::ServiceAccount.as_ref().to_owned(),
/// "my-service-account".to_string(),
/// )];
/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap();
/// ```
///
/// Creating an HTTP client without retries (no options supported):
///
/// ```text
/// # use iota_data_ingestion_core::create_remote_store_client;
///
/// let url = "http://example.bucket.com";
/// let options = vec![]; // No options for HTTP
/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap();
/// ```
pub fn create_remote_store_client(
url: String,
remote_store_options: Vec<(String, String)>,
timeout_secs: u64,
request_timeout_secs: u64,
) -> Result<Box<dyn ObjectStore>> {
let retry_config = RetryConfig {
max_retries: 0,
retry_timeout: Duration::from_secs(timeout_secs + 1),
retry_timeout: Duration::from_secs(request_timeout_secs + 1),
..Default::default()
};

create_remote_store_client_with_ops(
url,
remote_store_options,
request_timeout_secs,
retry_config,
)
}

/// Creates a remote store client with configurable retry behavior and options.
///
/// This function constructs a remote store client for various cloud storage
/// providers (HTTP, Google Cloud Storage, Amazon S3) based on the provided URL
/// and options. It allows configuring retry behavior through the `retry_config`
/// argument.
///
/// # Arguments
///
/// * `url`: The URL of the remote store. The scheme of the URL determines the
/// storage provider:
/// * `http://` or `https://`: HTTP-based store.
/// * `gs://`: Google Cloud Storage.
/// * `s3://` or other AWS S3-compatible URL: Amazon S3.
/// * `remote_store_options`: A vector of key-value pairs representing
/// provider-specific options.
/// * For GCS: See [`object_store::gcp::GoogleConfigKey`] for valid keys.
/// * For S3: See [`object_store::aws::AmazonS3ConfigKey`] for valid keys.
/// * For HTTP: No options are currently supported. This parameter should be
/// empty.
/// * `request_timeout_secs`: The timeout duration (in seconds) for individual
/// requests.
/// * `retry_config`: A [`RetryConfig`] struct defining the retry strategy. This
/// allows fine-grained control over the number of retries, backoff behavior,
/// and retry timeouts. See the documentation for
/// [`object_store::RetryConfig`] for details.
///
/// # Examples
///
/// Creating an S3 client with specific options and a retry configuration:
///
/// ```text
/// # use iota_data_ingestion_core::create_remote_store_client_with_ops;
/// use object_store::{RetryConfig, aws::AmazonS3ConfigKey};
///
/// let url = "s3://my-bucket";
/// let options = vec![(
/// AmazonS3ConfigKey::Region.as_ref().to_owned(),
/// "us-east-1".to_string(),
/// )];
/// let retry_config = RetryConfig::default(); // Use default retry settings
/// let client =
/// create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap();
/// ```
///
/// Creating a GCS client:
///
/// ```text
/// # use iota_data_ingestion_core::create_remote_store_client_with_ops;
/// use object_store::{RetryConfig, gcp::GoogleConfigKey};
///
/// let url = "gs://my-bucket";
/// let options = vec![(
/// GoogleConfigKey::ServiceAccount.as_ref().to_owned(),
/// "my-service-account".to_string(),
/// )];
/// let retry_config = RetryConfig::default();
/// let client =
/// create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap();
/// ```
///
/// Creating an HTTP client (no options supported):
///
/// ```text
/// # use iota_data_ingestion_core::create_remote_store_client_with_ops;
/// use object_store::RetryConfig;
///
/// let url = "http://example.bucket.com";
/// let options = vec![]; // No options for HTTP
/// let retry_config = RetryConfig::default();
/// let client =
/// create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap();
/// ```
pub fn create_remote_store_client_with_ops(
url: String,
remote_store_options: Vec<(String, String)>,
request_timeout_secs: u64,
retry_config: RetryConfig,
) -> Result<Box<dyn ObjectStore>> {
let client_options = ClientOptions::new()
.with_timeout(Duration::from_secs(timeout_secs))
.with_timeout(Duration::from_secs(request_timeout_secs))
.with_allow_http(true);
if remote_store_options.is_empty() {
let http_store = object_store::http::HttpBuilder::new()
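
For context, a minimal sketch of how a downstream caller could use the new `create_remote_store_client_with_ops` to obtain a client that *does* retry, mirroring the `RetryConfig` fields exercised later in this PR. The bucket, region, and retry values are illustrative placeholders, not part of the change:

```rust
use std::time::Duration;

use iota_data_ingestion_core::create_remote_store_client_with_ops;
use object_store::{BackoffConfig, ObjectStore, RetryConfig, aws::AmazonS3ConfigKey};

// Hypothetical helper: the endpoint, region, and retry values below are placeholders.
fn build_retrying_client() -> anyhow::Result<Box<dyn ObjectStore>> {
    let options = vec![(
        AmazonS3ConfigKey::Region.as_ref().to_owned(),
        "us-east-1".to_string(),
    )];
    // Retry each failed request up to 3 times, starting from a 1 second backoff.
    let retry_config = RetryConfig {
        max_retries: 3,
        retry_timeout: Duration::from_secs(31),
        backoff: BackoffConfig {
            init_backoff: Duration::from_secs(1),
            ..Default::default()
        },
    };
    create_remote_store_client_with_ops("s3://my-bucket".to_string(), options, 30, retry_config)
}
```

The convenience wrapper `create_remote_store_client` keeps its previous no-retry behavior by passing `max_retries: 0`, as shown in the diff above.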
131 changes: 122 additions & 9 deletions crates/iota-data-ingestion/src/workers/blob.rs
@@ -2,31 +2,139 @@
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use std::time::Duration;

use anyhow::{Result, bail};
use async_trait::async_trait;
use bytes::Bytes;
use iota_data_ingestion_core::{Worker, create_remote_store_client};
use futures::{StreamExt, stream};
use iota_data_ingestion_core::{Worker, create_remote_store_client_with_ops};
use iota_storage::blob::{Blob, BlobEncoding};
use iota_types::full_checkpoint_content::CheckpointData;
use object_store::{ObjectStore, path::Path};
use serde::{Deserialize, Serialize};
use object_store::{BackoffConfig, MultipartUpload, ObjectStore, RetryConfig, path::Path};
use serde::{Deserialize, Deserializer, Serialize};

/// Minimum allowed chunk size, in bytes, that can be uploaded to the remote store
const MIN_CHUNK_SIZE_MB: u64 = 5 * 1024 * 1024; // 5 MB
/// The maximum number of concurrent requests allowed when uploading checkpoint
/// chunk parts to the remote store
const MAX_CONCURRENT_PARTS_UPLOAD: usize = 50;

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct BlobTaskConfig {
pub url: String,
pub remote_store_options: Vec<(String, String)>,
pub request_timeout_secs: u64,
#[serde(deserialize_with = "deserialize_chunk")]
pub checkpoint_chunk_size_mb: u64,
}

fn deserialize_chunk<'de, D>(deserializer: D) -> Result<u64, D::Error>
where
D: Deserializer<'de>,
{
let checkpoint_chunk_size = u64::deserialize(deserializer)? * 1024 * 1024;
if checkpoint_chunk_size < MIN_CHUNK_SIZE_MB {
return Err(serde::de::Error::custom("Chunk size must be at least 5 MB"));
}
Ok(checkpoint_chunk_size)
}

pub struct BlobWorker {
remote_store: Box<dyn ObjectStore>,
checkpoint_chunk_size_mb: u64,
}

impl BlobWorker {
pub fn new(config: BlobTaskConfig) -> Self {
Self {
remote_store: create_remote_store_client(config.url, config.remote_store_options, 10)
.expect("failed to create remote store client"),
checkpoint_chunk_size_mb: config.checkpoint_chunk_size_mb,
remote_store: create_remote_store_client_with_ops(
config.url,
config.remote_store_options,
config.request_timeout_secs,
RetryConfig {
max_retries: 10,
retry_timeout: Duration::from_secs(config.request_timeout_secs + 1),
backoff: BackoffConfig {
init_backoff: Duration::from_secs(1),
..Default::default()
},
},
)
.expect("failed to create remote store client"),
}
}

/// Uploads a Checkpoint blob to the Remote Store.
///
/// If the blob size exceeds the configured `checkpoint_chunk_size_mb`,
/// it uploads the blob in parts using multipart upload.
/// Otherwise, it uploads the blob directly.
async fn upload_blob(&self, bytes: Vec<u8>, chk_seq_num: u64, location: Path) -> Result<()> {
if bytes.len() > self.checkpoint_chunk_size_mb as usize {
return self
.upload_blob_multipart(bytes, chk_seq_num, location)
.await;
}

self.remote_store
.put(&location, Bytes::from(bytes).into())
.await?;

Ok(())
}

/// Uploads a large Checkpoint blob to the Remote Store using multipart
/// upload.
///
/// This function divides the input `bytes` into chunks of the configured `checkpoint_chunk_size_mb`
/// and uploads each chunk individually.
/// Finally, it completes the multipart upload by assembling all the
/// uploaded parts.
async fn upload_blob_multipart(
&self,
bytes: Vec<u8>,
chk_seq_num: u64,
location: Path,
) -> Result<()> {
let mut multipart = self.remote_store.put_multipart(&location).await?;
let chunks = bytes.chunks(self.checkpoint_chunk_size_mb as usize);
let total_chunks = chunks.len();

let parts_futures = chunks
.into_iter()
.map(|chunk| multipart.put_part(Bytes::copy_from_slice(chunk).into()))
.collect::<Vec<_>>();

let mut buffered_uploaded_parts = stream::iter(parts_futures)
.buffer_unordered(MAX_CONCURRENT_PARTS_UPLOAD)
.enumerate();

while let Some((uploaded_chunk_id, part_result)) = buffered_uploaded_parts.next().await {
match part_result {
Ok(()) => {
tracing::info!(
"uploaded checkpoint {chk_seq_num} chunk {}/{total_chunks}",
uploaded_chunk_id + 1
);
}
Err(err) => {
tracing::error!("error uploading part: {err}");
multipart.abort().await?;
bail!("checkpoint {chk_seq_num} multipart upload aborted");
}
}
}

let start_time = std::time::Instant::now();
multipart.complete().await?;
tracing::info!(
"checkpoint {chk_seq_num} multipart completion request finished in {:?}",
start_time.elapsed()
);

Ok(())
}
}

@@ -38,9 +38,14 @@ impl Worker for BlobWorker {
"{}.chk",
checkpoint.checkpoint_summary.sequence_number
));
self.remote_store
.put(&location, Bytes::from(bytes).into())
.await?;

self.upload_blob(
bytes,
checkpoint.checkpoint_summary.sequence_number,
location,
)
.await?;

Ok(())
}
}
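
To tie the new worker to the configuration file below, here is an illustrative sketch of how the blob task section could deserialize into `BlobTaskConfig`. It assumes `serde_yaml` is available and that the types are importable as `iota_data_ingestion::workers::blob::{BlobTaskConfig, BlobWorker}`; the actual module path or re-exports may differ:

```rust
// Illustrative only; the import path and serde_yaml dependency are assumptions.
use iota_data_ingestion::workers::blob::{BlobTaskConfig, BlobWorker};

fn main() -> anyhow::Result<()> {
    // The same fields as the `blob` task in dev-tools/iota-data-ingestion/config/config.yaml.
    let yaml = r#"
url: "s3://checkpoints"
request_timeout_secs: 180
checkpoint_chunk_size_mb: 100
remote_store_options:
  - ["access_key_id", "test"]
"#;
    let config: BlobTaskConfig = serde_yaml::from_str(yaml)?;
    // `deserialize_chunk` converts the configured value from MB to bytes
    // and rejects anything below the 5 MB minimum.
    assert_eq!(config.checkpoint_chunk_size_mb, 100 * 1024 * 1024);
    // Given valid credentials, a worker could then be built with:
    // let worker = BlobWorker::new(config);
    Ok(())
}
```

Note that after deserialization the `checkpoint_chunk_size_mb` field holds the size in bytes, which is the value `upload_blob` compares against.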
29 changes: 28 additions & 1 deletion dev-tools/iota-data-ingestion/config/config.yaml
@@ -13,7 +13,7 @@ progress_store:
# DynamoDB table name
table_name: "checkpoint-progress"

# Wrokers Configs
# Workers Configs
#
tasks:
# Task unique name
@@ -24,6 +24,33 @@
blob:
# S3 bucket where checkpoints will be stored
url: "s3://checkpoints"
# S3 client timeout; a large timeout is needed to accommodate checkpoints, which can be several GB in size
#
request_timeout_secs: 180
# Checkpoint upload chunk size (in MB) that determines the upload strategy:
#
# If checkpoint size <= checkpoint_chunk_size_mb:
# - Uploads checkpoint using single PUT operation
# - Optimal for smaller checkpoints
#
# If checkpoint size > checkpoint_chunk_size_mb:
# - Divides checkpoint into chunks of this size
# - Uploads chunks as multipart
# - Storage service concatenates parts on completion
#
# Example with 50MB chunk size:
# 200MB checkpoint:
# - Splits into 4 parts (50MB each)
# - Multipart upload of each part
# - Parts merged on remote storage
#
# 40MB checkpoint:
# - Single PUT upload
# - No chunking needed
#
# Minimum allowed chunk size is 5MB
#
checkpoint_chunk_size_mb: 100
# AWS S3 client config options
remote_store_options:
- ["access_key_id", "test"]
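
Finally, a small self-contained sketch (not part of the PR) of the arithmetic behind the worked example in the config comments above, mirroring the size-based dispatch in `BlobWorker::upload_blob`:

```rust
/// Illustrative helper (not part of the PR): how many upload requests a checkpoint
/// of `blob_len` bytes results in for a given chunk size.
fn expected_parts(blob_len: usize, chunk_size_bytes: usize) -> usize {
    if blob_len <= chunk_size_bytes {
        1 // single PUT, no multipart upload
    } else {
        blob_len.div_ceil(chunk_size_bytes) // one multipart part per chunk
    }
}

fn main() {
    let chunk = 50 * 1024 * 1024; // the 50 MB example from the config comments
    assert_eq!(expected_parts(200 * 1024 * 1024, chunk), 4); // multipart, 4 parts
    assert_eq!(expected_parts(40 * 1024 * 1024, chunk), 1); // single PUT
}
```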