
Improve arrow flight batch splitting and naming #3444

Merged · 3 commits · Jan 5, 2023
54 changes: 48 additions & 6 deletions arrow-flight/src/encode.rs
@@ -63,8 +63,8 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
/// [`FlightError`]: crate::error::FlightError
#[derive(Debug)]
pub struct FlightDataEncoderBuilder {
-    /// The maximum message size (see details on [`Self::with_max_message_size`]).
-    max_batch_size: usize,
+    /// The maximum message size in bytes (see details on [`Self::with_max_message_size_bytes`]).
+    max_batch_size_bytes: usize,
/// Ipc writer options
options: IpcWriteOptions,
/// Metadata to add to the schema message
@@ -80,7 +80,7 @@ pub const GRPC_TARGET_MAX_BATCH_SIZE: usize = 2097152;
impl Default for FlightDataEncoderBuilder {
fn default() -> Self {
Self {
-            max_batch_size: GRPC_TARGET_MAX_BATCH_SIZE,
+            max_batch_size_bytes: GRPC_TARGET_MAX_BATCH_SIZE,
options: IpcWriteOptions::default(),
app_metadata: Bytes::new(),
}
@@ -100,8 +100,8 @@ impl FlightDataEncoderBuilder {
is approximate because there is additional encoding overhead on
/// top of the underlying data itself.
///
-    pub fn with_max_message_size(mut self, max_batch_size: usize) -> Self {
-        self.max_batch_size = max_batch_size;
+    pub fn with_max_message_size_bytes(mut self, max_batch_size_bytes: usize) -> Self {
+        self.max_batch_size_bytes = max_batch_size_bytes;
self
}

@@ -126,7 +126,7 @@ impl FlightDataEncoderBuilder {
S: Stream<Item = Result<RecordBatch>> + Send + 'static,
{
let Self {
-            max_batch_size,
+            max_batch_size_bytes: max_batch_size,
options,
app_metadata,
} = self;
@@ -419,6 +419,7 @@ mod tests {
array::{UInt32Array, UInt8Array},
compute::concat_batches,
};
use arrow_array::UInt64Array;

use super::*;

@@ -506,6 +507,47 @@ mod tests {
assert_eq!(concat_batches(&batch.schema(), &split).unwrap(), batch);
}

#[test]
fn test_split_batch_for_grpc_response_sizes() {
// 2000 8 byte entries into 2k pieces: 8 chunks of 250 rows
verify_split(2000, 2 * 1024, vec![250, 250, 250, 250, 250, 250, 250, 250]);

// 2000 8 byte entries into 4k pieces: 4 chunks of 500 rows
verify_split(2000, 4 * 1024, vec![500, 500, 500, 500]);

// 2023 8 byte entries into 3k pieces does not divide evenly
verify_split(2023, 3 * 1024, vec![337, 337, 337, 337, 337, 337, 1]);

// 10 8 byte entries into 1 byte pieces means each row gets its own
verify_split(10, 1, vec![1, 1, 1, 1, 1, 1, 1, 1, 1, 1]);

// 10 8 byte entries into 1k byte pieces means one piece
verify_split(10, 1024, vec![10]);
}

/// Creates a UInt64Array of 8 byte integers with `num_input_rows` rows,
/// splits it into `max_batch_size_bytes` sized pieces, and verifies the
/// row counts in those pieces
fn verify_split(
num_input_rows: u64,
max_batch_size_bytes: usize,
expected_sizes: Vec<usize>,
) {
let array: UInt64Array = (0..num_input_rows).collect();
Reviewer comment (Contributor): It occurs to me that this splitting won't work for dictionaries, not really sure what we can do about that though... 🤔

Reply (Contributor Author): 🤔 I guess we hope for small dictionaries

let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)])
.expect("cannot create record batch");

let input_rows = batch.num_rows();

let split = split_batch_for_grpc_response(batch.clone(), max_batch_size_bytes);
let sizes: Vec<_> = split.iter().map(|batch| batch.num_rows()).collect();
let output_rows: usize = sizes.iter().sum();

assert_eq!(sizes, expected_sizes, "mismatch for {batch:?}");
assert_eq!(input_rows, output_rows, "mismatch for {batch:?}");
}

// test sending record batches
// test sending record batches with multiple different dictionaries
}
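The expected sizes in the new test follow a simple pattern: the batch is split into `ceil(total_bytes / max_batch_size_bytes)` slices of equal row count, clamped to at least one row per slice. Below is a hypothetical, self-contained sketch of that arithmetic; `split_row_counts` is a made-up helper, not an arrow-flight API, and the real `split_batch_for_grpc_response` operates on `RecordBatch`es and measures their in-memory size itself.

```rust
// Hypothetical sketch of the row-count arithmetic implied by the test cases
// above. Assumes `max_bytes > 0`; not part of arrow-flight.

/// Row count of each slice when `num_rows` rows occupying `total_bytes`
/// are split into messages of at most `max_bytes` each.
fn split_row_counts(num_rows: usize, total_bytes: usize, max_bytes: usize) -> Vec<usize> {
    // Number of output messages: ceil(total_bytes / max_bytes), at least one.
    let n_batches = ((total_bytes + max_bytes - 1) / max_bytes).max(1);
    // Equal rows per message, clamped to one so every slice makes progress.
    let rows_per_batch = (num_rows / n_batches).max(1);

    let mut sizes = Vec::new();
    let mut remaining = num_rows;
    while remaining > 0 {
        let n = rows_per_batch.min(remaining);
        sizes.push(n);
        remaining -= n;
    }
    sizes
}

fn main() {
    // 2023 8-byte values into 3 KiB messages does not divide evenly:
    // ceil(16184 / 3072) = 6 slices of 2023 / 6 = 337 rows, plus a 1-row tail.
    assert_eq!(
        split_row_counts(2023, 2023 * 8, 3 * 1024),
        vec![337, 337, 337, 337, 337, 337, 1]
    );
    // A 1-byte limit degenerates to one row per message.
    assert_eq!(split_row_counts(10, 10 * 8, 1), vec![1; 10]);
}
```

Worked through for one case: 2000 rows of 8 bytes is 16000 bytes; a 2 KiB limit gives ceil(16000 / 2048) = 8 messages of 2000 / 8 = 250 rows, matching the first `verify_split` case.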
8 changes: 4 additions & 4 deletions arrow-flight/tests/encode_decode.rs
@@ -131,7 +131,7 @@ async fn test_max_message_size() {
let input_batch_stream = futures::stream::iter(vec![Ok(make_primative_batch(5))]);

// 5 input rows, with a very small limit should result in 5 batch messages
-    let encoder = FlightDataEncoderBuilder::default().with_max_message_size(1);
+    let encoder = FlightDataEncoderBuilder::default().with_max_message_size_bytes(1);

let encode_stream = encoder.build(input_batch_stream);

@@ -164,9 +164,9 @@ async fn test_max_message_size_fuzz() {
make_primative_batch(127),
];

-    for max_message_size in [10, 1024, 2048, 6400, 3211212] {
-        let encoder =
-            FlightDataEncoderBuilder::default().with_max_message_size(max_message_size);
+    for max_message_size_bytes in [10, 1024, 2048, 6400, 3211212] {
+        let encoder = FlightDataEncoderBuilder::default()
+            .with_max_message_size_bytes(max_message_size_bytes);

let input_batch_stream = futures::stream::iter(input.clone()).map(Ok);

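The fuzz test above rests on one invariant: for any message-size limit, decoding the split stream and concatenating the pieces must reproduce the input exactly. A minimal stand-alone sketch of that round-trip property, with a plain `Vec<u64>` standing in for a single-column `RecordBatch`; `split` and `concat` here are illustrative helpers, not arrow-flight APIs.

```rust
// Stand-alone analogue of the fuzz invariant: whatever limit is chosen,
// splitting and then concatenating must reproduce the input exactly.

fn split(rows: &[u64], rows_per_chunk: usize) -> Vec<Vec<u64>> {
    // Clamp to one row per chunk so `chunks` never sees a zero size.
    rows.chunks(rows_per_chunk.max(1)).map(|c| c.to_vec()).collect()
}

fn concat(chunks: &[Vec<u64>]) -> Vec<u64> {
    chunks.iter().flatten().copied().collect()
}

fn main() {
    let input: Vec<u64> = (0..127).collect();
    for rows_per_chunk in [1, 3, 50, 127, 1000] {
        let chunks = split(&input, rows_per_chunk);
        assert_eq!(concat(&chunks), input, "round trip failed at {rows_per_chunk}");
    }
}
```

This is the same shape of check the real test performs with `concat_batches` over the decoded stream, just without the encoding layer in between.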