Skip to content

Commit

Permalink
Save operational params in the same way with delta io (delta-io#1054)
Browse files Browse the repository at this point in the history
# Description
Currently writing "operationParameters" in commit info is misaligned
with delta io connector.


[Here](https://github.com/delta-io/delta/blob/36a7edb8cf507e713700ba827c5fb5ad32b9163e/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L695)
the sample of structure which is used in delta io.

So the goal of this PR is to align with delta io approach and the PR do
two thins: convert all values to string and delete keys with null
values.

# Related Issue(s)
Closes [issue delta-io#1017](delta-io#1017)

Co-authored-by: Ilya Moshkov <ilya.moshkov@exosfinancial.com>
  • Loading branch information
2 people authored and chitralverma committed Mar 17, 2023
1 parent abab3d4 commit a262444
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
18 changes: 17 additions & 1 deletion rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,25 @@ impl DeltaOperation {
);

if let Ok(serde_json::Value::Object(map)) = serde_json::to_value(self) {
let all_operation_fields = map.values().next().unwrap().as_object().unwrap();
let converted_operation_fields: Map<String, Value> = all_operation_fields
.iter()
.filter(|item| !item.1.is_null())
.map(|(k, v)| {
(
k.clone(),
serde_json::Value::String(if v.is_string() {
String::from(v.as_str().unwrap())
} else {
v.to_string()
}),
)
})
.collect();

commit_info.insert(
"operationParameters".to_string(),
map.values().next().unwrap().clone(),
serde_json::Value::Object(converted_operation_fields),
);
};

Expand Down
2 changes: 1 addition & 1 deletion rust/tests/command_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ async fn test_commit_info() -> Result<(), Box<dyn Error>> {
assert_eq!(last_commit["readVersion"], json!(version));
assert_eq!(
last_commit["operationParameters"]["targetSize"],
json!(2_000_000)
json!("2000000")
);
// TODO: Requires a string representation for PartitionFilter
assert_eq!(last_commit["operationParameters"]["predicate"], Value::Null);
Expand Down
40 changes: 40 additions & 0 deletions rust/tests/commit_info_format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#[allow(dead_code)]
mod fs_common;

use deltalake::action::{Action, DeltaOperation, SaveMode};

use serde_json::{json, Value};
use std::error::Error;

#[tokio::test]
async fn test_operational_parameters() -> Result<(), Box<dyn Error>> {
let path = "./tests/data/operational_parameters";
let mut table = fs_common::create_table(path, None).await;

let add = fs_common::add(0);

let operation = DeltaOperation::Write {
mode: SaveMode::Append,
partition_by: Some(vec!["some_partition".to_string()]),
predicate: None,
};

let mut tx = table.create_transaction(None);
let actions = vec![Action::add(add.clone())];
tx.add_actions(actions);
tx.commit(Some(operation), None).await.unwrap();

let commit_info = table.history(None).await?;
let last_commit = &commit_info[commit_info.len() - 1];

assert_eq!(last_commit["operationParameters"]["mode"], json!("Append"));

assert_eq!(
last_commit["operationParameters"]["partitionBy"],
json!("[\"some_partition\"]")
);

assert_eq!(last_commit["operationParameters"]["predicate"], Value::Null);

Ok(())
}

0 comments on commit a262444

Please sign in to comment.