diff --git a/arrow-row/src/dictionary.rs b/arrow-row/src/dictionary.rs index e332e11316fd..bacc116cade7 100644 --- a/arrow-row/src/dictionary.rs +++ b/arrow-row/src/dictionary.rs @@ -17,7 +17,7 @@ use crate::fixed::{FixedLengthEncoding, FromSlice}; use crate::interner::{Interned, OrderPreservingInterner}; -use crate::{null_sentinel, Rows}; +use crate::{null_sentinel, Row, Rows}; use arrow_array::builder::*; use arrow_array::cast::*; use arrow_array::types::*; @@ -56,6 +56,24 @@ pub fn compute_dictionary_mapping( } } +/// Encode dictionary values not preserving the dictionary encoding +pub fn encode_dictionary_values( + out: &mut Rows, + column: &DictionaryArray, + values: &Rows, + null: &Row<'_>, +) { + for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) { + let row = match k { + Some(k) => values.row(k.as_usize()).data, + None => null.data, + }; + let end_offset = *offset + row.len(); + out.buffer[*offset..end_offset].copy_from_slice(row); + *offset = end_offset; + } +} + /// Dictionary types are encoded as /// /// - single `0_u8` if null diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index 2e489c974750..e4b02fbf230d 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -137,6 +137,7 @@ use arrow_schema::*; use crate::dictionary::{ compute_dictionary_mapping, decode_dictionary, encode_dictionary, + encode_dictionary_values, }; use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive}; use crate::interner::OrderPreservingInterner; @@ -426,7 +427,14 @@ enum Codec { /// No additional codec state is necessary Stateless, /// The interner used to encode dictionary values + /// + /// Used when preserving the dictionary encoding Dictionary(OrderPreservingInterner), + /// A row converter for the dictionary values + /// and the encoding of a row containing only nulls + /// + /// Used when not preserving dictionary encoding + DictionaryValues(RowConverter, OwnedRow), /// A row converter for the child fields /// and the encoding of a row containing only nulls Struct(RowConverter, OwnedRow), @@ -437,7 +445,25 @@ enum Codec { impl Codec { fn new(sort_field: &SortField) -> Result { match &sort_field.data_type { - DataType::Dictionary(_, _) => Ok(Self::Dictionary(Default::default())), + DataType::Dictionary(_, values) => match sort_field.preserve_dictionaries { + true => Ok(Self::Dictionary(Default::default())), + false => { + let sort_field = SortField::new_with_options( + values.as_ref().clone(), + sort_field.options, + ); + + let mut converter = RowConverter::new(vec![sort_field])?; + let null_array = new_null_array(values.as_ref(), 1); + let nulls = converter.convert_columns(&[null_array])?; + + let owned = OwnedRow { + data: nulls.buffer, + config: nulls.config, + }; + Ok(Self::DictionaryValues(converter, owned)) + } + }, d if !d.is_nested() => Ok(Self::Stateless), DataType::List(f) | DataType::LargeList(f) => { // The encoded contents will be inverted if descending is set to true @@ -501,6 +527,15 @@ impl Codec { Ok(Encoder::Dictionary(mapping)) } + Codec::DictionaryValues(converter, nulls) => { + let values = downcast_dictionary_array! { + array => array.values(), + _ => unreachable!() + }; + + let rows = converter.convert_columns(&[values.clone()])?; + Ok(Encoder::DictionaryValues(rows, nulls.row())) + } Codec::Struct(converter, null) => { let v = as_struct_array(array); let rows = converter.convert_columns(v.columns())?; @@ -522,6 +557,9 @@ impl Codec { match self { Codec::Stateless => 0, Codec::Dictionary(interner) => interner.size(), + Codec::DictionaryValues(converter, nulls) => { + converter.size() + nulls.data.len() + } Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(), Codec::List(converter) => converter.size(), } @@ -534,6 +572,8 @@ enum Encoder<'a> { Stateless, /// The mapping from dictionary keys to normalized keys Dictionary(Vec>), + /// The encoding of the child array and the encoding of a null row + DictionaryValues(Rows, Row<'a>), /// The row encoding of the child arrays and the encoding of a null row /// /// It is necessary to encode to a temporary [`Rows`] to avoid serializing @@ -551,6 +591,8 @@ pub struct SortField { options: SortOptions, /// Data type data_type: DataType, + /// Preserve dictionaries + preserve_dictionaries: bool, } impl SortField { @@ -561,7 +603,30 @@ impl SortField { /// Create a new column with the given data type and [`SortOptions`] pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self { - Self { options, data_type } + Self { + options, + data_type, + preserve_dictionaries: true, + } + } + + /// By default dictionaries are preserved as described on [`RowConverter`] + /// + /// However, this process requires maintaining and incrementally updating + /// an order-preserving mapping of dictionary values. This is relatively expensive + /// computationally but reduces the size of the encoded rows, minimising memory + /// usage and potentially yielding faster comparisons. + /// + /// Some applications may wish to instead trade-off space efficiency, for improved + /// encoding performance, by instead encoding dictionary values directly + /// + /// When `preserve_dictionaries` is true, fields will instead be encoded as their + /// underlying value, reversing any dictionary encoding + pub fn preserve_dictionaries(self, preserve_dictionaries: bool) -> Self { + Self { + preserve_dictionaries, + ..self + } } /// Return size of this instance in bytes. @@ -1045,6 +1110,19 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) -> _ => unreachable!(), } } + Encoder::DictionaryValues(values, null) => { + downcast_dictionary_array! { + array => { + for (v, length) in array.keys().iter().zip(lengths.iter_mut()) { + *length += match v { + Some(k) => values.row(k.as_usize()).data.len(), + None => null.data.len(), + } + } + } + _ => unreachable!(), + } + } Encoder::Struct(rows, null) => { let array = as_struct_array(array); lengths.iter_mut().enumerate().for_each(|(idx, length)| { @@ -1143,6 +1221,12 @@ fn encode_column( _ => unreachable!() } } + Encoder::DictionaryValues(values, nulls) => { + downcast_dictionary_array! { + column => encode_dictionary_values(out, column, values, nulls), + _ => unreachable!() + } + } Encoder::Struct(rows, null) => { let array = as_struct_array(column); let null_sentinel = null_sentinel(opts); @@ -1221,6 +1305,10 @@ unsafe fn decode_column( _ => unreachable!() } } + Codec::DictionaryValues(converter, _) => { + let cols = converter.convert_raw(rows, validate_utf8)?; + cols.into_iter().next().unwrap() + } Codec::Struct(converter, _) => { let (null_count, nulls) = fixed::decode_nulls(rows); rows.iter_mut().for_each(|row| *row = &row[1..]); @@ -1557,8 +1645,25 @@ mod tests { assert_eq!(&cols[0], &col); } + /// If `exact` is false performs a logical comparison between a and dictionary-encoded b + fn dictionary_eq(exact: bool, a: &dyn Array, b: &dyn Array) { + match b.data_type() { + DataType::Dictionary(_, v) if !exact => { + assert_eq!(a.data_type(), v.as_ref()); + let b = arrow_cast::cast(b, v).unwrap(); + assert_eq!(a.data(), b.data()) + } + _ => assert_eq!(a.data(), b.data()), + } + } + #[test] fn test_string_dictionary() { + test_string_dictionary_impl(false); + test_string_dictionary_impl(true); + } + + fn test_string_dictionary_impl(preserve: bool) { let a = Arc::new(DictionaryArray::::from_iter([ Some("foo"), Some("hello"), @@ -1570,8 +1675,8 @@ mod tests { Some("hello"), ])) as ArrayRef; - let mut converter = - RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap(); + let field = SortField::new(a.data_type().clone()).preserve_dictionaries(preserve); + let mut converter = RowConverter::new(vec![field]).unwrap(); let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap(); assert!(rows_a.row(3) < rows_a.row(5)); @@ -1584,7 +1689,7 @@ mod tests { assert_eq!(rows_a.row(1), rows_a.row(7)); let cols = converter.convert_rows(&rows_a).unwrap(); - assert_eq!(&cols[0], &a); + dictionary_eq(preserve, &cols[0], &a); let b = Arc::new(DictionaryArray::::from_iter([ Some("hello"), @@ -1598,7 +1703,7 @@ mod tests { assert!(rows_b.row(2) < rows_a.row(0)); let cols = converter.convert_rows(&rows_b).unwrap(); - assert_eq!(&cols[0], &b); + dictionary_eq(preserve, &cols[0], &b); let mut converter = RowConverter::new(vec![SortField::new_with_options( a.data_type().clone(), @@ -1606,7 +1711,8 @@ mod tests { descending: true, nulls_first: false, }, - )]) + ) + .preserve_dictionaries(preserve)]) .unwrap(); let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap(); @@ -1616,7 +1722,7 @@ mod tests { assert!(rows_c.row(3) > rows_c.row(0)); let cols = converter.convert_rows(&rows_c).unwrap(); - assert_eq!(&cols[0], &a); + dictionary_eq(preserve, &cols[0], &a); let mut converter = RowConverter::new(vec![SortField::new_with_options( a.data_type().clone(), @@ -1624,7 +1730,8 @@ mod tests { descending: true, nulls_first: true, }, - )]) + ) + .preserve_dictionaries(preserve)]) .unwrap(); let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap(); @@ -1634,7 +1741,7 @@ mod tests { assert!(rows_c.row(3) < rows_c.row(0)); let cols = converter.convert_rows(&rows_c).unwrap(); - assert_eq!(&cols[0], &a); + dictionary_eq(preserve, &cols[0], &a); } #[test] @@ -1694,15 +1801,19 @@ mod tests { builder.append(-1).unwrap(); let a = builder.finish(); - - let mut converter = - RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap(); - let rows = converter.convert_columns(&[Arc::new(a)]).unwrap(); - assert!(rows.row(0) < rows.row(1)); - assert!(rows.row(2) < rows.row(0)); - assert!(rows.row(3) < rows.row(2)); - assert!(rows.row(6) < rows.row(2)); - assert!(rows.row(3) < rows.row(6)); + let data_type = a.data_type().clone(); + let columns = [Arc::new(a) as ArrayRef]; + + for preserve in [true, false] { + let field = SortField::new(data_type.clone()).preserve_dictionaries(preserve); + let mut converter = RowConverter::new(vec![field]).unwrap(); + let rows = converter.convert_columns(&columns).unwrap(); + assert!(rows.row(0) < rows.row(1)); + assert!(rows.row(2) < rows.row(0)); + assert!(rows.row(3) < rows.row(2)); + assert!(rows.row(6) < rows.row(2)); + assert!(rows.row(3) < rows.row(6)); + } } #[test] @@ -1722,15 +1833,17 @@ mod tests { .build() .unwrap(); - let mut converter = RowConverter::new(vec![SortField::new(data_type)]).unwrap(); - let rows = converter - .convert_columns(&[Arc::new(DictionaryArray::::from(data))]) - .unwrap(); + let columns = [Arc::new(DictionaryArray::::from(data)) as ArrayRef]; + for preserve in [true, false] { + let field = SortField::new(data_type.clone()).preserve_dictionaries(preserve); + let mut converter = RowConverter::new(vec![field]).unwrap(); + let rows = converter.convert_columns(&columns).unwrap(); - assert_eq!(rows.row(0), rows.row(1)); - assert_eq!(rows.row(3), rows.row(4)); - assert_eq!(rows.row(4), rows.row(5)); - assert!(rows.row(3) < rows.row(0)); + assert_eq!(rows.row(0), rows.row(1)); + assert_eq!(rows.row(3), rows.row(4)); + assert_eq!(rows.row(4), rows.row(5)); + assert!(rows.row(3) < rows.row(0)); + } } #[test] @@ -1974,6 +2087,35 @@ mod tests { test_nested_list::(); } + #[test] + fn test_dictionary_preserving() { + let mut dict = StringDictionaryBuilder::::new(); + dict.append_value("foo"); + dict.append_value("foo"); + dict.append_value("bar"); + dict.append_value("bar"); + dict.append_value("bar"); + dict.append_value("bar"); + + let array = Arc::new(dict.finish()) as ArrayRef; + let preserve = SortField::new(array.data_type().clone()); + let non_preserve = preserve.clone().preserve_dictionaries(false); + + let mut c1 = RowConverter::new(vec![preserve]).unwrap(); + let r1 = c1.convert_columns(&[array.clone()]).unwrap(); + + let mut c2 = RowConverter::new(vec![non_preserve]).unwrap(); + let r2 = c2.convert_columns(&[array.clone()]).unwrap(); + + for r in r1.iter() { + assert_eq!(r.data.len(), 3); + } + + for r in r2.iter() { + assert_eq!(r.data.len(), 34); + } + } + fn generate_primitive_array(len: usize, valid_percent: f64) -> PrimitiveArray where K: ArrowPrimitiveType, @@ -2129,12 +2271,18 @@ mod tests { }) .collect(); + let preserve: Vec<_> = (0..num_columns).map(|_| rng.gen_bool(0.5)).collect(); + let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap(); let columns = options .into_iter() .zip(&arrays) - .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o)) + .zip(&preserve) + .map(|((o, a), p)| { + SortField::new_with_options(a.data_type().clone(), o) + .preserve_dictionaries(*p) + }) .collect(); let mut converter = RowConverter::new(columns).unwrap(); @@ -2160,9 +2308,9 @@ mod tests { } let back = converter.convert_rows(&rows).unwrap(); - for (actual, expected) in back.iter().zip(&arrays) { + for ((actual, expected), preserve) in back.iter().zip(&arrays).zip(preserve) { actual.data().validate_full().unwrap(); - assert_eq!(actual, expected) + dictionary_eq(preserve, actual, expected) } } } diff --git a/arrow/benches/row_format.rs b/arrow/benches/row_format.rs index 961cf07de721..12ce71764f7e 100644 --- a/arrow/benches/row_format.rs +++ b/arrow/benches/row_format.rs @@ -30,10 +30,18 @@ use arrow_array::Array; use criterion::{black_box, Criterion}; use std::sync::Arc; -fn do_bench(c: &mut Criterion, name: &str, cols: Vec) { +fn do_bench( + c: &mut Criterion, + name: &str, + cols: Vec, + preserve_dictionaries: bool, +) { let fields: Vec<_> = cols .iter() - .map(|x| SortField::new(x.data_type().clone())) + .map(|x| { + SortField::new(x.data_type().clone()) + .preserve_dictionaries(preserve_dictionaries) + }) .collect(); c.bench_function(&format!("convert_columns {name}"), |b| { @@ -57,42 +65,46 @@ fn do_bench(c: &mut Criterion, name: &str, cols: Vec) { fn row_bench(c: &mut Criterion) { let cols = vec![Arc::new(create_primitive_array::(4096, 0.)) as ArrayRef]; - do_bench(c, "4096 u64(0)", cols); + do_bench(c, "4096 u64(0)", cols, true); let cols = vec![Arc::new(create_primitive_array::(4096, 0.)) as ArrayRef]; - do_bench(c, "4096 i64(0)", cols); + do_bench(c, "4096 i64(0)", cols, true); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0., 10)) as ArrayRef]; - do_bench(c, "4096 string(10, 0)", cols); + do_bench(c, "4096 string(10, 0)", cols, true); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0., 30)) as ArrayRef]; - do_bench(c, "4096 string(30, 0)", cols); + do_bench(c, "4096 string(30, 0)", cols, true); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0., 100)) as ArrayRef]; - do_bench(c, "4096 string(100, 0)", cols); + do_bench(c, "4096 string(100, 0)", cols, true); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0.5, 100)) as ArrayRef]; - do_bench(c, "4096 string(100, 0.5)", cols); + do_bench(c, "4096 string(100, 0.5)", cols, true); let cols = vec![Arc::new(create_string_dict_array::(4096, 0., 10)) as ArrayRef]; - do_bench(c, "4096 string_dictionary(10, 0)", cols); + do_bench(c, "4096 string_dictionary(10, 0)", cols, true); let cols = vec![Arc::new(create_string_dict_array::(4096, 0., 30)) as ArrayRef]; - do_bench(c, "4096 string_dictionary(30, 0)", cols); + do_bench(c, "4096 string_dictionary(30, 0)", cols, true); let cols = vec![Arc::new(create_string_dict_array::(4096, 0., 100)) as ArrayRef]; - do_bench(c, "4096 string_dictionary(100, 0)", cols); + do_bench(c, "4096 string_dictionary(100, 0)", cols.clone(), true); + let name = "4096 string_dictionary_non_preserving(100, 0)"; + do_bench(c, name, cols, false); let cols = vec![Arc::new(create_string_dict_array::(4096, 0.5, 100)) as ArrayRef]; - do_bench(c, "4096 string_dictionary(100, 0.5)", cols); + do_bench(c, "4096 string_dictionary(100, 0.5)", cols.clone(), true); + let name = "4096 string_dictionary_non_preserving(100, 0.5)"; + do_bench(c, name, cols, false); let cols = vec![ Arc::new(create_string_array_with_len::(4096, 0.5, 20)) as ArrayRef, @@ -104,6 +116,7 @@ fn row_bench(c: &mut Criterion) { c, "4096 string(20, 0.5), string(30, 0), string(100, 0), i64(0)", cols, + false, ); let cols = vec![ @@ -112,7 +125,7 @@ fn row_bench(c: &mut Criterion) { Arc::new(create_string_dict_array::(4096, 0., 100)) as ArrayRef, Arc::new(create_primitive_array::(4096, 0.)) as ArrayRef, ]; - do_bench(c, "4096 4096 string_dictionary(20, 0.5), string_dictionary(30, 0), string_dictionary(100, 0), i64(0)", cols); + do_bench(c, "4096 4096 string_dictionary(20, 0.5), string_dictionary(30, 0), string_dictionary(100, 0), i64(0)", cols, false); } criterion_group!(benches, row_bench);