-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve arrow flight batch splitting and naming #3444
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,24 +63,25 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt}; | |
/// [`FlightError`]: crate::error::FlightError | ||
#[derive(Debug)] | ||
pub struct FlightDataEncoderBuilder { | ||
/// The maximum message size (see details on [`Self::with_max_message_size`]). | ||
max_batch_size: usize, | ||
/// The maximum approximate target message size in bytes | ||
/// (see details on [`Self::with_max_flight_data_size`]). | ||
max_flight_data_size: usize, | ||
/// Ipc writer options | ||
options: IpcWriteOptions, | ||
/// Metadata to add to the schema message | ||
app_metadata: Bytes, | ||
} | ||
|
||
/// Default target size for record batches to send. | ||
/// Default target size for encoded [`FlightData`]. | ||
/// | ||
/// Note this value would normally be 4MB, but the size calculation is | ||
/// somewhat inexact, so we set it to 2MB. | ||
pub const GRPC_TARGET_MAX_BATCH_SIZE: usize = 2097152; | ||
pub const GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES: usize = 2097152; | ||
|
||
impl Default for FlightDataEncoderBuilder { | ||
fn default() -> Self { | ||
Self { | ||
max_batch_size: GRPC_TARGET_MAX_BATCH_SIZE, | ||
max_flight_data_size: GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES, | ||
options: IpcWriteOptions::default(), | ||
app_metadata: Bytes::new(), | ||
} | ||
|
@@ -92,16 +93,18 @@ impl FlightDataEncoderBuilder { | |
Self::default() | ||
} | ||
|
||
/// Set the (approximate) maximum encoded [`RecordBatch`] size to | ||
/// limit the gRPC message size. Defaults to 2MB. | ||
/// Set the (approximate) maximum size, in bytes, of the | ||
/// [`FlightData`] produced by this encoder. Defaults to 2MB. | ||
/// | ||
/// The encoder splits up [`RecordBatch`]s (preserving order) to | ||
/// limit individual messages to approximately this size. The size | ||
/// is approximate because there additional encoding overhead on | ||
/// top of the underlying data itself. | ||
/// Since there is often a maximum message size for gRPC messages | ||
/// (typically around 4MB), this encoder splits up [`RecordBatch`]s | ||
/// (preserving order) into multiple [`FlightData`] objects to | ||
/// limit the size of individual messages sent via gRPC. | ||
/// | ||
pub fn with_max_message_size(mut self, max_batch_size: usize) -> Self { | ||
self.max_batch_size = max_batch_size; | ||
/// The size is approximate because of the additional encoding | ||
/// overhead on top of the underlying data buffers themselves. | ||
pub fn with_max_flight_data_size(mut self, max_flight_data_size: usize) -> Self { | ||
self.max_flight_data_size = max_flight_data_size; | ||
self | ||
} | ||
|
||
|
@@ -126,12 +129,12 @@ impl FlightDataEncoderBuilder { | |
S: Stream<Item = Result<RecordBatch>> + Send + 'static, | ||
{ | ||
let Self { | ||
max_batch_size, | ||
max_flight_data_size, | ||
options, | ||
app_metadata, | ||
} = self; | ||
|
||
FlightDataEncoder::new(input.boxed(), max_batch_size, options, app_metadata) | ||
FlightDataEncoder::new(input.boxed(), max_flight_data_size, options, app_metadata) | ||
} | ||
} | ||
|
||
|
@@ -143,29 +146,30 @@ pub struct FlightDataEncoder { | |
inner: BoxStream<'static, Result<RecordBatch>>, | ||
/// schema, set after the first batch | ||
schema: Option<SchemaRef>, | ||
/// Max size of batches to encode | ||
max_batch_size: usize, | ||
/// Target maximum size of flight data | ||
/// (see details on [`FlightDataEncoderBuilder::with_max_flight_data_size`]). | ||
max_flight_data_size: usize, | ||
/// do the encoding / tracking of dictionaries | ||
encoder: FlightIpcEncoder, | ||
/// optional metadata to add to schema FlightData | ||
app_metadata: Option<Bytes>, | ||
/// data queued up to send but not yet sent | ||
queue: VecDeque<FlightData>, | ||
/// Is this strema done (inner is empty or errored) | ||
/// Is this stream done (inner is empty or errored) | ||
done: bool, | ||
} | ||
|
||
impl FlightDataEncoder { | ||
fn new( | ||
inner: BoxStream<'static, Result<RecordBatch>>, | ||
max_batch_size: usize, | ||
max_flight_data_size: usize, | ||
options: IpcWriteOptions, | ||
app_metadata: Bytes, | ||
) -> Self { | ||
Self { | ||
inner, | ||
schema: None, | ||
max_batch_size, | ||
max_flight_data_size, | ||
encoder: FlightIpcEncoder::new(options), | ||
app_metadata: Some(app_metadata), | ||
queue: VecDeque::new(), | ||
|
@@ -210,7 +214,7 @@ impl FlightDataEncoder { | |
// encode the batch | ||
let batch = prepare_batch_for_flight(&batch, schema)?; | ||
|
||
for batch in split_batch_for_grpc_response(batch, self.max_batch_size) { | ||
for batch in split_batch_for_grpc_response(batch, self.max_flight_data_size) { | ||
let (flight_dictionaries, flight_batch) = | ||
self.encoder.encode_batch(&batch)?; | ||
|
||
|
@@ -300,16 +304,17 @@ fn prepare_schema_for_flight(schema: &Schema) -> Schema { | |
/// arrays: <https://github.com/apache/arrow-rs/issues/3407> | ||
fn split_batch_for_grpc_response( | ||
batch: RecordBatch, | ||
max_batch_size: usize, | ||
max_flight_data_size: usize, | ||
) -> Vec<RecordBatch> { | ||
let size = batch | ||
.columns() | ||
.iter() | ||
.map(|col| col.get_buffer_memory_size()) | ||
.sum::<usize>(); | ||
|
||
let n_batches = | ||
(size / max_batch_size + usize::from(size % max_batch_size != 0)).max(1); | ||
let n_batches = (size / max_flight_data_size | ||
+ usize::from(size % max_flight_data_size != 0)) | ||
.max(1); | ||
let rows_per_batch = (batch.num_rows() / n_batches).max(1); | ||
let mut out = Vec::with_capacity(n_batches + 1); | ||
|
||
|
@@ -419,6 +424,7 @@ mod tests { | |
array::{UInt32Array, UInt8Array}, | ||
compute::concat_batches, | ||
}; | ||
use arrow_array::UInt64Array; | ||
|
||
use super::*; | ||
|
||
|
@@ -480,24 +486,24 @@ mod tests { | |
|
||
#[test] | ||
fn test_split_batch_for_grpc_response() { | ||
let max_batch_size = 1024; | ||
let max_flight_data_size = 1024; | ||
|
||
// no split | ||
let c = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); | ||
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)]) | ||
.expect("cannot create record batch"); | ||
let split = split_batch_for_grpc_response(batch.clone(), max_batch_size); | ||
let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size); | ||
assert_eq!(split.len(), 1); | ||
assert_eq!(batch, split[0]); | ||
|
||
// split once | ||
let n_rows = max_batch_size + 1; | ||
let n_rows = max_flight_data_size + 1; | ||
assert!(n_rows % 2 == 1, "should be an odd number"); | ||
let c = | ||
UInt8Array::from((0..n_rows).map(|i| (i % 256) as u8).collect::<Vec<_>>()); | ||
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)]) | ||
.expect("cannot create record batch"); | ||
let split = split_batch_for_grpc_response(batch.clone(), max_batch_size); | ||
let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size); | ||
assert_eq!(split.len(), 3); | ||
assert_eq!( | ||
split.iter().map(|batch| batch.num_rows()).sum::<usize>(), | ||
|
@@ -506,6 +512,48 @@ mod tests { | |
assert_eq!(concat_batches(&batch.schema(), &split).unwrap(), batch); | ||
} | ||
|
||
#[test] | ||
fn test_split_batch_for_grpc_response_sizes() { | ||
// 2000 8 byte entries into 2k pieces: 8 chunks of 250 rows | ||
verify_split(2000, 2 * 1024, vec![250, 250, 250, 250, 250, 250, 250, 250]); | ||
|
||
// 2000 8 byte entries into 4k pieces: 4 chunks of 500 rows | ||
verify_split(2000, 4 * 1024, vec![500, 500, 500, 500]); | ||
|
||
// 2023 8 byte entries into 3k pieces does not divide evenly | ||
verify_split(2023, 3 * 1024, vec![337, 337, 337, 337, 337, 337, 1]); | ||
|
||
// 10 8 byte entries into 1 byte pieces means each row gets its own | ||
verify_split(10, 1, vec![1, 1, 1, 1, 1, 1, 1, 1, 1, 1]); | ||
|
||
// 10 8 byte entries into 1k byte pieces means one piece | ||
verify_split(10, 1024, vec![10]); | ||
} | ||
|
||
/// Creates a UInt64Array of 8 byte integers with `num_input_rows` rows, | ||
/// splits it into `max_flight_data_size_bytes` byte pieces and verifies | ||
/// the row counts in those pieces | ||
fn verify_split( | ||
num_input_rows: u64, | ||
max_flight_data_size_bytes: usize, | ||
expected_sizes: Vec<usize>, | ||
) { | ||
let array: UInt64Array = (0..num_input_rows).collect(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It occurs to me that this splitting won't work for dictionaries, not really sure what we can do about that though... 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 I guess we hope for small dictionaries |
||
|
||
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]) | ||
.expect("cannot create record batch"); | ||
|
||
let input_rows = batch.num_rows(); | ||
|
||
let split = | ||
split_batch_for_grpc_response(batch.clone(), max_flight_data_size_bytes); | ||
let sizes: Vec<_> = split.iter().map(|batch| batch.num_rows()).collect(); | ||
let output_rows: usize = sizes.iter().sum(); | ||
|
||
assert_eq!(sizes, expected_sizes, "mismatch for {batch:?}"); | ||
assert_eq!(input_rows, output_rows, "mismatch for {batch:?}"); | ||
} | ||
|
||
// test sending record batches | ||
// test sending record batches with multiple different dictionaries | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed this to
max_flight_data_size
and tried to clarify the intent more