Skip to content

Commit

Permalink
add merge into empty table test
Browse files Browse the repository at this point in the history
  • Loading branch information
Blajda committed Nov 19, 2023
1 parent 86d9b5e commit 8b33ecb
Showing 1 changed file with 97 additions and 2 deletions.
99 changes: 97 additions & 2 deletions crates/deltalake-core/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ const TARGET_DELETE_COLUMN: &str = "__delta_rs_target_delete";
// Name of an internal helper column on the merge plan. Presumably flags target
// rows that are carried over unchanged; confirm against its use in `execute`.
const TARGET_COPY_COLUMN: &str = "__delta_rs_target_copy";

// Names of the global counters recorded by the merge metric observers. The
// planner passes these to `MetricBuilder::global_counter` (see the
// `TARGET_COUNT_METRIC` and `TARGET_INSERTED_METRIC` uses in
// `MergeMetricExtensionPlanner`).
const SOURCE_COUNT_METRIC: &str = "num_source_rows";
// Incremented by `batch.num_rows()` for every batch seen by the
// `TARGET_COUNT_ID` observer.
const TARGET_COUNT_METRIC: &str = "num_target_rows";
const TARGET_COPY_METRIC: &str = "num_copied_rows";
const TARGET_INSERTED_METRIC: &str = "num_target_inserted_rows";
const TARGET_UPDATED_METRIC: &str = "num_target_updated_rows";
const TARGET_DELETED_METRIC: &str = "num_target_deleted_rows";

// Identifiers of the `MetricObserver` nodes placed in the logical plan. The
// extension planner matches on these ids to pick the metric closure, and
// `find_metric_node` uses them to locate the executed node afterwards.
const SOURCE_COUNT_ID: &str = "merge_source_count";
// Observer wrapped directly around the target table scan.
const TARGET_COUNT_ID: &str = "merge_target_count";
// Observer wrapped around the final projected merge output.
const OUTPUT_COUNT_ID: &str = "merge_output_count";

/// Merge records into a Delta Table.
pub struct MergeBuilder {
Expand Down Expand Up @@ -591,6 +593,18 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner {
return Ok(Some(MetricObserverExec::try_new(
TARGET_COUNT_ID.into(),
physical_inputs,
|batch, metrics| {
MetricBuilder::new(metrics)
.global_counter(TARGET_COUNT_METRIC)
.add(batch.num_rows());
},
)?));
}

if metric_observer.id.eq(OUTPUT_COUNT_ID) {
return Ok(Some(MetricObserverExec::try_new(
OUTPUT_COUNT_ID.into(),
physical_inputs,
|batch, metrics| {
MetricBuilder::new(metrics)
.global_counter(TARGET_INSERTED_METRIC)
Expand Down Expand Up @@ -698,6 +712,14 @@ async fn execute(
let target_provider = provider_as_source(target_provider);

let target = LogicalPlanBuilder::scan(target_name, target_provider, None)?.build()?;

// TODO: This is here to prevent predicate pushdowns. In the future we can replace this node to allow pushdowns depending on which operations are being used.
let target = LogicalPlan::Extension(Extension {
node: Arc::new(MetricObserver {
id: TARGET_COUNT_ID.into(),
input: target,
}),
});
let target = DataFrame::new(state.clone(), target);
let target = target.with_column(TARGET_COLUMN, lit(true))?;

Expand Down Expand Up @@ -960,7 +982,7 @@ async fn execute(
let new_columns = new_columns.into_optimized_plan()?;
let operation_count = LogicalPlan::Extension(Extension {
node: Arc::new(MetricObserver {
id: TARGET_COUNT_ID.into(),
id: OUTPUT_COUNT_ID.into(),
input: new_columns,
}),
});
Expand All @@ -976,7 +998,7 @@ async fn execute(

let err = || DeltaTableError::Generic("Unable to locate expected metric node".into());
let source_count = find_metric_node(SOURCE_COUNT_ID, &write).ok_or_else(err)?;
let op_count = find_metric_node(TARGET_COUNT_ID, &write).ok_or_else(err)?;
let op_count = find_metric_node(OUTPUT_COUNT_ID, &write).ok_or_else(err)?;

// write projected records
let table_partition_cols = current_metadata.partition_columns.clone();
Expand Down Expand Up @@ -1827,4 +1849,77 @@ mod tests {
let actual = get_data(&table).await;
assert_batches_sorted_eq!(&expected, &actual);
}

/// Merging into a table that contains no data files must insert every source row.
///
/// The target is created with `setup_table(Some(vec!["modified"]))` and is
/// asserted to be at version 0 with zero file URIs before the merge. Because
/// the target has no rows, the `when_matched_update` clause can never fire and
/// all three source rows go through `when_not_matched_insert`.
#[tokio::test]
async fn test_merge_empty_table() {
    let schema = get_arrow_schema(&None);
    let table = setup_table(Some(vec!["modified"])).await;

    // Precondition: a freshly created table with no data files.
    assert_eq!(table.version(), 0);
    assert_eq!(table.get_file_uris().count(), 0);

    let ctx = SessionContext::new();
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(arrow::array::StringArray::from(vec!["B", "C", "X"])),
            Arc::new(arrow::array::Int32Array::from(vec![10, 20, 30])),
            Arc::new(arrow::array::StringArray::from(vec![
                "2021-02-02",
                "2023-07-04",
                "2023-07-04",
            ])),
        ],
    )
    .unwrap();
    let source = ctx.read_batch(batch).unwrap();

    let (table, metrics) = DeltaOps(table)
        .merge(
            source,
            // The extra literal predicate on `target.modified` exercises a
            // predicate that references only the target side.
            col("target.id")
                .eq(col("source.id"))
                .and(col("target.modified").eq(lit("2021-02-02"))),
        )
        .with_source_alias("source")
        .with_target_alias("target")
        .when_matched_update(|update| {
            update
                .update("value", col("source.value"))
                .update("modified", col("source.modified"))
        })
        .unwrap()
        .when_not_matched_insert(|insert| {
            insert
                .set("id", col("source.id"))
                .set("value", col("source.value"))
                .set("modified", col("source.modified"))
        })
        .unwrap()
        .await
        .unwrap();

    assert_eq!(table.version(), 1);
    // The source carries two distinct `modified` values; presumably each lands
    // in its own partition, hence at least two files (TODO confirm that
    // `setup_table`'s argument is the partition column list).
    assert!(table.get_file_uris().count() >= 2);
    assert!(metrics.num_target_files_added >= 2);
    assert_eq!(metrics.num_target_files_removed, 0);
    assert_eq!(metrics.num_target_rows_copied, 0);
    assert_eq!(metrics.num_target_rows_updated, 0);
    assert_eq!(metrics.num_target_rows_inserted, 3);
    assert_eq!(metrics.num_target_rows_deleted, 0);
    assert_eq!(metrics.num_output_rows, 3);
    assert_eq!(metrics.num_source_rows, 3);

    // Cells are padded to the column widths fixed by the border rows
    // (2, 5 and 10 characters); the comparison is line-by-line, so the
    // padding must match the pretty-printed output exactly.
    let expected = vec![
        "+----+-------+------------+",
        "| id | value | modified   |",
        "+----+-------+------------+",
        "| B  | 10    | 2021-02-02 |",
        "| C  | 20    | 2023-07-04 |",
        "| X  | 30    | 2023-07-04 |",
        "+----+-------+------------+",
    ];
    let actual = get_data(&table).await;
    assert_batches_sorted_eq!(&expected, &actual);
}
}

0 comments on commit 8b33ecb

Please sign in to comment.