Skip to content

Commit

Permalink
Create re_arrow_util (#8689)
Browse files Browse the repository at this point in the history
### Related
* Part of #3741

### Details
Adds crate `re_arrow_util`.

Adds two traits for downcasting `arrow` and `arrow2` arrays in such a
way that we cannot accidentally cast one into another. This will be very
important for the arrow migration. It also makes the code shorter.
  • Loading branch information
emilk authored Jan 15, 2025
1 parent af42828 commit efee5ad
Show file tree
Hide file tree
Showing 48 changed files with 320 additions and 206 deletions.
1 change: 1 addition & 0 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ Update instructions:
| Crate | Description |
|--------------------|--------------------------------------------------------------------------------------|
| re_analytics | Rerun's analytics SDK |
| re_arrow_util | Helpers for working with arrow |
| re_byte_size | Calculate the heap-allocated size of values at runtime |
| re_capabilities | Capability tokens |
| re_case | Case conversions, the way Rerun likes them |
Expand Down
23 changes: 23 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5610,6 +5610,17 @@ dependencies = [
"simdutf8",
]

[[package]]
name = "re_arrow_util"
version = "0.22.0-alpha.1+dev"
dependencies = [
"arrow",
"itertools 0.13.0",
"re_arrow2",
"re_log",
"re_tracing",
]

[[package]]
name = "re_blueprint_tree"
version = "0.22.0-alpha.1+dev"
Expand Down Expand Up @@ -5692,6 +5703,7 @@ dependencies = [
"nohash-hasher",
"rand",
"re_arrow2",
"re_arrow_util",
"re_byte_size",
"re_error",
"re_format",
Expand Down Expand Up @@ -5725,6 +5737,7 @@ dependencies = [
"parking_lot",
"rand",
"re_arrow2",
"re_arrow_util",
"re_byte_size",
"re_chunk",
"re_format",
Expand Down Expand Up @@ -5904,6 +5917,7 @@ dependencies = [
"nohash-hasher",
"rayon",
"re_arrow2",
"re_arrow_util",
"re_chunk",
"re_chunk_store",
"re_log",
Expand Down Expand Up @@ -6001,6 +6015,7 @@ dependencies = [
"arrow",
"comfy-table",
"itertools 0.13.0",
"re_arrow_util",
"re_tuid",
"re_types_core",
]
Expand All @@ -6010,6 +6025,7 @@ name = "re_grpc_client"
version = "0.22.0-alpha.1+dev"
dependencies = [
"arrow",
"re_arrow_util",
"re_chunk",
"re_error",
"re_log",
Expand Down Expand Up @@ -6123,6 +6139,7 @@ dependencies = [
"num-derive",
"num-traits",
"re_arrow2",
"re_arrow_util",
"re_build_info",
"re_byte_size",
"re_format",
Expand Down Expand Up @@ -6226,6 +6243,7 @@ dependencies = [
"paste",
"rand",
"re_arrow2",
"re_arrow_util",
"re_byte_size",
"re_chunk",
"re_chunk_store",
Expand Down Expand Up @@ -6585,6 +6603,7 @@ dependencies = [
"nohash-hasher",
"once_cell",
"re_arrow2",
"re_arrow_util",
"re_byte_size",
"re_case",
"re_error",
Expand Down Expand Up @@ -6613,6 +6632,7 @@ dependencies = [
"once_cell",
"parking_lot",
"rand",
"re_arrow_util",
"re_entity_db",
"re_format",
"re_log",
Expand Down Expand Up @@ -6707,6 +6727,7 @@ dependencies = [
"egui",
"egui_table",
"itertools 0.13.0",
"re_arrow_util",
"re_chunk_store",
"re_dataframe",
"re_error",
Expand Down Expand Up @@ -7371,6 +7392,7 @@ dependencies = [
"once_cell",
"parking_lot",
"re_arrow2",
"re_arrow_util",
"re_log",
"re_sdk",
"re_video",
Expand All @@ -7394,6 +7416,7 @@ dependencies = [
"pyo3-build-config",
"rand",
"re_arrow2",
"re_arrow_util",
"re_build_info",
"re_build_tools",
"re_chunk",
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ rerun-cli = { path = "crates/top/rerun-cli", version = "=0.22.0-alpha.1", defaul

# crates/utils:
re_analytics = { path = "crates/utils/re_analytics", version = "=0.22.0-alpha.1", default-features = false }
re_arrow_util = { path = "crates/utils/re_arrow_util", version = "=0.22.0-alpha.1", default-features = false }
re_byte_size = { path = "crates/utils/re_byte_size", version = "=0.22.0-alpha.1", default-features = false }
re_capabilities = { path = "crates/utils/re_capabilities", version = "=0.22.0-alpha.1", default-features = false }
re_case = { path = "crates/utils/re_case", version = "=0.22.0-alpha.1", default-features = false }
re_crash_handler = { path = "crates/utils/re_crash_handler", version = "=0.22.0-alpha.1", default-features = false }
Expand All @@ -80,7 +82,6 @@ re_format = { path = "crates/utils/re_format", version = "=0.22.0-alpha.1", defa
re_int_histogram = { path = "crates/utils/re_int_histogram", version = "=0.22.0-alpha.1", default-features = false }
re_log = { path = "crates/utils/re_log", version = "=0.22.0-alpha.1", default-features = false }
re_memory = { path = "crates/utils/re_memory", version = "=0.22.0-alpha.1", default-features = false }
re_byte_size = { path = "crates/utils/re_byte_size", version = "=0.22.0-alpha.1", default-features = false }
re_smart_channel = { path = "crates/utils/re_smart_channel", version = "=0.22.0-alpha.1", default-features = false }
re_string_interner = { path = "crates/utils/re_string_interner", version = "=0.22.0-alpha.1", default-features = false }
re_tracing = { path = "crates/utils/re_tracing", version = "=0.22.0-alpha.1", default-features = false }
Expand Down
18 changes: 9 additions & 9 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,19 @@ disallowed-methods = [
{ path = "std::panic::catch_unwind", reason = "We compile with `panic = 'abort'`" },
{ path = "std::thread::spawn", reason = "Use `std::thread::Builder` and name the thread" },

{ path = "arrow::compute::concat", reason = "Use `re_chunk::arrow_util::concat_arrays` instead, which has better memory management" },
{ path = "arrow::compute::filter", reason = "Use `re_chunk::arrow_util::filter_array` instead" },
{ path = "arrow::compute::take", reason = "Use `re_chunk::arrow_util::take_array` instead" },
{ path = "arrow::compute::concat", reason = "Use `re_arrow_util::arrow_util::concat_arrays` instead, which has better memory management" },
{ path = "arrow::compute::filter", reason = "Use `re_arrow_util::arrow_util::filter_array` instead" },
{ path = "arrow::compute::take", reason = "Use `re_arrow_util::arrow_util::take_array` instead" },

{ path = "arrow::datatypes::Schema::new", reason = "Use `arrow::datatypes::Schema::new_with_metadata` instead. There is usually some metadata you want to preserve." },

# Specify both `arrow2` and `re_arrow2` -- clippy gets lost in all the package renaming happening.
{ path = "arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::arrow2_util::concat_arrays` instead, which has proper early outs" },
{ path = "arrow2::compute::filter::filter", reason = "Use `re_chunk::arrow2_util::filter_array` instead, which has proper early outs" },
{ path = "arrow2::compute::take::take", reason = "Use `re_chunk::arrow2_util::take_array` instead, which has proper early outs" },
{ path = "re_arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::arrow2_util::concat_arrays` instead, which has proper early outs" },
{ path = "re_arrow2::compute::filter::filter", reason = "Use `re_chunk::arrow2_util::filter_array` instead, which has proper early outs" },
{ path = "re_arrow2::compute::take::take", reason = "Use `re_chunk::arrow2_util::take_array` instead, which has proper early outs" },
{ path = "arrow2::compute::concatenate::concatenate", reason = "Use `re_arrow_util::arrow2_util::concat_arrays` instead, which has proper early outs" },
{ path = "arrow2::compute::filter::filter", reason = "Use `re_arrow_util::arrow2_util::filter_array` instead, which has proper early outs" },
{ path = "arrow2::compute::take::take", reason = "Use `re_arrow_util::arrow2_util::take_array` instead, which has proper early outs" },
{ path = "re_arrow2::compute::concatenate::concatenate", reason = "Use `re_arrow_util::arrow2_util::concat_arrays` instead, which has proper early outs" },
{ path = "re_arrow2::compute::filter::filter", reason = "Use `re_arrow_util::arrow2_util::filter_array` instead, which has proper early outs" },
{ path = "re_arrow2::compute::take::take", reason = "Use `re_arrow_util::arrow2_util::take_array` instead, which has proper early outs" },

# There are many things that aren't allowed on wasm,
# but we cannot disable them all here (because of e.g. https://github.com/rust-lang/rust-clippy/issues/10406)
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_chunk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ arrow = ["arrow2/arrow"]
[dependencies]

# Rerun
re_arrow_util.workspace = true
re_byte_size.workspace = true
re_error.workspace = true
re_format.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion crates/store/re_chunk/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use crossbeam::channel::{Receiver, Sender};
use nohash_hasher::IntMap;

use re_arrow_util::arrow_util;
use re_byte_size::SizeBytes as _;
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, TimePoint, Timeline};
use re_types_core::ComponentDescriptor;

use crate::{arrow_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};

// ---

Expand Down
3 changes: 2 additions & 1 deletion crates/store/re_chunk/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use arrow::{array::ArrayRef, datatypes::DataType as ArrowDatatype};
use itertools::Itertools;
use nohash_hasher::IntMap;

use re_arrow_util::arrow_util;
use re_log_types::{EntityPath, TimeInt, TimePoint, Timeline};
use re_types_core::{AsComponents, ComponentBatch, ComponentDescriptor, SerializedComponentBatch};

use crate::{arrow_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};

// ---

Expand Down
34 changes: 13 additions & 21 deletions crates/store/re_chunk/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use arrow2::{
use itertools::{izip, Itertools};
use nohash_hasher::IntMap;

use re_arrow_util::ArrowArrayDowncastRef as _;
use re_byte_size::SizeBytes as _;
use re_log_types::{EntityPath, ResolvedTimeRange, Time, TimeInt, TimePoint, Timeline};
use re_types_core::{
Expand Down Expand Up @@ -445,8 +446,7 @@ impl Chunk {
let row_ids = <RowId as Loggable>::to_arrow(&row_ids)
// Unwrap: native RowIds cannot fail to serialize.
.unwrap()
.as_any()
.downcast_ref::<ArrowStructArray>()
.downcast_array_ref::<ArrowStructArray>()
// Unwrap: RowId schema is known in advance to be a struct array -- always.
.unwrap()
.clone();
Expand Down Expand Up @@ -501,8 +501,7 @@ impl Chunk {
let row_ids = <RowId as Loggable>::to_arrow(&row_ids)
// Unwrap: native RowIds cannot fail to serialize.
.unwrap()
.as_any()
.downcast_ref::<ArrowStructArray>()
.downcast_array_ref::<ArrowStructArray>()
// Unwrap: RowId schema is known in advance to be a struct array -- always.
.unwrap()
.clone();
Expand Down Expand Up @@ -876,8 +875,7 @@ impl Chunk {
.map_err(|err| ChunkError::Malformed {
reason: format!("RowIds failed to serialize: {err}"),
})?
.as_any()
.downcast_ref::<ArrowStructArray>()
.downcast_array_ref::<ArrowStructArray>()
// NOTE: impossible, but better safe than sorry.
.ok_or_else(|| ChunkError::Malformed {
reason: "RowIds failed to downcast".to_owned(),
Expand Down Expand Up @@ -1131,21 +1129,18 @@ impl TimeColumn {
}

// Sequence timelines are i64, but time columns are nanoseconds (also as i64).
if let Some(times) = array.as_any().downcast_ref::<arrow::array::Int64Array>() {
if let Some(times) = array.downcast_array_ref::<arrow::array::Int64Array>() {
Ok(times.values().clone())
} else if let Some(times) = array
.as_any()
.downcast_ref::<arrow::array::TimestampNanosecondArray>()
} else if let Some(times) =
array.downcast_array_ref::<arrow::array::TimestampNanosecondArray>()
{
Ok(times.values().clone())
} else if let Some(times) = array
.as_any()
.downcast_ref::<arrow::array::Time64NanosecondArray>()
} else if let Some(times) =
array.downcast_array_ref::<arrow::array::Time64NanosecondArray>()
{
Ok(times.values().clone())
} else if let Some(times) = array
.as_any()
.downcast_ref::<arrow::array::DurationNanosecondArray>()
} else if let Some(times) =
array.downcast_array_ref::<arrow::array::DurationNanosecondArray>()
{
Ok(times.values().clone())
} else {
Expand Down Expand Up @@ -1224,13 +1219,10 @@ impl Chunk {
};

#[allow(clippy::unwrap_used)]
let times = times.as_any().downcast_ref::<ArrowUInt64Array>().unwrap(); // sanity checked
let times = times.downcast_array_ref::<ArrowUInt64Array>().unwrap(); // sanity checked

#[allow(clippy::unwrap_used)]
let counters = counters
.as_any()
.downcast_ref::<ArrowUInt64Array>()
.unwrap(); // sanity checked
let counters = counters.downcast_array_ref::<ArrowUInt64Array>().unwrap(); // sanity checked

(times, counters)
}
Expand Down
40 changes: 40 additions & 0 deletions crates/store/re_chunk/src/concat_record_batches.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::TransportChunk;

use arrow::datatypes::Schema as ArrowSchema;
use arrow2::chunk::Chunk as Arrow2Chunk;

/// Concatenate multiple [`TransportChunk`]s into one.
///
/// This is a temporary method that we use while waiting to migrate towards `arrow-rs`.
/// * `arrow2` doesn't have a `RecordBatch` type, therefore we emulate that using our `TransportChunk`s.
/// * `arrow-rs` does have one, and it natively supports concatenation.
///
/// # Errors
/// Fails if any batch's schema differs from `schema`, if a batch has fewer data
/// columns than the schema declares, or if the underlying column concatenation fails.
pub fn concatenate_record_batches(
    schema: impl Into<ArrowSchema>,
    batches: &[TransportChunk],
) -> anyhow::Result<TransportChunk> {
    let schema: ArrowSchema = schema.into();
    anyhow::ensure!(
        batches
            .iter()
            .all(|batch| batch.schema_ref().as_ref() == &schema),
        "concatenate_record_batches: all batches must have the same schema"
    );

    let num_columns = schema.fields.len();
    let mut output_columns = Vec::with_capacity(num_columns);

    if !batches.is_empty() {
        for i in 0..num_columns {
            // Gather the i-th column from every batch. `column(i)` returning `None`
            // means the batch carries fewer data columns than its schema declares
            // (schema equality was already checked above).
            let arrays: Option<Vec<_>> = batches.iter().map(|batch| batch.column(i)).collect();
            let arrays = arrays.ok_or_else(|| {
                anyhow::anyhow!(
                    "concatenate_record_batches: a batch is missing column {i} declared by its schema"
                )
            })?;
            let array = re_arrow_util::arrow2_util::concat_arrays(&arrays)?;
            output_columns.push(array);
        }
    }

    Ok(TransportChunk::new(
        schema,
        Arrow2Chunk::new(output_columns),
    ))
}
Loading

0 comments on commit efee5ad

Please sign in to comment.