Skip to content

Commit

Permalink
return adds from writer, expose checkpoint fields
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan Aston committed Oct 11, 2023
1 parent 9d1857d commit 99c9175
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 6 deletions.
4 changes: 2 additions & 2 deletions rust/examples/recordbatch-writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ async fn main() -> Result<(), DeltaTableError> {

writer.write(batch).await?;

let adds = writer
let (version, _adds) = writer
.flush_and_commit(&mut table)
.await
.expect("Failed to flush write");
info!("{} adds written", adds);
info!("{} adds written", version);

Ok(())
}
Expand Down
17 changes: 17 additions & 0 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ pub struct CheckPoint {
pub(crate) num_of_add_files: Option<i64>,
}

impl CheckPoint {
/// Table version at checkpoint
pub fn version(&self) -> i64 {
self.version
}

/// Table size in bytes at checkpoint
pub fn size_in_bytes(&self) -> Option<i64> {
self.size_in_bytes
}
}

/// Builder for CheckPoint
pub struct CheckPointBuilder {
/// Delta table version
Expand Down Expand Up @@ -355,6 +367,11 @@ impl DeltaTable {
self.storage.root_uri()
}

/// The lastest checkpoint for the table
pub fn last_checkpoint(&self) -> Option<CheckPoint> {
self.last_check_point
}

/// Return the list of paths of given checkpoint.
pub fn get_checkpoint_data_paths(&self, check_point: &CheckPoint) -> Vec<Path> {
let checkpoint_prefix = format!("{:020}", check_point.version);
Expand Down
20 changes: 16 additions & 4 deletions rust/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,33 @@ pub trait DeltaWriter<T> {

/// Flush the internal write buffers to files in the delta table folder structure.
/// and commit the changes to the Delta log, creating a new table version.
async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result<i64, DeltaTableError> {
let adds: Vec<_> = self.flush().await?.drain(..).map(Action::add).collect();
async fn flush_and_commit(
&mut self,
table: &mut DeltaTable,
) -> Result<(i64, Vec<Add>), DeltaTableError> {
let adds = self.flush().await?;
let partition_cols = table.get_metadata()?.partition_columns.clone();
let partition_by = if !partition_cols.is_empty() {
Some(partition_cols)
} else {
None
};
let actions = adds.iter().map(|add| Action::add(add.clone())).collect();
let operation = DeltaOperation::Write {
mode: SaveMode::Append,
partition_by,
predicate: None,
};
let version = commit(table.storage.as_ref(), &adds, operation, &table.state, None).await?;

let version = commit(
table.storage.as_ref(),
&actions,
operation,
&table.state,
None,
)
.await?;
table.update().await?;
Ok(version)
Ok((version, adds))
}
}

0 comments on commit 99c9175

Please sign in to comment.