Skip to content

Commit

Permalink
feat: vacuum with concurrent requests (#1382)
Browse files Browse the repository at this point in the history
# Description

We don't yet have batch deletes in object store (but will soon). In the
meantime, we can at least issue multiple requests in parallel. Set the
default at 10. I think that should be reasonable for now; later we can
optimize to try to find the right rate to avoid rate-limiting.

# Related Issue(s)

* helps with #1366

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
wjones127 authored May 21, 2023
1 parent 8fd0af8 commit 5d8f20c
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions rust/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::table_state::DeltaTableState;
use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError};
use chrono::{Duration, Utc};
use futures::future::BoxFuture;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use object_store::{path::Path, ObjectStore};
use std::collections::HashSet;
use std::fmt::Debug;
Expand Down Expand Up @@ -77,6 +77,8 @@ pub struct VacuumBuilder {
retention_period: Option<Duration>,
/// Validate the retention period is not below the retention period configured in the table
enforce_retention_duration: bool,
/// Maximum number of concurrent requests to make
max_concurrent_requests: usize,
/// Don't delete the files. Just determine which files can be deleted
dry_run: bool,
/// Override the source of time
Expand All @@ -101,6 +103,7 @@ impl VacuumBuilder {
store,
retention_period: None,
enforce_retention_duration: true,
max_concurrent_requests: 10,
dry_run: false,
clock: None,
}
Expand All @@ -124,6 +127,12 @@ impl VacuumBuilder {
self
}

/// Set the maximum number of concurrent requests to make
pub fn with_max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
self.max_concurrent_requests = max_concurrent_requests;
self
}

/// add a time source for testing
#[doc(hidden)]
pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
Expand Down Expand Up @@ -174,7 +183,10 @@ impl VacuumBuilder {
files_to_delete.push(obj_meta.location);
}

Ok(VacuumPlan { files_to_delete })
Ok(VacuumPlan {
files_to_delete,
max_concurrent_requests: self.max_concurrent_requests,
})
}
}

Expand Down Expand Up @@ -210,6 +222,8 @@ impl std::future::IntoFuture for VacuumBuilder {
struct VacuumPlan {
/// What files are to be deleted
pub files_to_delete: Vec<Path>,
/// How many concurrent requests to make
pub max_concurrent_requests: usize,
}

impl VacuumPlan {
Expand All @@ -222,14 +236,15 @@ impl VacuumPlan {
});
}

// Delete the files
let files_deleted = match store.delete_batch(&self.files_to_delete).await {
Ok(_) => Ok(self.files_to_delete),
Err(err) => Err(err),
}?
.into_iter()
.map(|file| file.to_string())
.collect();
let files_deleted = futures::stream::iter(self.files_to_delete)
.map(|path| {
let store = store.clone();
async move { store.delete(&path).await.map(|_| path) }
})
.buffer_unordered(self.max_concurrent_requests)
.map_ok(|path| path.to_string())
.try_collect::<Vec<_>>()
.await?;

Ok(VacuumMetrics {
files_deleted,
Expand Down

0 comments on commit 5d8f20c

Please sign in to comment.