From 99c9175edeb43d546967b61edac5346ac000a810 Mon Sep 17 00:00:00 2001 From: Ryan Aston Date: Tue, 10 Oct 2023 22:41:45 -0500 Subject: [PATCH] return adds from writer, expose checkpoint fields --- rust/examples/recordbatch-writer.rs | 4 ++-- rust/src/delta.rs | 17 +++++++++++++++++ rust/src/writer/mod.rs | 20 ++++++++++++++++---- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/rust/examples/recordbatch-writer.rs b/rust/examples/recordbatch-writer.rs index cca9c6e3fc..7ec347d39d 100644 --- a/rust/examples/recordbatch-writer.rs +++ b/rust/examples/recordbatch-writer.rs @@ -61,11 +61,11 @@ async fn main() -> Result<(), DeltaTableError> { writer.write(batch).await?; - let adds = writer + let (version, _adds) = writer .flush_and_commit(&mut table) .await .expect("Failed to flush write"); - info!("{} adds written", adds); + info!("{} adds written", version); Ok(()) } diff --git a/rust/src/delta.rs b/rust/src/delta.rs index e9872951ee..b5a5395084 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -48,6 +48,18 @@ pub struct CheckPoint { pub(crate) num_of_add_files: Option, } +impl CheckPoint { + /// Table version at checkpoint + pub fn version(&self) -> i64 { + self.version + } + + /// Table size in bytes at checkpoint + pub fn size_in_bytes(&self) -> Option { + self.size_in_bytes + } +} + /// Builder for CheckPoint pub struct CheckPointBuilder { /// Delta table version @@ -355,6 +367,11 @@ impl DeltaTable { self.storage.root_uri() } + /// The latest checkpoint for the table + pub fn last_checkpoint(&self) -> Option { + self.last_check_point + } + /// Return the list of paths of given checkpoint. 
pub fn get_checkpoint_data_paths(&self, check_point: &CheckPoint) -> Vec { let checkpoint_prefix = format!("{:020}", check_point.version); diff --git a/rust/src/writer/mod.rs b/rust/src/writer/mod.rs index 5685a71d48..ada860fddc 100644 --- a/rust/src/writer/mod.rs +++ b/rust/src/writer/mod.rs @@ -132,21 +132,33 @@ pub trait DeltaWriter { /// Flush the internal write buffers to files in the delta table folder structure. /// and commit the changes to the Delta log, creating a new table version. - async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result { - let adds: Vec<_> = self.flush().await?.drain(..).map(Action::add).collect(); + async fn flush_and_commit( + &mut self, + table: &mut DeltaTable, + ) -> Result<(i64, Vec), DeltaTableError> { + let adds = self.flush().await?; let partition_cols = table.get_metadata()?.partition_columns.clone(); let partition_by = if !partition_cols.is_empty() { Some(partition_cols) } else { None }; + let actions = adds.iter().map(|add| Action::add(add.clone())).collect(); let operation = DeltaOperation::Write { mode: SaveMode::Append, partition_by, predicate: None, }; - let version = commit(table.storage.as_ref(), &adds, operation, &table.state, None).await?; + + let version = commit( + table.storage.as_ref(), + &actions, + operation, + &table.state, + None, + ) + .await?; table.update().await?; - Ok(version) + Ok((version, adds)) } }