Merge branch 'delta-io:main' into main
ryanaston authored Nov 5, 2023
2 parents 068fa08 + a5e2e3b commit 9b9f8ed
Showing 109 changed files with 1,284 additions and 239 deletions.
3 changes: 2 additions & 1 deletion .github/CODEOWNERS
@@ -1,4 +1,5 @@
-rust/ @wjones127 @roeap @rtyler
+crates/ @wjones127 @roeap @rtyler
+delta-inspect/ @wjones127 @rtyler
 proofs/ @houqp
 python/ @wjones127 @fvaleye @roeap
 tlaplus/ @houqp
64 changes: 64 additions & 0 deletions .github/workflows/docs.yml
@@ -0,0 +1,64 @@
name: Build documentation

on:
  pull_request:
    paths:
      - python/**
      - docs/**
      - mkdocs.yml
      - .github/workflows/docs.yml
jobs:
  markdown-link-check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: gaurav-nelson/github-action-markdown-link-check@v1
        with:
          config-file: docs/mlc-config.json
          folder-path: docs

  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: psf/black@stable
        with:
          src: docs/src/python

  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Install Rust
        uses: actions-rs/toolchain@v1
        with:
          toolchain: stable
          override: true
          components: rustfmt, clippy

      - uses: Swatinem/rust-cache@v2

      - name: Set up Python
        uses: actions/setup-python@v3
        with:
          python-version: '3.10'

      - name: Build and install deltalake
        run: |
          cd python
          pip install virtualenv
          virtualenv venv
          source venv/bin/activate
          make develop
          cd ..
      - name: Install dependencies
        run: |
          source python/venv/bin/activate
          pip install -r docs/requirements.txt
      - name: Build documentation
        run: |
          source python/venv/bin/activate
          mkdocs build
3 changes: 2 additions & 1 deletion .gitignore
@@ -28,4 +28,5 @@ Cargo.lock
 !/proofs/Cargo.lock
 
 justfile
-site
+site
+__pycache__
111 changes: 91 additions & 20 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -586,8 +586,14 @@ impl<'a> DeltaScanBuilder<'a> {
         // However we may want to do some additional balancing in case we are far off from the above.
         let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
 
+        let table_partition_cols = &self
+            .snapshot
+            .current_metadata()
+            .ok_or(DeltaTableError::NoMetadata)?
+            .partition_columns;
+
         for action in files.iter() {
-            let mut part = partitioned_file_from_action(action, &schema);
+            let mut part = partitioned_file_from_action(action, table_partition_cols, &schema);
 
             if config.file_column_name.is_some() {
                 part.partition_values
@@ -602,13 +608,6 @@ impl<'a> DeltaScanBuilder<'a> {
                 .push(part);
         }
 
-        let table_partition_cols = self
-            .snapshot
-            .current_metadata()
-            .ok_or(DeltaTableError::NoMetadata)?
-            .partition_columns
-            .clone();
-
         let file_schema = Arc::new(ArrowSchema::new(
             schema
                 .fields()
@@ -923,20 +922,30 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarVal
 
 pub(crate) fn partitioned_file_from_action(
     action: &protocol::Add,
+    partition_columns: &[String],
     schema: &ArrowSchema,
 ) -> PartitionedFile {
-    let partition_values = schema
-        .fields()
+    let partition_values = partition_columns
         .iter()
-        .filter_map(|f| {
-            action.partition_values.get(f.name()).map(|val| match val {
-                Some(value) => to_correct_scalar_value(
-                    &serde_json::Value::String(value.to_string()),
-                    f.data_type(),
-                )
-                .unwrap_or(ScalarValue::Null),
-                None => get_null_of_arrow_type(f.data_type()).unwrap_or(ScalarValue::Null),
-            })
+        .map(|part| {
+            action
+                .partition_values
+                .get(part)
+                .map(|val| {
+                    schema
+                        .field_with_name(part)
+                        .map(|field| match val {
+                            Some(value) => to_correct_scalar_value(
+                                &serde_json::Value::String(value.to_string()),
+                                field.data_type(),
+                            )
+                            .unwrap_or(ScalarValue::Null),
+                            None => get_null_of_arrow_type(field.data_type())
+                                .unwrap_or(ScalarValue::Null),
+                        })
+                        .unwrap_or(ScalarValue::Null)
+                })
+                .unwrap_or(ScalarValue::Null)
         })
         .collect::<Vec<_>>();
 
@@ -1618,6 +1627,7 @@ pub async fn find_files<'a>(
 
 #[cfg(test)]
 mod tests {
+    use crate::writer::test_utils::get_delta_schema;
     use arrow::array::StructArray;
     use arrow::datatypes::{DataType, Field, Schema};
     use chrono::{TimeZone, Utc};
@@ -1797,7 +1807,8 @@ mod tests {
             Field::new("month", ArrowDataType::Int64, true),
         ]);
 
-        let file = partitioned_file_from_action(&action, &schema);
+        let part_columns = vec!["year".to_string(), "month".to_string()];
+        let file = partitioned_file_from_action(&action, &part_columns, &schema);
         let ref_file = PartitionedFile {
             object_meta: object_store::ObjectMeta {
                 location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
@@ -1929,4 +1940,64 @@ mod tests {
         ];
         assert_batches_sorted_eq!(&expected, &actual);
     }
+
+    #[tokio::test]
+    async fn delta_scan_mixed_partition_order() {
+        // Tests issue (1787) where partition columns were incorrect when they
+        // have a different order in the metadata and table schema
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("modified", DataType::Utf8, true),
+            Field::new("id", DataType::Utf8, true),
+            Field::new("value", DataType::Int32, true),
+        ]));
+
+        let table = crate::DeltaOps::new_in_memory()
+            .create()
+            .with_columns(get_delta_schema().get_fields().clone())
+            .with_partition_columns(["modified", "id"])
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec![
+                    "2021-02-01",
+                    "2021-02-01",
+                    "2021-02-02",
+                    "2021-02-02",
+                ])),
+                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
+                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
+            ],
+        )
+        .unwrap();
+        // write some data
+        let table = crate::DeltaOps(table)
+            .write(vec![batch.clone()])
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap();
+
+        let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap();
+        let ctx = SessionContext::new();
+        ctx.register_table("test", Arc::new(provider)).unwrap();
+
+        let df = ctx.sql("select * from test").await.unwrap();
+        let actual = df.collect().await.unwrap();
+        let expected = vec![
+            "+-------+------------+----+",
+            "| value | modified   | id |",
+            "+-------+------------+----+",
+            "| 1     | 2021-02-01 | A  |",
+            "| 10    | 2021-02-01 | B  |",
+            "| 100   | 2021-02-02 | D  |",
+            "| 20    | 2021-02-02 | C  |",
+            "+-------+------------+----+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
 }
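
The heart of the delta_datafusion change above is that a file's partition values are now looked up by the table's declared partition columns instead of by Arrow schema field order. A minimal standalone sketch of that lookup pattern, using hypothetical simplified types and the standard library only (not the delta-rs or DataFusion APIs):

use std::collections::HashMap;

// Hypothetical stand-in for a partition value: a concrete string or null.
// These types are illustrative only, not the delta-rs definitions.
#[derive(Debug, PartialEq)]
enum PartitionValue {
    String(String),
    Null,
}

// Resolve partition values in the order the table declares its partition
// columns, not in whatever order a file's metadata or schema lists them.
fn partition_values_in_table_order(
    partition_columns: &[String],
    file_partition_values: &HashMap<String, Option<String>>,
) -> Vec<PartitionValue> {
    partition_columns
        .iter()
        .map(|col| match file_partition_values.get(col) {
            Some(Some(v)) => PartitionValue::String(v.clone()),
            // Column missing from the file metadata, or present with a null value.
            _ => PartitionValue::Null,
        })
        .collect()
}

fn main() {
    // The file lists its partition values in a different order than the table.
    let mut file_values = HashMap::new();
    file_values.insert("id".to_string(), Some("A".to_string()));
    file_values.insert("modified".to_string(), Some("2021-02-01".to_string()));

    let table_partition_columns = vec!["modified".to_string(), "id".to_string()];
    let resolved = partition_values_in_table_order(&table_partition_columns, &file_values);
    assert_eq!(
        resolved,
        vec![
            PartitionValue::String("2021-02-01".to_string()),
            PartitionValue::String("A".to_string()),
        ]
    );
    println!("{:?}", resolved);
}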
4 changes: 4 additions & 0 deletions crates/deltalake-core/src/operations/create.rs
@@ -250,6 +250,8 @@ impl CreateBuilder {
             .unwrap_or_else(|| Protocol {
                 min_reader_version: MAX_SUPPORTED_READER_VERSION,
                 min_writer_version: MAX_SUPPORTED_WRITER_VERSION,
+                writer_features: None,
+                reader_features: None,
             });
 
         let metadata = DeltaTableMetaData::new(
@@ -399,6 +401,8 @@ mod tests {
         let protocol = Protocol {
             min_reader_version: 0,
             min_writer_version: 0,
+            writer_features: None,
+            reader_features: None,
         };
         let table = CreateBuilder::new()
             .with_location("memory://")
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/delete.rs
@@ -315,9 +315,9 @@ mod tests {
     use crate::operations::DeltaOps;
     use crate::protocol::*;
    use crate::writer::test_utils::datafusion::get_data;
+    use crate::writer::test_utils::datafusion::write_batch;
     use crate::writer::test_utils::{
         get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration,
-        write_batch,
     };
     use crate::DeltaConfigKey;
     use crate::DeltaTable;
1 change: 0 additions & 1 deletion crates/deltalake-core/src/operations/optimize.rs
@@ -1432,7 +1432,6 @@ pub(super) mod zorder {
             assert_eq!(result.null_count(), 0);
 
             let data: &BinaryArray = as_generic_binary_array(result.as_ref());
-            dbg!(data);
             assert_eq!(data.value_data().len(), 3 * 16 * 3);
             assert!(data.iter().all(|x| x.unwrap().len() == 3 * 16));
         }
12 changes: 12 additions & 0 deletions crates/deltalake-core/src/operations/restore.rs
@@ -205,6 +205,16 @@ async fn execute(
         Protocol {
             min_reader_version: table.get_min_reader_version(),
             min_writer_version: table.get_min_writer_version(),
+            writer_features: if snapshot.min_writer_version() < 7 {
+                None
+            } else {
+                table.get_writer_features().cloned()
+            },
+            reader_features: if snapshot.min_reader_version() < 3 {
+                None
+            } else {
+                table.get_reader_features().cloned()
+            },
         }
     } else {
         Protocol {
@@ -216,6 +226,8 @@
                 table.get_min_writer_version(),
                 snapshot.min_writer_version(),
             ),
+            writer_features: snapshot.writer_features().cloned(),
+            reader_features: snapshot.reader_features().cloned(),
         }
     };
     actions.push(Action::protocol(protocol));
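
The version gating in the restore change above mirrors the Delta protocol's table-features rule: reader features only exist at reader version 3 or above, and writer features at writer version 7 or above, so the feature lists are dropped when restoring to an older protocol version. A rough sketch with a hypothetical simplified Protocol struct (not the delta-rs type):

// Hypothetical simplified protocol action, for illustration only.
#[derive(Debug, Clone)]
struct Protocol {
    min_reader_version: i32,
    min_writer_version: i32,
    reader_features: Option<Vec<String>>,
    writer_features: Option<Vec<String>>,
}

// Keep feature lists only when the protocol version is high enough to carry them.
fn gate_features(p: &Protocol) -> Protocol {
    Protocol {
        min_reader_version: p.min_reader_version,
        min_writer_version: p.min_writer_version,
        reader_features: if p.min_reader_version < 3 {
            None
        } else {
            p.reader_features.clone()
        },
        writer_features: if p.min_writer_version < 7 {
            None
        } else {
            p.writer_features.clone()
        },
    }
}

fn main() {
    let restored = gate_features(&Protocol {
        min_reader_version: 1,
        min_writer_version: 2,
        reader_features: Some(vec!["columnMapping".to_string()]),
        writer_features: Some(vec!["appendOnly".to_string()]),
    });
    // Both feature lists are dropped because the versions are below 3 and 7.
    println!("{:?}", restored);
}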
@@ -33,6 +33,8 @@ pub fn create_protocol_action(max_reader: Option<i32>, max_writer: Option<i32>)
     let protocol = Protocol {
         min_reader_version: max_reader.unwrap_or(crate::operations::MAX_SUPPORTED_READER_VERSION),
         min_writer_version: max_writer.unwrap_or(crate::operations::MAX_SUPPORTED_WRITER_VERSION),
+        writer_features: None,
+        reader_features: None,
     };
     Action::protocol(protocol)
 }
@@ -134,6 +136,8 @@ pub async fn create_initialized_table(
         protocol: Protocol {
             min_reader_version: 1,
             min_writer_version: 1,
+            writer_features: None,
+            reader_features: None,
         },
         metadata: DeltaTableMetaData::new(
             None,
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/update.rs
@@ -461,9 +461,9 @@ impl std::future::IntoFuture for UpdateBuilder {
 mod tests {
     use crate::operations::DeltaOps;
     use crate::writer::test_utils::datafusion::get_data;
+    use crate::writer::test_utils::datafusion::write_batch;
     use crate::writer::test_utils::{
         get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration,
-        write_batch,
     };
     use crate::DeltaConfigKey;
     use crate::DeltaTable;
3 changes: 2 additions & 1 deletion crates/deltalake-core/src/operations/write.rs
@@ -565,9 +565,10 @@ mod tests {
     use crate::operations::{collect_sendable_stream, DeltaOps};
     use crate::protocol::SaveMode;
     use crate::writer::test_utils::datafusion::get_data;
+    use crate::writer::test_utils::datafusion::write_batch;
     use crate::writer::test_utils::{
         get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch,
-        get_record_batch_with_nested_struct, setup_table_with_configuration, write_batch,
+        get_record_batch_with_nested_struct, setup_table_with_configuration,
     };
     use crate::DeltaConfigKey;
     use arrow::datatypes::Field;
2 changes: 2 additions & 0 deletions crates/deltalake-core/src/protocol/checkpoints.rs
@@ -229,6 +229,8 @@ fn parquet_bytes_from_state(
     let jsons = std::iter::once(Action::protocol(Protocol {
         min_reader_version: state.min_reader_version(),
         min_writer_version: state.min_writer_version(),
+        writer_features: None,
+        reader_features: None,
     }))
     // metaData
     .chain(std::iter::once(Action::metaData(MetaData::try_from(