refactor: store large checkpoints as multipart upload #5026

Merged · 14 commits · Jan 30, 2025
Changes from 6 commits

2 changes: 1 addition & 1 deletion crates/iota-data-ingestion-core/src/lib.rs
@@ -20,7 +20,7 @@ use iota_types::{
pub use metrics::DataIngestionMetrics;
pub use progress_store::{FileProgressStore, ProgressStore, ShimProgressStore};
pub use reader::ReaderOptions;
pub use util::create_remote_store_client;
pub use util::{create_remote_store_client, create_remote_store_client_with_ops};
pub use worker_pool::WorkerPool;

#[async_trait]
21 changes: 18 additions & 3 deletions crates/iota-data-ingestion-core/src/util.rs
@@ -13,15 +13,30 @@ use url::Url;
pub fn create_remote_store_client(
url: String,
remote_store_options: Vec<(String, String)>,
timeout_secs: u64,
request_timeout_secs: u64,
) -> Result<Box<dyn ObjectStore>> {
let retry_config = RetryConfig {
max_retries: 0,
retry_timeout: Duration::from_secs(timeout_secs + 1),
retry_timeout: Duration::from_secs(request_timeout_secs + 1),
..Default::default()
};

create_remote_store_client_with_ops(
url,
remote_store_options,
request_timeout_secs,
retry_config,
)
}

pub fn create_remote_store_client_with_ops(
url: String,
remote_store_options: Vec<(String, String)>,
request_timeout_secs: u64,
retry_config: RetryConfig,
) -> Result<Box<dyn ObjectStore>> {
let client_options = ClientOptions::new()
.with_timeout(Duration::from_secs(timeout_secs))
.with_timeout(Duration::from_secs(request_timeout_secs))
.with_allow_http(true);
if remote_store_options.is_empty() {
let http_store = object_store::http::HttpBuilder::new()
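For reference, a minimal usage sketch of the two entry points above: `create_remote_store_client` keeps the previous behaviour (no retries), while `create_remote_store_client_with_ops` lets the caller supply its own `RetryConfig`. The retry values mirror the ones `BlobWorker::new` passes below; the wrapper function `build_stores` is only an illustration, not part of this PR.

```rust
use std::time::Duration;

use iota_data_ingestion_core::{create_remote_store_client, create_remote_store_client_with_ops};
use object_store::{BackoffConfig, ObjectStore, RetryConfig};

fn build_stores(url: String, options: Vec<(String, String)>) -> Vec<Box<dyn ObjectStore>> {
    // Previous behaviour, preserved by `create_remote_store_client`:
    // no retries, retry window of request timeout + 1s.
    let plain = create_remote_store_client(url.clone(), options.clone(), 180)
        .expect("failed to create remote store client");

    // New entry point: the caller supplies the retry policy, e.g. the one
    // BlobWorker::new uses for large multipart uploads.
    let retrying = create_remote_store_client_with_ops(
        url,
        options,
        180,
        RetryConfig {
            max_retries: 10,
            retry_timeout: Duration::from_secs(181),
            backoff: BackoffConfig {
                init_backoff: Duration::from_secs(1),
                ..Default::default()
            },
        },
    )
    .expect("failed to create remote store client");

    vec![plain, retrying]
}
```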
122 changes: 114 additions & 8 deletions crates/iota-data-ingestion/src/workers/blob.rs
@@ -2,31 +2,132 @@
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use iota_data_ingestion_core::{Worker, create_remote_store_client};
use futures::{StreamExt, stream::FuturesUnordered};
use iota_data_ingestion_core::{Worker, create_remote_store_client_with_ops};
use iota_storage::blob::{Blob, BlobEncoding};
use iota_types::full_checkpoint_content::CheckpointData;
use object_store::{ObjectStore, path::Path};
use serde::{Deserialize, Serialize};
use object_store::{BackoffConfig, MultipartUpload, ObjectStore, RetryConfig, path::Path};
use serde::{Deserialize, Deserializer, Serialize};

/// Minimum allowed chunk size (in bytes) that can be uploaded to the remote store
const MIN_CHUNK_SIZE_MB: u64 = 5 * 1024 * 1024; // 5 MiB, expressed in bytes

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct BlobTaskConfig {
pub url: String,
pub remote_store_options: Vec<(String, String)>,
pub request_timeout_secs: u64,
#[serde(deserialize_with = "deserialize_chunk")]
pub checkpoint_chunk_size_mb: u64,
}

fn deserialize_chunk<'de, D>(deserializer: D) -> Result<u64, D::Error>
where
D: Deserializer<'de>,
{
let checkpoint_chunk_size = u64::deserialize(deserializer)? * 1024 * 1024;
if checkpoint_chunk_size < MIN_CHUNK_SIZE_MB {
return Err(serde::de::Error::custom("Chunk size must be at least 5 MB"));
}
Ok(checkpoint_chunk_size)
}

pub struct BlobWorker {
remote_store: Box<dyn ObjectStore>,
checkpoint_chunk_size_mb: u64,
}

impl BlobWorker {
pub fn new(config: BlobTaskConfig) -> Self {
Self {
remote_store: create_remote_store_client(config.url, config.remote_store_options, 10)
.expect("failed to create remote store client"),
checkpoint_chunk_size_mb: config.checkpoint_chunk_size_mb,
remote_store: create_remote_store_client_with_ops(
config.url,
config.remote_store_options,
config.request_timeout_secs,
RetryConfig {
max_retries: 10,
retry_timeout: Duration::from_secs(config.request_timeout_secs + 1),
backoff: BackoffConfig {
init_backoff: Duration::from_secs(1),
..Default::default()
},
},
)
.expect("failed to create remote store client"),
}
}

/// Uploads a Checkpoint blob to the Remote Store.
///
/// If the blob size exceeds the configured `checkpoint_chunk_size_mb`,
/// it uploads the blob in parts using multipart upload.
/// Otherwise, it uploads the blob directly.
async fn upload_blob(&self, bytes: Vec<u8>, chk_seq_num: u64, location: Path) -> Result<()> {
if bytes.len() > self.checkpoint_chunk_size_mb as usize {
return self
.upload_blob_multipart(bytes, chk_seq_num, location)
.await;
}

self.remote_store
.put(&location, Bytes::from(bytes).into())
.await?;

Ok(())
}

/// Uploads a large Checkpoint blob to the Remote Store using multipart
/// upload.
///
/// This function divides the input `bytes` into chunks of the configured `checkpoint_chunk_size_mb`
/// and uploads each chunk individually.
/// Finally, it completes the multipart upload by assembling all the
/// uploaded parts.
async fn upload_blob_multipart(
&self,
bytes: Vec<u8>,
chk_seq_num: u64,
location: Path,
) -> Result<()> {
let mut multipart = self.remote_store.put_multipart(&location).await?;
let chunks = bytes.chunks(self.checkpoint_chunk_size_mb as usize);
let total_chunks = chunks.len();

let mut parts_ordered_futures = chunks
.into_iter()
.map(|chunk| multipart.put_part(Bytes::copy_from_slice(chunk).into()))
.collect::<FuturesUnordered<_>>();

let mut uploaded_part = 0;
while let Some(part_result) = parts_ordered_futures.next().await {
match part_result {
Ok(()) => {
uploaded_part += 1;
tracing::info!(
"uploaded checkpoint {chk_seq_num} chunk {uploaded_part}/{total_chunks}"
);
}
Err(err) => {
tracing::error!("error uploading part: {err}");
multipart.abort().await?;
return Err(err.into());
}
}
}

let start_time = std::time::Instant::now();
multipart.complete().await?;
tracing::info!(
"checkpoint {chk_seq_num} multipart completion request finished in {:?}",
start_time.elapsed()
);

Ok(())
}
}

@@ -38,9 +139,14 @@ impl Worker for BlobWorker {
"{}.chk",
checkpoint.checkpoint_summary.sequence_number
));
self.remote_store
.put(&location, Bytes::from(bytes).into())
.await?;

self.upload_blob(
bytes,
checkpoint.checkpoint_summary.sequence_number,
location,
)
.await?;

Ok(())
}
}
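For readers unfamiliar with the `object_store` multipart API, here is a simplified, sequential sketch of the pattern `upload_blob_multipart` uses above: `put_multipart`, then `put_part` for each chunk, then `complete`, with `abort` on the first failure. Unlike the worker, it does not fan parts out through `FuturesUnordered`; the helper `upload_in_parts` is hypothetical, not code from this PR.

```rust
use anyhow::Result;
use bytes::Bytes;
use object_store::{MultipartUpload, ObjectStore, path::Path};

/// Minimal sketch of the multipart pattern: upload fixed-size parts one by
/// one, abort on the first failure, and complete once every part is in.
async fn upload_in_parts(
    store: &dyn ObjectStore,
    location: &Path,
    data: &[u8],
    part_size: usize, // must be >= the provider's minimum (5 MiB on S3)
) -> Result<()> {
    let mut upload = store.put_multipart(location).await?;
    for chunk in data.chunks(part_size) {
        if let Err(err) = upload.put_part(Bytes::copy_from_slice(chunk).into()).await {
            upload.abort().await?;
            return Err(err.into());
        }
    }
    // The storage service assembles the parts when the upload completes.
    upload.complete().await?;
    Ok(())
}
```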
29 changes: 28 additions & 1 deletion dev-tools/iota-data-ingestion/config/config.yaml
@@ -13,7 +13,7 @@ progress_store:
# DynamoDB table name
table_name: "checkpoint-progress"

# Wrokers Configs
# Workers Configs
#
tasks:
# Task unique name
@@ -24,6 +24,33 @@
blob:
# S3 bucket where checkpoints will be stored
url: "s3://checkpoints"
# S3 client request timeout; a large value is needed to accommodate checkpoints that can be several GB in size
#
request_timeout_secs: 180
# Checkpoint upload chunk size (in MB) that determines the upload strategy:
#
# If checkpoint size < checkpoint_chunk_size_mb:
# - Uploads checkpoint using single PUT operation
# - Optimal for smaller checkpoints
#
# If checkpoint size >= checkpoint_chunk_size_mb:
# - Divides checkpoint into chunks of this size
# - Uploads chunks as multipart
# - Storage service concatenates parts on completion
#
# Example with 50MB chunk size:
# 200MB checkpoint:
# - Splits into 4 parts (50MB each)
# - Multipart upload of each part
# - Parts merged on remote storage
#
# 40MB checkpoint:
# - Single PUT upload
# - No chunking needed
#
# Minimum allowed chunk size is 5MB
#
checkpoint_chunk_size_mb: 100
# AWS S3 client config options
remote_store_options:
- ["access_key_id", "test"]
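The upload-strategy arithmetic spelled out in the config comments above can be condensed into a few lines. This is only an illustration of the documented behaviour; the helper `parts_for` is hypothetical, not code from this PR.

```rust
/// Number of upload operations for a checkpoint of `checkpoint_size` bytes,
/// given the configured chunk size in MB.
fn parts_for(checkpoint_size: u64, chunk_size_mb: u64) -> u64 {
    let chunk_size = chunk_size_mb * 1024 * 1024;
    if checkpoint_size < chunk_size {
        1 // single PUT, no chunking needed
    } else {
        checkpoint_size.div_ceil(chunk_size) // multipart, one part per chunk
    }
}

fn main() {
    let mb = 1024 * 1024;
    assert_eq!(parts_for(200 * mb, 50), 4); // 200 MB checkpoint -> 4 parts of 50 MB
    assert_eq!(parts_for(40 * mb, 50), 1);  // 40 MB checkpoint -> single PUT
}
```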
60 changes: 60 additions & 0 deletions docker/iota-data-ingestion/config/config.yaml
@@ -0,0 +1,60 @@
# IndexerExecutor config
#
path: "./test-checkpoints"
# IOTA Node Rest API URL
remote_store_url: "http://localhost:9000/api/v1"

# DynamoDbProgressStore config
#
progress_store:
aws_access_key_id: "test"
aws_secret_access_key: "test"
aws_region: "us-east-1"
# DynamoDB table name
table_name: "checkpoint-progress"

# Workers Configs
#
tasks:
# Task unique name
- name: "local-blob-storage"
# Number of workers that will process the checkpoints in parallel
concurrency: 1
# Task type
blob:
# S3 bucket where checkpoints will be stored
url: "s3://checkpoints"
# S3 client request timeout; a large value is needed to accommodate checkpoints that can be several GB in size
#
request_timeout_secs: 180
# Checkpoint upload chunk size (in MB) that determines the upload strategy:
#
# If checkpoint size < checkpoint_chunk_size_mb:
# - Uploads checkpoint using single PUT operation
# - Optimal for smaller checkpoints
#
# If checkpoint size >= checkpoint_chunk_size_mb:
# - Divides checkpoint into chunks of this size
# - Uploads chunks as multipart
# - Storage service concatenates parts on completion
#
# Example with 50MB chunk size:
# 200MB checkpoint:
# - Splits into 4 parts (50MB each)
# - Multipart upload of each part
# - Parts merged on remote storage
#
# 40MB checkpoint:
# - Single PUT upload
# - No chunking needed
#
# Minimum allowed chunk size is 5MB
#
checkpoint_chunk_size_mb: 100
# AWS S3 client config options
remote_store_options:
- ["access_key_id", "test"]
- ["secret_access_key", "test"]
- ["region", "us-east-1"]
# Only needed when using LocalStack for local development; it should be removed otherwise
- ["endpoint_url", "http://localhost:4566"]
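As a sanity check on the new `checkpoint_chunk_size_mb` option, the following hypothetical round trip shows how `deserialize_chunk` rescales the value to bytes and rejects anything below the 5 MB minimum. It assumes `serde_yaml` is available and that `BlobTaskConfig` is reachable at the path shown; both are assumptions, not part of this PR.

```rust
// Hypothetical import path; BlobTaskConfig is defined in
// crates/iota-data-ingestion/src/workers/blob.rs in this PR.
use iota_data_ingestion::workers::BlobTaskConfig;

fn main() -> anyhow::Result<()> {
    let yaml = r#"
url: "s3://checkpoints"
remote_store_options: []
request_timeout_secs: 180
checkpoint_chunk_size_mb: 100
"#;
    let config: BlobTaskConfig = serde_yaml::from_str(yaml)?;
    // The custom deserializer stores the configured value in bytes.
    assert_eq!(config.checkpoint_chunk_size_mb, 100 * 1024 * 1024);

    // Values below the 5 MB minimum are rejected at parse time.
    let too_small = yaml.replace("checkpoint_chunk_size_mb: 100", "checkpoint_chunk_size_mb: 4");
    assert!(serde_yaml::from_str::<BlobTaskConfig>(&too_small).is_err());
    Ok(())
}
```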