diff --git a/crates/iota-data-ingestion-core/src/lib.rs b/crates/iota-data-ingestion-core/src/lib.rs index f110488f758..a90941dd971 100644 --- a/crates/iota-data-ingestion-core/src/lib.rs +++ b/crates/iota-data-ingestion-core/src/lib.rs @@ -20,7 +20,7 @@ use iota_types::{ pub use metrics::DataIngestionMetrics; pub use progress_store::{FileProgressStore, ProgressStore, ShimProgressStore}; pub use reader::ReaderOptions; -pub use util::create_remote_store_client; +pub use util::{create_remote_store_client, create_remote_store_client_with_ops}; pub use worker_pool::WorkerPool; #[async_trait] diff --git a/crates/iota-data-ingestion-core/src/util.rs b/crates/iota-data-ingestion-core/src/util.rs index 62434dfd6e6..19109cff9a7 100644 --- a/crates/iota-data-ingestion-core/src/util.rs +++ b/crates/iota-data-ingestion-core/src/util.rs @@ -10,18 +10,172 @@ use object_store::{ }; use url::Url; +/// Creates a remote store client *without* any retry mechanism. +/// +/// This function constructs a remote store client configured to *not* retry +/// failed requests. All requests will fail immediately if the underlying +/// operation encounters an error. This is a convenience wrapper around +/// `create_remote_store_client_with_ops` that sets the retry configuration +/// to disable retries. +/// +/// # Arguments +/// +/// * `url`: The URL of the remote store. The scheme of the URL determines the +/// storage provider: +/// * `http://` or `https://`: HTTP-based store. +/// * `gs://`: Google Cloud Storage. +/// * `s3://` or other AWS S3-compatible URL: Amazon S3. +/// * `remote_store_options`: A vector of key-value pairs representing +/// provider-specific options. +/// * For GCS: See [`object_store::gcp::GoogleConfigKey`] for valid keys. +/// * For S3: See [`object_store::aws::AmazonS3ConfigKey`] for valid keys. +/// * For HTTP: No options are currently supported. This parameter should be +/// empty. +/// * `request_timeout_secs`: The timeout duration (in seconds) for individual +/// requests. This timeout is used to set a slightly longer retry timeout +/// (request_timeout_secs + 1) internally, even though retries are disabled. +/// This is done to ensure that the overall operation doesn't hang +/// indefinitely. 
+/// +/// # Examples +/// +/// Creating an S3 client without retries: +/// +/// ```rust +/// # use iota_data_ingestion_core::create_remote_store_client; +/// use object_store::aws::AmazonS3ConfigKey; +/// +/// let url = "s3://my-bucket"; +/// let options = vec![( +/// AmazonS3ConfigKey::Region.as_ref().to_owned(), +/// "us-east-1".to_string(), +/// )]; +/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap(); +/// ``` +/// +/// Creating a GCS client without retries: +/// +/// ```text +/// # use iota_data_ingestion_core::create_remote_store_client; +/// use object_store::gcp::GoogleConfigKey; +/// +/// let url = "gs://my-bucket"; +/// let options = vec![( +/// GoogleConfigKey::ServiceAccount.as_ref().to_owned(), +/// "my-service-account".to_string(), +/// )]; +/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap(); +/// ``` +/// +/// Creating an HTTP client without retries (no options supported): +/// +/// ```text +/// # use iota_data_ingestion_core::create_remote_store_client; +/// +/// let url = "http://example.bucket.com"; +/// let options = vec![]; // No options for HTTP +/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap(); +/// ``` pub fn create_remote_store_client( url: String, remote_store_options: Vec<(String, String)>, - timeout_secs: u64, + request_timeout_secs: u64, ) -> Result<Box<dyn ObjectStore>> { let retry_config = RetryConfig { max_retries: 0, - retry_timeout: Duration::from_secs(timeout_secs + 1), + retry_timeout: Duration::from_secs(request_timeout_secs + 1), ..Default::default() }; + + create_remote_store_client_with_ops( + url, + remote_store_options, + request_timeout_secs, + retry_config, + ) +} + +/// Creates a remote store client with configurable retry behavior and options. +/// +/// This function constructs a remote store client for various cloud storage +/// providers (HTTP, Google Cloud Storage, Amazon S3) based on the provided URL +/// and options. It allows configuring retry behavior through the `retry_config` +/// argument. +/// +/// # Arguments +/// +/// * `url`: The URL of the remote store. The scheme of the URL determines the +/// storage provider: +/// * `http://` or `https://`: HTTP-based store. +/// * `gs://`: Google Cloud Storage. +/// * `s3://` or other AWS S3-compatible URL: Amazon S3. +/// * `remote_store_options`: A vector of key-value pairs representing +/// provider-specific options. +/// * For GCS: See [`object_store::gcp::GoogleConfigKey`] for valid keys. +/// * For S3: See [`object_store::aws::AmazonS3ConfigKey`] for valid keys. +/// * For HTTP: No options are currently supported. This parameter should be +/// empty. +/// * `request_timeout_secs`: The timeout duration (in seconds) for individual +/// requests. +/// * `retry_config`: A [`RetryConfig`] struct defining the retry strategy. This +/// allows fine-grained control over the number of retries, backoff behavior, +/// and retry timeouts. See the documentation for +/// [`object_store::RetryConfig`] for details.
+/// +/// # Examples +/// +/// Creating an S3 client with specific options and a retry configuration: +/// +/// ```text +/// # use iota_data_ingestion_core::create_remote_store_client_with_ops; +/// use object_store::{RetryConfig, aws::AmazonS3ConfigKey}; +/// +/// let url = "s3://my-bucket"; +/// let options = vec![( +/// AmazonS3ConfigKey::Region.as_ref().to_owned(), +/// "us-east-1".to_string(), +/// )]; +/// let retry_config = RetryConfig::default(); // Use default retry settings +/// let client = +/// create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap(); +/// ``` +/// +/// Creating a GCS client: +/// +/// ```text +/// # use iota_data_ingestion_core::create_remote_store_client_with_ops; +/// use object_store::{RetryConfig, gcp::GoogleConfigKey}; +/// +/// let url = "gs://my-bucket"; +/// let options = vec![( +/// GoogleConfigKey::ServiceAccount.as_ref().to_owned(), +/// "my-service-account".to_string(), +/// )]; +/// let retry_config = RetryConfig::default(); +/// let client = +/// create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap(); +/// ``` +/// +/// Creating an HTTP client (no options supported): +/// +/// ```text +/// # use iota_data_ingestion_core::create_remote_store_client_with_ops; +/// use object_store::RetryConfig; +/// +/// let url = "http://example.bucket.com"; +/// let options = vec![]; // No options for HTTP +/// let retry_config = RetryConfig::default(); +/// let client = +/// create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap(); +/// ``` +pub fn create_remote_store_client_with_ops( + url: String, + remote_store_options: Vec<(String, String)>, + request_timeout_secs: u64, + retry_config: RetryConfig, +) -> Result<Box<dyn ObjectStore>> { let client_options = ClientOptions::new() - .with_timeout(Duration::from_secs(timeout_secs)) + .with_timeout(Duration::from_secs(request_timeout_secs)) .with_allow_http(true); if remote_store_options.is_empty() { let http_store = object_store::http::HttpBuilder::new() diff --git a/crates/iota-data-ingestion/src/workers/blob.rs b/crates/iota-data-ingestion/src/workers/blob.rs index 9afa7ece94d..19a3a9e6af9 100644 --- a/crates/iota-data-ingestion/src/workers/blob.rs +++ b/crates/iota-data-ingestion/src/workers/blob.rs @@ -2,31 +2,139 @@ // Modifications Copyright (c) 2024 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use anyhow::Result; +use std::time::Duration; + +use anyhow::{Result, bail}; use async_trait::async_trait; use bytes::Bytes; -use iota_data_ingestion_core::{Worker, create_remote_store_client}; +use futures::{StreamExt, stream}; +use iota_data_ingestion_core::{Worker, create_remote_store_client_with_ops}; use iota_storage::blob::{Blob, BlobEncoding}; use iota_types::full_checkpoint_content::CheckpointData; -use object_store::{ObjectStore, path::Path}; -use serde::{Deserialize, Serialize}; +use object_store::{BackoffConfig, MultipartUpload, ObjectStore, RetryConfig, path::Path}; +use serde::{Deserialize, Deserializer, Serialize}; + +/// Minimum allowed chunk size (in bytes) that can be uploaded to the remote store +const MIN_CHUNK_SIZE_MB: u64 = 5 * 1024 * 1024; // 5 MB +/// The maximum number of concurrent requests allowed when uploading checkpoint +/// chunk parts to remote store +const MAX_CONCURRENT_PARTS_UPLOAD: usize = 50; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct BlobTaskConfig { pub url: String, pub remote_store_options: Vec<(String, String)>, + pub request_timeout_secs: u64, + #[serde(deserialize_with = "deserialize_chunk")] +
pub checkpoint_chunk_size_mb: u64, +} + +fn deserialize_chunk<'de, D>(deserializer: D) -> Result<u64, D::Error> +where + D: Deserializer<'de>, +{ + let checkpoint_chunk_size = u64::deserialize(deserializer)? * 1024 * 1024; + if checkpoint_chunk_size < MIN_CHUNK_SIZE_MB { + return Err(serde::de::Error::custom("Chunk size must be at least 5 MB")); + } + Ok(checkpoint_chunk_size) } pub struct BlobWorker { remote_store: Box<dyn ObjectStore>, + checkpoint_chunk_size_mb: u64, } impl BlobWorker { pub fn new(config: BlobTaskConfig) -> Self { Self { - remote_store: create_remote_store_client(config.url, config.remote_store_options, 10) - .expect("failed to create remote store client"), + checkpoint_chunk_size_mb: config.checkpoint_chunk_size_mb, + remote_store: create_remote_store_client_with_ops( + config.url, + config.remote_store_options, + config.request_timeout_secs, + RetryConfig { + max_retries: 10, + retry_timeout: Duration::from_secs(config.request_timeout_secs + 1), + backoff: BackoffConfig { + init_backoff: Duration::from_secs(1), + ..Default::default() + }, + }, + ) + .expect("failed to create remote store client"), + } + } + + /// Uploads a Checkpoint blob to the Remote Store. + /// + /// If the blob size exceeds the configured `checkpoint_chunk_size_mb`, + /// it uploads the blob in parts using multipart upload. + /// Otherwise, it uploads the blob directly. + async fn upload_blob(&self, bytes: Vec<u8>, chk_seq_num: u64, location: Path) -> Result<()> { + if bytes.len() > self.checkpoint_chunk_size_mb as usize { + return self + .upload_blob_multipart(bytes, chk_seq_num, location) + .await; + } + + self.remote_store + .put(&location, Bytes::from(bytes).into()) + .await?; + + Ok(()) + } + + /// Uploads a large Checkpoint blob to the Remote Store using multipart + /// upload. + /// + /// This function divides the input `bytes` into chunks of the configured + /// `checkpoint_chunk_size_mb` and uploads each chunk individually. + /// Finally, it completes the multipart upload by assembling all the + /// uploaded parts.
+ async fn upload_blob_multipart( + &self, + bytes: Vec<u8>, + chk_seq_num: u64, + location: Path, + ) -> Result<()> { + let mut multipart = self.remote_store.put_multipart(&location).await?; + let chunks = bytes.chunks(self.checkpoint_chunk_size_mb as usize); + let total_chunks = chunks.len(); + + let parts_futures = chunks + .into_iter() + .map(|chunk| multipart.put_part(Bytes::copy_from_slice(chunk).into())) + .collect::<Vec<_>>(); + + let mut buffered_uploaded_parts = stream::iter(parts_futures) + .buffer_unordered(MAX_CONCURRENT_PARTS_UPLOAD) + .enumerate(); + + while let Some((uploaded_chunk_id, part_result)) = buffered_uploaded_parts.next().await { + match part_result { + Ok(()) => { + tracing::info!( + "uploaded checkpoint {chk_seq_num} chunk {}/{total_chunks}", + uploaded_chunk_id + 1 + ); + } + Err(err) => { + tracing::error!("error uploading part: {err}"); + multipart.abort().await?; + bail!("checkpoint {chk_seq_num} multipart upload aborted"); + } + } } + + let start_time = std::time::Instant::now(); + multipart.complete().await?; + tracing::info!( + "checkpoint {chk_seq_num} multipart completion request finished in {:?}", + start_time.elapsed() + ); + + Ok(()) } } @@ -38,9 +146,14 @@ impl Worker for BlobWorker { "{}.chk", checkpoint.checkpoint_summary.sequence_number )); - self.remote_store - .put(&location, Bytes::from(bytes).into()) - .await?; + + self.upload_blob( + bytes, + checkpoint.checkpoint_summary.sequence_number, + location, + ) + .await?; + Ok(()) } } diff --git a/dev-tools/iota-data-ingestion/config/config.yaml b/dev-tools/iota-data-ingestion/config/config.yaml index bb913f53ebd..b87398e0b14 100644 --- a/dev-tools/iota-data-ingestion/config/config.yaml +++ b/dev-tools/iota-data-ingestion/config/config.yaml @@ -13,7 +13,7 @@ progress_store: # DynamoDB table name table_name: "checkpoint-progress" -# Wrokers Configs +# Workers Configs # tasks: # Task unique name @@ -24,6 +24,33 @@ tasks: blob: # S3 bucket where checkpoints will be stored url: "s3://checkpoints" + # S3 client request timeout; a large value is needed to accommodate checkpoints that can be several GB in size + # + request_timeout_secs: 180 + # Checkpoint upload chunk size (in MB) that determines the upload strategy: + # + # If checkpoint size < checkpoint_chunk_size_mb: + # - Uploads checkpoint using single PUT operation + # - Optimal for smaller checkpoints + # + # If checkpoint size >= checkpoint_chunk_size_mb: + # - Divides checkpoint into chunks of this size + # - Uploads chunks via multipart upload + # - Storage service concatenates parts on completion + # + # Example with 50MB chunk size: + # 200MB checkpoint: + # - Splits into 4 parts (50MB each) + # - Multipart upload of each part + # - Parts merged on remote storage + # + # 40MB checkpoint: + # - Single PUT upload + # - No chunking needed + # + # Minimum allowed chunk size is 5 MB + # + checkpoint_chunk_size_mb: 100 # AWS S3 client config options remote_store_options: - ["access_key_id", "test"]
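To make the chunking rule in the config comment concrete, here is a small illustrative sketch. The 50 MB / 200 MB / 40 MB figures are taken from the example above; `div_ceil` is used only to mirror how `bytes.chunks(..)` rounds the final partial part up, and is not part of this change:

```rust
fn main() {
    // Chunk size from the example above, expressed in bytes (50 MB).
    let chunk_size: usize = 50 * 1024 * 1024;

    // A 200 MB checkpoint exceeds the chunk size, so it is split into
    // 4 parts of 50 MB each and uploaded via multipart upload.
    let large_checkpoint: usize = 200 * 1024 * 1024;
    assert!(large_checkpoint > chunk_size);
    assert_eq!(large_checkpoint.div_ceil(chunk_size), 4);

    // A 40 MB checkpoint stays below the chunk size and is uploaded
    // with a single PUT request, no chunking needed.
    let small_checkpoint: usize = 40 * 1024 * 1024;
    assert!(small_checkpoint <= chunk_size);
}
```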
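Separately, a minimal sketch of how a downstream caller might use the newly exported `create_remote_store_client_with_ops` with a non-default retry policy. The bucket URL, region, timeout, and retry numbers are illustrative only (they are not part of this change), and `anyhow` error handling is assumed:

```rust
use std::time::Duration;

use iota_data_ingestion_core::create_remote_store_client_with_ops;
use object_store::{BackoffConfig, RetryConfig, aws::AmazonS3ConfigKey};

fn main() -> anyhow::Result<()> {
    // Illustrative values; a real worker would take these from its task config.
    let url = "s3://checkpoints".to_string();
    let options = vec![(
        AmazonS3ConfigKey::Region.as_ref().to_owned(),
        "us-east-1".to_string(),
    )];

    // Retry up to 5 times with exponential backoff starting at one second,
    // in the same style as the BlobWorker configuration in this diff.
    let retry_config = RetryConfig {
        max_retries: 5,
        retry_timeout: Duration::from_secs(181),
        backoff: BackoffConfig {
            init_backoff: Duration::from_secs(1),
            ..Default::default()
        },
    };

    let _client = create_remote_store_client_with_ops(url, options, 180, retry_config)?;
    Ok(())
}
```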