diff --git a/rust/src/operations/vacuum.rs b/rust/src/operations/vacuum.rs index a4eab1b317..ca253edbad 100644 --- a/rust/src/operations/vacuum.rs +++ b/rust/src/operations/vacuum.rs @@ -26,7 +26,7 @@ use crate::table_state::DeltaTableState; use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError}; use chrono::{Duration, Utc}; use futures::future::BoxFuture; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use object_store::{path::Path, ObjectStore}; use std::collections::HashSet; use std::fmt::Debug; @@ -77,6 +77,8 @@ pub struct VacuumBuilder { retention_period: Option, /// Validate the retention period is not below the retention period configured in the table enforce_retention_duration: bool, + /// Maximum number of concurrent requests to make + max_concurrent_requests: usize, /// Don't delete the files. Just determine which files can be deleted dry_run: bool, /// Override the source of time @@ -101,6 +103,7 @@ impl VacuumBuilder { store, retention_period: None, enforce_retention_duration: true, + max_concurrent_requests: 10, dry_run: false, clock: None, } @@ -124,6 +127,12 @@ impl VacuumBuilder { self } + /// Set the maximum number of concurrent requests to make + pub fn with_max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self { + self.max_concurrent_requests = max_concurrent_requests; + self + } + /// add a time source for testing #[doc(hidden)] pub fn with_clock(mut self, clock: Arc) -> Self { @@ -174,7 +183,10 @@ impl VacuumBuilder { files_to_delete.push(obj_meta.location); } - Ok(VacuumPlan { files_to_delete }) + Ok(VacuumPlan { + files_to_delete, + max_concurrent_requests: self.max_concurrent_requests, + }) } } @@ -210,6 +222,8 @@ impl std::future::IntoFuture for VacuumBuilder { struct VacuumPlan { /// What files are to be deleted pub files_to_delete: Vec, + /// How many concurrent requests to make + pub max_concurrent_requests: usize, } impl VacuumPlan { @@ -222,14 +236,15 @@ impl VacuumPlan { }); } - // Delete the files - let files_deleted = match store.delete_batch(&self.files_to_delete).await { - Ok(_) => Ok(self.files_to_delete), - Err(err) => Err(err), - }? - .into_iter() - .map(|file| file.to_string()) - .collect(); + let files_deleted = futures::stream::iter(self.files_to_delete) + .map(|path| { + let store = store.clone(); + async move { store.delete(&path).await.map(|_| path) } + }) + .buffer_unordered(self.max_concurrent_requests) + .map_ok(|path| path.to_string()) + .try_collect::>() + .await?; Ok(VacuumMetrics { files_deleted,