Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: typed commit info #1207

Merged
merged 5 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 173 additions & 29 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ mod parquet_read;
#[cfg(feature = "parquet2")]
pub mod parquet2_read;

use crate::{schema::*, DeltaTableError, DeltaTableMetaData};
use crate::delta_config::IsolationLevel;
use crate::{schema::*, DeltaResult, DeltaTableError, DeltaTableMetaData};
use percent_encoding::percent_decode;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
Expand Down Expand Up @@ -44,6 +45,13 @@ pub enum ActionError {
#[from]
source: parquet::errors::ParquetError,
},
/// Faild to serialize operation
#[error("Failed to serialize operation: {source}")]
SerializeOperation {
#[from]
/// The source error
source: serde_json::Error,
},
}

fn decode_path(raw_path: &str) -> Result<String, ActionError> {
Expand Down Expand Up @@ -435,7 +443,46 @@ pub struct Protocol {
pub min_writer_version: DeltaDataTypeInt,
}

type CommitInfo = Map<String, Value>;
/// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored.
/// However the reference implementation as well as delta-rs store useful information that may for instance
/// allow us to be more permissive in commit conflict resolution.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct CommitInfo {
/// Version number the commit corresponds to
#[serde(skip_serializing_if = "Option::is_none")]
pub version: Option<DeltaDataTypeVersion>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this redundant?

Copy link
Collaborator Author

@roeap roeap Mar 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, and nobody actually writes it to the commit info in the log. Initially I was following the spark implementation for the optimistic commits quite closely, where this field is being used. Not sure if we still do though. I'll remove it for now, and bring it back in case its needed in the follow up PR...

/// Timestamp in millis when the commit was created
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<DeltaDataTypeTimestamp>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an aside, it would be cool if we swapped out DeltaDataTypeTimestamp for a type whose Debug / Display implementation is a formatted timestamp rather than an integer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely, we may also be able to implement custom serde, so it is parsed as a datetime.

/// Id of the user invoking the commit
#[serde(skip_serializing_if = "Option::is_none")]
pub user_id: Option<String>,
/// Name of the user invoking the commit
#[serde(skip_serializing_if = "Option::is_none")]
pub user_name: Option<String>,
/// The operation performed during the
#[serde(skip_serializing_if = "Option::is_none")]
pub operation: Option<String>,
/// Parameters used for table operation
#[serde(skip_serializing_if = "Option::is_none")]
pub operation_parameters: Option<HashMap<String, serde_json::Value>>,
/// Version of the table when the operation was started
#[serde(skip_serializing_if = "Option::is_none")]
pub read_version: Option<i64>,
/// The isolation level of the commit
#[serde(skip_serializing_if = "Option::is_none")]
pub isolation_level: Option<IsolationLevel>,
/// TODO
#[serde(skip_serializing_if = "Option::is_none")]
pub is_blind_append: Option<bool>,
/// Delta engine which created the commit.
#[serde(skip_serializing_if = "Option::is_none")]
pub engine_info: Option<String>,
/// Additional provenance information for the commit
#[serde(flatten, default)]
pub info: Map<String, serde_json::Value>,
}

/// Represents an action in the Delta log. The Delta log is an aggregate of all actions performed
/// on the table, so the full list of actions is required to properly read a table.
Expand All @@ -459,6 +506,16 @@ pub enum Action {
commitInfo(CommitInfo),
}

impl Action {
/// Create a commit info from a map
pub fn commit_info(info: Map<String, serde_json::Value>) -> Self {
Self::commitInfo(CommitInfo {
info,
..Default::default()
})
}
}

/// Operation performed when creating a new log entry with one or more actions.
/// This is a key element of the `CommitInfo` action.
#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -517,45 +574,71 @@ pub enum DeltaOperation {
}

impl DeltaOperation {
/// Retrieve basic commit information to be added to Delta commits
pub fn get_commit_info(&self) -> Map<String, Value> {
let mut commit_info = Map::<String, Value>::new();
let operation = match &self {
DeltaOperation::Create { .. } => "delta-rs.Create",
DeltaOperation::Write { .. } => "delta-rs.Write",
DeltaOperation::StreamingUpdate { .. } => "delta-rs.StreamingUpdate",
DeltaOperation::Optimize { .. } => "delta-rs.Optimize",
DeltaOperation::FileSystemCheck { .. } => "delta-rs.FileSystemCheck",
};
commit_info.insert(
"operation".to_string(),
serde_json::Value::String(operation.into()),
);
/// A human readable name for the operation
pub fn name(&self) -> &str {
// operation names taken from https://learn.microsoft.com/en-us/azure/databricks/delta/history#--operation-metrics-keys
match &self {
DeltaOperation::Create { mode, .. } if matches!(mode, SaveMode::Overwrite) => {
"CREATE OR REPLACE TABLE"
}
DeltaOperation::Create { .. } => "CREATE TABLE",
DeltaOperation::Write { .. } => "WRITE",
DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE",
DeltaOperation::Optimize { .. } => "OPTIMIZE",
DeltaOperation::FileSystemCheck { .. } => "FSCK",
}
}

if let Ok(serde_json::Value::Object(map)) = serde_json::to_value(self) {
let all_operation_fields = map.values().next().unwrap().as_object().unwrap();
let converted_operation_fields: Map<String, Value> = all_operation_fields
.iter()
/// Paraemters configured for operation.
roeap marked this conversation as resolved.
Show resolved Hide resolved
pub fn operation_parameters(&self) -> DeltaResult<impl Iterator<Item = (String, Value)>> {
// TODO remove unwrap
let serialized = serde_json::to_value(self)
.map_err(|err| ActionError::SerializeOperation { source: err })?;
if let serde_json::Value::Object(map) = serialized {
let all_operation_fields = map.values().next().unwrap().as_object().unwrap().clone();
roeap marked this conversation as resolved.
Show resolved Hide resolved
Ok(all_operation_fields
.into_iter()
.filter(|item| !item.1.is_null())
.map(|(k, v)| {
(
k.clone(),
k,
serde_json::Value::String(if v.is_string() {
String::from(v.as_str().unwrap())
} else {
v.to_string()
}),
)
})
.collect();
}))
} else {
Err(ActionError::Generic(
"operation parameetrs serialized into unexpected shape".into(),
roeap marked this conversation as resolved.
Show resolved Hide resolved
)
.into())
}
}

commit_info.insert(
"operationParameters".to_string(),
serde_json::Value::Object(converted_operation_fields),
);
};
/// Denotes if the operation changes the data contained in the table
pub fn changes_data(&self) -> bool {
!matches!(self, Self::Optimize { .. })
roeap marked this conversation as resolved.
Show resolved Hide resolved
}

/// Retrieve basic commit information to be added to Delta commits
pub fn get_commit_info(&self) -> CommitInfo {
// TODO infer additional info from operation parameters ...
CommitInfo {
operation: Some(self.name().into()),
operation_parameters: self.operation_parameters().ok().map(|iter| iter.collect()),
..Default::default()
}
}

commit_info
/// Get predicate expression applien when the operation reads data from the table.
roeap marked this conversation as resolved.
Show resolved Hide resolved
pub fn read_predicate(&self) -> Option<String> {
match self {
// TODO add more operations
Self::Write { predicate, .. } => predicate.clone(),
_ => None,
}
}
}

Expand Down Expand Up @@ -654,4 +737,65 @@ mod tests {
1
);
}

#[test]
fn test_read_commit_info() {
let raw = r#"
{
"timestamp": 1670892998177,
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[\"c1\",\"c2\"]"
},
"isolationLevel": "Serializable",
"isBlindAppend": true,
"operationMetrics": {
"numFiles": "3",
"numOutputRows": "3",
"numOutputBytes": "1356"
},
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0",
"txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c"
}"#;

let info = serde_json::from_str::<CommitInfo>(raw);
assert!(info.is_ok());

println!("{:?}", info);
roeap marked this conversation as resolved.
Show resolved Hide resolved

// assert that commit info has no required filelds
let raw = "{}";
let info = serde_json::from_str::<CommitInfo>(raw);
assert!(info.is_ok());

// arbitrary field data may be added to commit
let raw = r#"
{
"timestamp": 1670892998177,
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[\"c1\",\"c2\"]"
},
"isolationLevel": "Serializable",
"isBlindAppend": true,
"operationMetrics": {
"numFiles": "3",
"numOutputRows": "3",
"numOutputBytes": "1356"
},
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0",
"txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c",
"additionalField": "more data",
"additionalStruct": {
"key": "value",
"otherKey": 123
}
}"#;

let info = serde_json::from_str::<CommitInfo>(raw).expect("should parse");
assert!(info.info.contains_key("additionalField"));
assert!(info.info.contains_key("additionalStruct"));
}
}
2 changes: 1 addition & 1 deletion rust/src/action/parquet2_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl ActionVariant for CommitInfo {
type Variant = CommitInfo;

fn default_action() -> Action {
Action::commitInfo(CommitInfo::new())
Action::commitInfo(CommitInfo::default())
}

fn try_mut_from_action(a: &mut Action) -> Result<&mut Self, ParseError> {
Expand Down
Loading