Skip to content

Commit

Permalink
Copying inappropriately aligned buffer in ipc reader (#2883)
Browse files Browse the repository at this point in the history
* Fix ptr alignment error.

* Rewrite test

* Add values

* Copy buffer if it is not aligned properly

* Move to a function

* Cover IntervalMonthDayNanoType too

* Remove unnecessary change

* For review

* Make it generic for i256

* Lift into parent match and use minimum length.
  • Loading branch information
viirya authored Oct 16, 2022
1 parent a3effc1 commit bfd87bd
Showing 1 changed file with 35 additions and 4 deletions.
39 changes: 35 additions & 4 deletions arrow/src/ipc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! The `FileReader` and `StreamReader` have similar interfaces,
//! however the `FileReader` expects a reader that supports `Seek`ing

use arrow_buffer::i256;
use std::collections::HashMap;
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
Expand Down Expand Up @@ -477,18 +478,30 @@ fn create_primitive_array(
| Timestamp(_, _)
| Date64
| Duration(_)
| Interval(IntervalUnit::DayTime)
| Interval(IntervalUnit::MonthDayNano) => ArrayData::builder(data_type.clone())
| Interval(IntervalUnit::DayTime) => ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.null_bit_buffer(null_buffer)
.build()
.unwrap(),
Decimal128(_, _) | Decimal256(_, _) => {
Interval(IntervalUnit::MonthDayNano) | Decimal128(_, _) => {
let buffer = get_aligned_buffer::<i128>(&buffers[1], length);

// read 2 buffers: null buffer (optional) and data buffer
ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.add_buffer(buffer)
.null_bit_buffer(null_buffer)
.build()
.unwrap()
}
Decimal256(_, _) => {
let buffer = get_aligned_buffer::<i256>(&buffers[1], length);

// read 2 buffers: null buffer (optional) and data buffer
ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffer)
.null_bit_buffer(null_buffer)
.build()
.unwrap()
Expand All @@ -499,6 +512,24 @@ fn create_primitive_array(
make_array(array_data)
}

/// Checks if given `Buffer` is properly aligned with `T`.
/// If not, copying the data and padded it for alignment.
fn get_aligned_buffer<T>(buffer: &Buffer, length: usize) -> Buffer {
let ptr = buffer.as_ptr();
let align_req = std::mem::align_of::<T>();
let align_offset = ptr.align_offset(align_req);
// The buffer is not aligned properly. The writer might use a smaller alignment
// e.g. 8 bytes, but on some platform (e.g. ARM) i128 requires 16 bytes alignment.
// We need to copy the buffer as fallback.
if align_offset != 0 {
let len_in_bytes = (length * std::mem::size_of::<T>()).min(buffer.len());
let slice = &buffer.as_slice()[0..len_in_bytes];
Buffer::from_slice_ref(&slice)
} else {
buffer.clone()
}
}

/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_list_array(
Expand Down

0 comments on commit bfd87bd

Please sign in to comment.