Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copying inappropriately aligned buffer in ipc reader #2883

Merged
merged 10 commits into from
Oct 16, 2022
32 changes: 28 additions & 4 deletions arrow/src/ipc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! The `FileReader` and `StreamReader` have similar interfaces,
//! however the `FileReader` expects a reader that supports `Seek`ing

use arrow_buffer::i256;
use std::collections::HashMap;
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
Expand Down Expand Up @@ -477,18 +478,23 @@ fn create_primitive_array(
| Timestamp(_, _)
| Date64
| Duration(_)
| Interval(IntervalUnit::DayTime)
| Interval(IntervalUnit::MonthDayNano) => ArrayData::builder(data_type.clone())
| Interval(IntervalUnit::DayTime) => ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.null_bit_buffer(null_buffer)
.build()
.unwrap(),
Decimal128(_, _) | Decimal256(_, _) => {
Interval(IntervalUnit::MonthDayNano) | Decimal128(_, _) | Decimal256(_, _) => {
let buffer = if matches!(data_type, &DataType::Decimal256(_, _)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not lift into parent match?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Save a few lines? :) I lifted it now.

get_aligned_buffer::<i256>(&buffers[1], length)
} else {
get_aligned_buffer::<i128>(&buffers[1], length)
};

// read 2 buffers: null buffer (optional) and data buffer
ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.add_buffer(buffer)
.null_bit_buffer(null_buffer)
.build()
.unwrap()
Expand All @@ -499,6 +505,24 @@ fn create_primitive_array(
make_array(array_data)
}

/// Checks if given `Buffer` is properly aligned with `T`.
/// If not, copying the data and padded it for alignment.
fn get_aligned_buffer<T>(buffer: &Buffer, length: usize) -> Buffer {
let ptr = buffer.as_ptr();
let align_req = std::mem::align_of::<T>();
let align_offset = ptr.align_offset(align_req);
// The buffer is not aligned properly. The writer might use a smaller alignment
// e.g. 8 bytes, but on some platform (e.g. ARM) i128 requires 16 bytes alignment.
// We need to copy the buffer as fallback.
if align_offset != 0 {
let len_in_bytes = length * std::mem::size_of::<T>();
let slice = &buffer.as_slice()[0..len_in_bytes];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit, this will panic for invalid data where previously we would error. Should just be a case of taking the minimum of expected and actual length

Buffer::from_slice_ref(&slice)
} else {
buffer.clone()
}
}

/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_list_array(
Expand Down