Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve arrow flight batch splitting and naming #3444

Merged
merged 3 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 76 additions & 28 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,25 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
/// [`FlightError`]: crate::error::FlightError
#[derive(Debug)]
pub struct FlightDataEncoderBuilder {
/// The maximum message size (see details on [`Self::with_max_message_size`]).
max_batch_size: usize,
/// The maximum approximate target message size in bytes
/// (see details on [`Self::with_max_flight_data_size`]).
max_flight_data_size: usize,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this to max_flight_data_size and tried to clarify the intent more

/// Ipc writer options
options: IpcWriteOptions,
/// Metadata to add to the schema message
app_metadata: Bytes,
}

/// Default target size for record batches to send.
/// Default target size for encoded [`FlightData`].
///
/// Note this value would normally be 4MB, but the size calculation is
/// somewhat inexact, so we set it to 2MB.
pub const GRPC_TARGET_MAX_BATCH_SIZE: usize = 2097152;
pub const GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES: usize = 2097152;

impl Default for FlightDataEncoderBuilder {
fn default() -> Self {
Self {
max_batch_size: GRPC_TARGET_MAX_BATCH_SIZE,
max_flight_data_size: GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES,
options: IpcWriteOptions::default(),
app_metadata: Bytes::new(),
}
Expand All @@ -92,16 +93,18 @@ impl FlightDataEncoderBuilder {
Self::default()
}

/// Set the (approximate) maximum encoded [`RecordBatch`] size to
/// limit the gRPC message size. Defaults to 2MB.
/// Set the (approximate) maximum size, in bytes, of the
/// [`FlightData`] produced by this encoder. Defaults to 2MB.
///
/// The encoder splits up [`RecordBatch`]s (preserving order) to
/// limit individual messages to approximately this size. The size
/// is approximate because there additional encoding overhead on
/// top of the underlying data itself.
/// Since there is often a maximum message size for gRPC messages
/// (typically around 4MB), this encoder splits up [`RecordBatch`]s
/// (preserving order) into multiple [`FlightData`] objects to
/// limit the size of individual messages sent via gRPC.
///
pub fn with_max_message_size(mut self, max_batch_size: usize) -> Self {
self.max_batch_size = max_batch_size;
/// The size is approximate because of the additional encoding
/// overhead on top of the underlying data buffers themselves.
pub fn with_max_flight_data_size(mut self, max_flight_data_size: usize) -> Self {
self.max_flight_data_size = max_flight_data_size;
self
}

Expand All @@ -126,12 +129,12 @@ impl FlightDataEncoderBuilder {
S: Stream<Item = Result<RecordBatch>> + Send + 'static,
{
let Self {
max_batch_size,
max_flight_data_size,
options,
app_metadata,
} = self;

FlightDataEncoder::new(input.boxed(), max_batch_size, options, app_metadata)
FlightDataEncoder::new(input.boxed(), max_flight_data_size, options, app_metadata)
}
}

Expand All @@ -143,29 +146,30 @@ pub struct FlightDataEncoder {
inner: BoxStream<'static, Result<RecordBatch>>,
/// schema, set after the first batch
schema: Option<SchemaRef>,
/// Max size of batches to encode
max_batch_size: usize,
/// Target maximum size of flight data
/// (see details on [`FlightDataEncoderBuilder::with_max_flight_data_size`]).
max_flight_data_size: usize,
/// do the encoding / tracking of dictionaries
encoder: FlightIpcEncoder,
/// optional metadata to add to schema FlightData
app_metadata: Option<Bytes>,
/// data queued up to send but not yet sent
queue: VecDeque<FlightData>,
/// Is this strema done (inner is empty or errored)
/// Is this stream done (inner is empty or errored)
done: bool,
}

impl FlightDataEncoder {
fn new(
inner: BoxStream<'static, Result<RecordBatch>>,
max_batch_size: usize,
max_flight_data_size: usize,
options: IpcWriteOptions,
app_metadata: Bytes,
) -> Self {
Self {
inner,
schema: None,
max_batch_size,
max_flight_data_size,
encoder: FlightIpcEncoder::new(options),
app_metadata: Some(app_metadata),
queue: VecDeque::new(),
Expand Down Expand Up @@ -210,7 +214,7 @@ impl FlightDataEncoder {
// encode the batch
let batch = prepare_batch_for_flight(&batch, schema)?;

for batch in split_batch_for_grpc_response(batch, self.max_batch_size) {
for batch in split_batch_for_grpc_response(batch, self.max_flight_data_size) {
let (flight_dictionaries, flight_batch) =
self.encoder.encode_batch(&batch)?;

Expand Down Expand Up @@ -300,16 +304,17 @@ fn prepare_schema_for_flight(schema: &Schema) -> Schema {
/// arrays: <https://github.com/apache/arrow-rs/issues/3407>
fn split_batch_for_grpc_response(
batch: RecordBatch,
max_batch_size: usize,
max_flight_data_size: usize,
) -> Vec<RecordBatch> {
let size = batch
.columns()
.iter()
.map(|col| col.get_buffer_memory_size())
.sum::<usize>();

let n_batches =
(size / max_batch_size + usize::from(size % max_batch_size != 0)).max(1);
let n_batches = (size / max_flight_data_size
+ usize::from(size % max_flight_data_size != 0))
.max(1);
let rows_per_batch = (batch.num_rows() / n_batches).max(1);
let mut out = Vec::with_capacity(n_batches + 1);

Expand Down Expand Up @@ -419,6 +424,7 @@ mod tests {
array::{UInt32Array, UInt8Array},
compute::concat_batches,
};
use arrow_array::UInt64Array;

use super::*;

Expand Down Expand Up @@ -480,24 +486,24 @@ mod tests {

#[test]
fn test_split_batch_for_grpc_response() {
let max_batch_size = 1024;
let max_flight_data_size = 1024;

// no split
let c = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
.expect("cannot create record batch");
let split = split_batch_for_grpc_response(batch.clone(), max_batch_size);
let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
assert_eq!(split.len(), 1);
assert_eq!(batch, split[0]);

// split once
let n_rows = max_batch_size + 1;
let n_rows = max_flight_data_size + 1;
assert!(n_rows % 2 == 1, "should be an odd number");
let c =
UInt8Array::from((0..n_rows).map(|i| (i % 256) as u8).collect::<Vec<_>>());
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
.expect("cannot create record batch");
let split = split_batch_for_grpc_response(batch.clone(), max_batch_size);
let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
assert_eq!(split.len(), 3);
assert_eq!(
split.iter().map(|batch| batch.num_rows()).sum::<usize>(),
Expand All @@ -506,6 +512,48 @@ mod tests {
assert_eq!(concat_batches(&batch.schema(), &split).unwrap(), batch);
}

#[test]
fn test_split_batch_for_grpc_response_sizes() {
    // Each case: (input row count, max piece size in bytes, expected rows per piece).
    let cases = [
        // 2000 8 byte entries into 2k pieces: 8 chunks of 250 rows
        (2000, 2 * 1024, vec![250; 8]),
        // 2000 8 byte entries into 4k pieces: 4 chunks of 500 rows
        (2000, 4 * 1024, vec![500; 4]),
        // 2023 8 byte entries into 3k pieces does not divide evenly
        (2023, 3 * 1024, vec![337, 337, 337, 337, 337, 337, 1]),
        // 10 8 byte entries into 1 byte pieces: each row gets its own piece
        (10, 1, vec![1; 10]),
        // 10 8 byte entries into 1k byte pieces: a single piece
        (10, 1024, vec![10]),
    ];

    for (num_input_rows, max_flight_data_size, expected_sizes) in cases {
        verify_split(num_input_rows, max_flight_data_size, expected_sizes);
    }
}

/// Creates a UInt64Array of 8 byte integers with `num_input_rows` rows,
/// splits it into pieces no larger than `max_flight_data_size_bytes` bytes,
/// and verifies the row counts in those pieces
fn verify_split(
num_input_rows: u64,
max_flight_data_size_bytes: usize,
expected_sizes: Vec<usize>,
) {
let array: UInt64Array = (0..num_input_rows).collect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It occurs to me that this splitting won't work for dictionaries, not really sure what we can do about that though... 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I guess we hope for small dictionaries


let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)])
.expect("cannot create record batch");

let input_rows = batch.num_rows();

let split =
split_batch_for_grpc_response(batch.clone(), max_flight_data_size_bytes);
let sizes: Vec<_> = split.iter().map(|batch| batch.num_rows()).collect();
let output_rows: usize = sizes.iter().sum();

assert_eq!(sizes, expected_sizes, "mismatch for {batch:?}");
assert_eq!(input_rows, output_rows, "mismatch for {batch:?}");
}

// test sending record batches
// test sending record batches with multiple different dictionaries
}
8 changes: 4 additions & 4 deletions arrow-flight/tests/encode_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ async fn test_max_message_size() {
let input_batch_stream = futures::stream::iter(vec![Ok(make_primative_batch(5))]);

// 5 input rows, with a very small limit should result in 5 batch messages
let encoder = FlightDataEncoderBuilder::default().with_max_message_size(1);
let encoder = FlightDataEncoderBuilder::default().with_max_flight_data_size(1);

let encode_stream = encoder.build(input_batch_stream);

Expand Down Expand Up @@ -164,9 +164,9 @@ async fn test_max_message_size_fuzz() {
make_primative_batch(127),
];

for max_message_size in [10, 1024, 2048, 6400, 3211212] {
let encoder =
FlightDataEncoderBuilder::default().with_max_message_size(max_message_size);
for max_message_size_bytes in [10, 1024, 2048, 6400, 3211212] {
let encoder = FlightDataEncoderBuilder::default()
.with_max_flight_data_size(max_message_size_bytes);

let input_batch_stream = futures::stream::iter(input.clone()).map(Ok);

Expand Down