Skip to content

Commit

Permalink
feat: add metadata for operations::write::WriteBuilder (delta-io#1584)
Browse files Browse the repository at this point in the history
  • Loading branch information
abhimanyusinghgaur authored and polynomialherder committed Aug 15, 2023
1 parent 9f17a19 commit 838e707
Showing 1 changed file with 59 additions and 7 deletions.
66 changes: 59 additions & 7 deletions rust/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
use futures::future::BoxFuture;
use futures::StreamExt;
use parquet::file::properties::WriterProperties;
use serde_json::Map;

use super::writer::{DeltaWriter, WriterConfig};
use super::MAX_SUPPORTED_WRITER_VERSION;
Expand Down Expand Up @@ -101,6 +102,8 @@ pub struct WriteBuilder {
safe_cast: bool,
/// Parquet writer properties
writer_properties: Option<WriterProperties>,
/// Additional metadata to be added to commit
app_metadata: Option<Map<String, serde_json::Value>>,
}

impl WriteBuilder {
Expand All @@ -119,6 +122,7 @@ impl WriteBuilder {
batches: None,
safe_cast: false,
writer_properties: None,
app_metadata: None,
}
}

Expand Down Expand Up @@ -187,6 +191,15 @@ impl WriteBuilder {
self
}

/// Additional metadata to be added to commit info
pub fn with_metadata(
mut self,
metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
self.app_metadata = Some(Map::from_iter(metadata));
self
}

async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
match self.store.is_delta_table_location().await? {
true => {
Expand Down Expand Up @@ -455,8 +468,7 @@ impl std::future::IntoFuture for WriteBuilder {
predicate: this.predicate,
},
&this.snapshot,
// TODO pass through metadata
None,
this.app_metadata,
)
.await?;

Expand Down Expand Up @@ -550,7 +562,7 @@ mod tests {
use arrow_array::{Int32Array, StringArray, TimestampMicrosecondArray};
use arrow_schema::{DataType, TimeUnit};
use datafusion::assert_batches_sorted_eq;
use serde_json::json;
use serde_json::{json, Value};

#[tokio::test]
async fn test_create_write() {
Expand All @@ -563,33 +575,73 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.state.commit_infos().len(), 1);

// write some data
let table = DeltaOps(table)
let metadata = Map::from_iter(vec![("k1".to_string(), json!("v1.1"))]);
let mut table = DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Append)
.with_metadata(metadata.clone())
.await
.unwrap();
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);
table.load().await.unwrap();
assert_eq!(table.state.commit_infos().len(), 2);
assert_eq!(
table.state.commit_infos()[1]
.info
.clone()
.into_iter()
.filter(|(k, _)| k != "clientVersion")
.collect::<Map<String, Value>>(),
metadata
);

// append some data
let table = DeltaOps(table)
let metadata: Map<String, Value> = Map::from_iter(vec![("k1".to_string(), json!("v1.2"))]);
let mut table = DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Append)
.with_metadata(metadata.clone())
.await
.unwrap();
assert_eq!(table.version(), 2);
assert_eq!(table.get_file_uris().count(), 2);
table.load().await.unwrap();
assert_eq!(table.state.commit_infos().len(), 3);
assert_eq!(
table.state.commit_infos()[2]
.info
.clone()
.into_iter()
.filter(|(k, _)| k != "clientVersion")
.collect::<Map<String, Value>>(),
metadata
);

// overwrite table
let table = DeltaOps(table)
let metadata: Map<String, Value> = Map::from_iter(vec![("k2".to_string(), json!("v2.1"))]);
let mut table = DeltaOps(table)
.write(vec![batch])
.with_save_mode(SaveMode::Overwrite)
.with_metadata(metadata.clone())
.await
.unwrap();
assert_eq!(table.version(), 3);
assert_eq!(table.get_file_uris().count(), 1)
assert_eq!(table.get_file_uris().count(), 1);
table.load().await.unwrap();
assert_eq!(table.state.commit_infos().len(), 4);
assert_eq!(
table.state.commit_infos()[3]
.info
.clone()
.into_iter()
.filter(|(k, _)| k != "clientVersion")
.collect::<Map<String, Value>>(),
metadata
);
}

#[tokio::test]
Expand Down

0 comments on commit 838e707

Please sign in to comment.