From 7079d4a9464fd56e9fd81200354fe2e109474570 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 3 Jan 2023 15:12:03 -0500 Subject: [PATCH 1/2] Improve arrow flight batch splitting and naming --- arrow-flight/src/encode.rs | 54 +++++++++++++++++++++++++---- arrow-flight/tests/encode_decode.rs | 8 ++--- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index 7c339b67d488..bbee38370be9 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -63,8 +63,8 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt}; /// [`FlightError`]: crate::error::FlightError #[derive(Debug)] pub struct FlightDataEncoderBuilder { - /// The maximum message size (see details on [`Self::with_max_message_size`]). - max_batch_size: usize, + /// The maximum message size in bytes (see details on [`Self::with_max_message_size_bytes`]). + max_batch_size_bytes: usize, /// Ipc writer options options: IpcWriteOptions, /// Metadata to add to the schema message @@ -80,7 +80,7 @@ pub const GRPC_TARGET_MAX_BATCH_SIZE: usize = 2097152; impl Default for FlightDataEncoderBuilder { fn default() -> Self { Self { - max_batch_size: GRPC_TARGET_MAX_BATCH_SIZE, + max_batch_size_bytes: GRPC_TARGET_MAX_BATCH_SIZE, options: IpcWriteOptions::default(), app_metadata: Bytes::new(), } @@ -100,8 +100,8 @@ impl FlightDataEncoderBuilder { /// is approximate because there additional encoding overhead on /// top of the underlying data itself. /// - pub fn with_max_message_size(mut self, max_batch_size: usize) -> Self { - self.max_batch_size = max_batch_size; + pub fn with_max_message_size_bytes(mut self, max_batch_size_bytes: usize) -> Self { + self.max_batch_size_bytes = max_batch_size_bytes; self } @@ -126,7 +126,7 @@ impl FlightDataEncoderBuilder { S: Stream> + Send + 'static, { let Self { - max_batch_size, + max_batch_size_bytes: max_batch_size, options, app_metadata, } = self; @@ -419,6 +419,7 @@ mod tests { array::{UInt32Array, UInt8Array}, compute::concat_batches, }; + use arrow_array::UInt64Array; use super::*; @@ -506,6 +507,47 @@ mod tests { assert_eq!(concat_batches(&batch.schema(), &split).unwrap(), batch); } + #[test] + fn test_split_batch_for_grpc_response_sizes() { + // 2000 8 byte entries into 2k pieces: 8 chunks of 250 rows + verify_split(2000, 2 * 1024, vec![250, 250, 250, 250, 250, 250, 250, 250]); + + // 2000 8 byte entries into 4k pieces: 4 chunks of 500 rows + verify_split(2000, 4 * 1024, vec![500, 500, 500, 500]); + + // 2023 8 byte entries into 3k pieces does not divide evenly + verify_split(2023, 3 * 1024, vec![337, 337, 337, 337, 337, 337, 1]); + + // 10 8 byte entries into 1 byte pieces means each rows gets its own + verify_split(10, 1, vec![1, 1, 1, 1, 1, 1, 1, 1, 1, 1]); + + // 10 8 byte entries into 1k byte pieces means one piece + verify_split(10, 1024, vec![10]); + } + + /// Creates a UInt64Array of 8 byte integers with input_rows rows + /// `max_batch_size_bytes` pieces and verifies the row counts in + /// those pieces + fn verify_split( + num_input_rows: u64, + max_batch_size_bytes: usize, + expected_sizes: Vec, + ) { + let array: UInt64Array = (0..num_input_rows).collect(); + + let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]) + .expect("cannot create record batch"); + + let input_rows = batch.num_rows(); + + let split = split_batch_for_grpc_response(batch.clone(), max_batch_size_bytes); + let sizes: Vec<_> = split.iter().map(|batch| batch.num_rows()).collect(); + let output_rows: usize = sizes.iter().sum(); + + assert_eq!(sizes, expected_sizes, "mismatch for {batch:?}"); + assert_eq!(input_rows, output_rows, "mismatch for {batch:?}"); + } + // test sending record batches // test sending record batches with multiple different dictionaries } diff --git a/arrow-flight/tests/encode_decode.rs b/arrow-flight/tests/encode_decode.rs index 45b8c0bf5ac9..f13468fb4175 100644 --- a/arrow-flight/tests/encode_decode.rs +++ b/arrow-flight/tests/encode_decode.rs @@ -131,7 +131,7 @@ async fn test_max_message_size() { let input_batch_stream = futures::stream::iter(vec![Ok(make_primative_batch(5))]); // 5 input rows, with a very small limit should result in 5 batch messages - let encoder = FlightDataEncoderBuilder::default().with_max_message_size(1); + let encoder = FlightDataEncoderBuilder::default().with_max_message_size_bytes(1); let encode_stream = encoder.build(input_batch_stream); @@ -164,9 +164,9 @@ async fn test_max_message_size_fuzz() { make_primative_batch(127), ]; - for max_message_size in [10, 1024, 2048, 6400, 3211212] { - let encoder = - FlightDataEncoderBuilder::default().with_max_message_size(max_message_size); + for max_message_size_bytes in [10, 1024, 2048, 6400, 3211212] { + let encoder = FlightDataEncoderBuilder::default() + .with_max_message_size_bytes(max_message_size_bytes); let input_batch_stream = futures::stream::iter(input.clone()).map(Ok); From f4f6c7aabbc3b6438195556c94bae41ae0ad5518 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 4 Jan 2023 16:57:06 -0500 Subject: [PATCH 2/2] Review feedback: rename to max_flight_data_size --- arrow-flight/src/encode.rs | 68 ++++++++++++++++------------- arrow-flight/tests/encode_decode.rs | 4 +- 2 files changed, 39 insertions(+), 33 deletions(-) diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index bbee38370be9..55000bba2fad 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -63,24 +63,25 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt}; /// [`FlightError`]: crate::error::FlightError #[derive(Debug)] pub struct FlightDataEncoderBuilder { - /// The maximum message size in bytes (see details on [`Self::with_max_message_size_bytes`]). - max_batch_size_bytes: usize, + /// The maximum approximate target message size in bytes + /// (see details on [`Self::with_max_flight_data_size`]). + max_flight_data_size: usize, /// Ipc writer options options: IpcWriteOptions, /// Metadata to add to the schema message app_metadata: Bytes, } -/// Default target size for record batches to send. +/// Default target size for encoded [`FlightData`]. /// /// Note this value would normally be 4MB, but the size calculation is /// somewhat inexact, so we set it to 2MB. -pub const GRPC_TARGET_MAX_BATCH_SIZE: usize = 2097152; +pub const GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES: usize = 2097152; impl Default for FlightDataEncoderBuilder { fn default() -> Self { Self { - max_batch_size_bytes: GRPC_TARGET_MAX_BATCH_SIZE, + max_flight_data_size: GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES, options: IpcWriteOptions::default(), app_metadata: Bytes::new(), } @@ -92,16 +93,18 @@ impl FlightDataEncoderBuilder { Self::default() } - /// Set the (approximate) maximum encoded [`RecordBatch`] size to - /// limit the gRPC message size. Defaults to 2MB. + /// Set the (approximate) maximum size, in bytes, of the + /// [`FlightData`] produced by this encoder. Defaults to 2MB. /// - /// The encoder splits up [`RecordBatch`]s (preserving order) to - /// limit individual messages to approximately this size. The size - /// is approximate because there additional encoding overhead on - /// top of the underlying data itself. + /// Since there is often a maximum message size for gRPC messages + /// (typically around 4MB), this encoder splits up [`RecordBatch`]s + /// (preserving order) into multiple [`FlightData`] objects to + /// limit the size individual messages sent via gRPC. /// - pub fn with_max_message_size_bytes(mut self, max_batch_size_bytes: usize) -> Self { - self.max_batch_size_bytes = max_batch_size_bytes; + /// The size is approximate because of the additional encoding + /// overhead on top of the underlying data buffers themselves. + pub fn with_max_flight_data_size(mut self, max_flight_data_size: usize) -> Self { + self.max_flight_data_size = max_flight_data_size; self } @@ -126,12 +129,12 @@ impl FlightDataEncoderBuilder { S: Stream> + Send + 'static, { let Self { - max_batch_size_bytes: max_batch_size, + max_flight_data_size, options, app_metadata, } = self; - FlightDataEncoder::new(input.boxed(), max_batch_size, options, app_metadata) + FlightDataEncoder::new(input.boxed(), max_flight_data_size, options, app_metadata) } } @@ -143,29 +146,30 @@ pub struct FlightDataEncoder { inner: BoxStream<'static, Result>, /// schema, set after the first batch schema: Option, - /// Max size of batches to encode - max_batch_size: usize, + /// Target maximum size of flight data + /// (see details on [`FlightDataEncoderBuilder::with_max_flight_data_size`]). + max_flight_data_size: usize, /// do the encoding / tracking of dictionaries encoder: FlightIpcEncoder, /// optional metadata to add to schema FlightData app_metadata: Option, /// data queued up to send but not yet sent queue: VecDeque, - /// Is this strema done (inner is empty or errored) + /// Is this stream done (inner is empty or errored) done: bool, } impl FlightDataEncoder { fn new( inner: BoxStream<'static, Result>, - max_batch_size: usize, + max_flight_data_size: usize, options: IpcWriteOptions, app_metadata: Bytes, ) -> Self { Self { inner, schema: None, - max_batch_size, + max_flight_data_size, encoder: FlightIpcEncoder::new(options), app_metadata: Some(app_metadata), queue: VecDeque::new(), @@ -210,7 +214,7 @@ impl FlightDataEncoder { // encode the batch let batch = prepare_batch_for_flight(&batch, schema)?; - for batch in split_batch_for_grpc_response(batch, self.max_batch_size) { + for batch in split_batch_for_grpc_response(batch, self.max_flight_data_size) { let (flight_dictionaries, flight_batch) = self.encoder.encode_batch(&batch)?; @@ -300,7 +304,7 @@ fn prepare_schema_for_flight(schema: &Schema) -> Schema { /// arrays: fn split_batch_for_grpc_response( batch: RecordBatch, - max_batch_size: usize, + max_flight_data_size: usize, ) -> Vec { let size = batch .columns() @@ -308,8 +312,9 @@ fn split_batch_for_grpc_response( .map(|col| col.get_buffer_memory_size()) .sum::(); - let n_batches = - (size / max_batch_size + usize::from(size % max_batch_size != 0)).max(1); + let n_batches = (size / max_flight_data_size + + usize::from(size % max_flight_data_size != 0)) + .max(1); let rows_per_batch = (batch.num_rows() / n_batches).max(1); let mut out = Vec::with_capacity(n_batches + 1); @@ -481,24 +486,24 @@ mod tests { #[test] fn test_split_batch_for_grpc_response() { - let max_batch_size = 1024; + let max_flight_data_size = 1024; // no split let c = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)]) .expect("cannot create record batch"); - let split = split_batch_for_grpc_response(batch.clone(), max_batch_size); + let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size); assert_eq!(split.len(), 1); assert_eq!(batch, split[0]); // split once - let n_rows = max_batch_size + 1; + let n_rows = max_flight_data_size + 1; assert!(n_rows % 2 == 1, "should be an odd number"); let c = UInt8Array::from((0..n_rows).map(|i| (i % 256) as u8).collect::>()); let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)]) .expect("cannot create record batch"); - let split = split_batch_for_grpc_response(batch.clone(), max_batch_size); + let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size); assert_eq!(split.len(), 3); assert_eq!( split.iter().map(|batch| batch.num_rows()).sum::(), @@ -526,11 +531,11 @@ mod tests { } /// Creates a UInt64Array of 8 byte integers with input_rows rows - /// `max_batch_size_bytes` pieces and verifies the row counts in + /// `max_flight_data_size_bytes` pieces and verifies the row counts in /// those pieces fn verify_split( num_input_rows: u64, - max_batch_size_bytes: usize, + max_flight_data_size_bytes: usize, expected_sizes: Vec, ) { let array: UInt64Array = (0..num_input_rows).collect(); @@ -540,7 +545,8 @@ mod tests { let input_rows = batch.num_rows(); - let split = split_batch_for_grpc_response(batch.clone(), max_batch_size_bytes); + let split = + split_batch_for_grpc_response(batch.clone(), max_flight_data_size_bytes); let sizes: Vec<_> = split.iter().map(|batch| batch.num_rows()).collect(); let output_rows: usize = sizes.iter().sum(); diff --git a/arrow-flight/tests/encode_decode.rs b/arrow-flight/tests/encode_decode.rs index f13468fb4175..0aa98768774e 100644 --- a/arrow-flight/tests/encode_decode.rs +++ b/arrow-flight/tests/encode_decode.rs @@ -131,7 +131,7 @@ async fn test_max_message_size() { let input_batch_stream = futures::stream::iter(vec![Ok(make_primative_batch(5))]); // 5 input rows, with a very small limit should result in 5 batch messages - let encoder = FlightDataEncoderBuilder::default().with_max_message_size_bytes(1); + let encoder = FlightDataEncoderBuilder::default().with_max_flight_data_size(1); let encode_stream = encoder.build(input_batch_stream); @@ -166,7 +166,7 @@ async fn test_max_message_size_fuzz() { for max_message_size_bytes in [10, 1024, 2048, 6400, 3211212] { let encoder = FlightDataEncoderBuilder::default() - .with_max_message_size_bytes(max_message_size_bytes); + .with_max_flight_data_size(max_message_size_bytes); let input_batch_stream = futures::stream::iter(input.clone()).map(Ok);