From 77f0df90dabb19770474a98ba7a355addc54194a Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 25 Feb 2022 11:11:55 +0100 Subject: [PATCH 1/5] change csv-writer --- Cargo.toml | 3 +- benches/write_csv.rs | 4 +- examples/csv_write.rs | 4 +- src/io/csv/write/mod.rs | 33 +++++++++++---- src/io/csv/write/serialize.rs | 78 +++++++++++++++++++++++++++++++---- tests/it/io/csv/write.rs | 43 +++++++++++++------ 6 files changed, 133 insertions(+), 32 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1359736e7e3..0ae82024661 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ simdutf8 = "0.1.3" # for csv io csv = { version = "^1.1", optional = true } +csv-core = { version = "0.1", optional = true } # for csv async io csv-async = { version = "^1.1", optional = true } @@ -129,7 +130,7 @@ io_csv = ["io_csv_read", "io_csv_write"] io_csv_async = ["io_csv_read_async"] io_csv_read = ["csv", "lexical-core"] io_csv_read_async = ["csv-async", "lexical-core", "futures"] -io_csv_write = ["csv", "streaming-iterator", "lexical-core"] +io_csv_write = ["csv", "csv-core", "streaming-iterator", "lexical-core"] io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"] io_ipc = ["arrow-format"] io_ipc_write_async = ["io_ipc", "futures"] diff --git a/benches/write_csv.rs b/benches/write_csv.rs index 0d341cea9d7..bfac8b61fec 100644 --- a/benches/write_csv.rs +++ b/benches/write_csv.rs @@ -14,9 +14,9 @@ fn write_batch(columns: &ChunkArc) -> Result<()> { let writer = &mut write::WriterBuilder::new().from_writer(vec![]); assert_eq!(columns.arrays().len(), 1); - write::write_header(writer, &["a"])?; - let options = write::SerializeOptions::default(); + write::write_header(writer, &["a"], &options)?; + write::write_chunk(writer, columns, &options) } diff --git a/examples/csv_write.rs b/examples/csv_write.rs index f9f73cfef53..70bc4f61835 100644 --- a/examples/csv_write.rs +++ b/examples/csv_write.rs @@ -8,9 +8,9 @@ use arrow2::{ fn write_batch>(path: &str, columns: &[Chunk]) -> Result<()> { let writer = &mut write::WriterBuilder::new().from_path(path)?; - write::write_header(writer, &["c1"])?; - let options = write::SerializeOptions::default(); + write::write_header(writer, &["c1"], &options)?; + columns .iter() .try_for_each(|batch| write::write_chunk(writer, batch, &options)) diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs index 37d926f482e..72369db040c 100644 --- a/src/io/csv/write/mod.rs +++ b/src/io/csv/write/mod.rs @@ -6,7 +6,7 @@ use super::super::iterator::StreamingIterator; use std::io::Write; // re-export necessary public APIs from csv -pub use csv::{ByteRecord, Writer, WriterBuilder}; +pub use csv::{ByteRecord, WriterBuilder}; pub use serialize::*; @@ -48,33 +48,50 @@ pub fn serialize>( /// Writes [`Chunk`] to `writer` according to the serialization options `options`. pub fn write_chunk>( - writer: &mut Writer, + writer: &mut W, columns: &Chunk, options: &SerializeOptions, ) -> Result<()> { let mut serializers = new_serializers(columns.arrays(), options)?; let rows = columns.len(); - let mut record = ByteRecord::with_capacity(0, columns.arrays().len()); + let mut row = Vec::with_capacity(columns.arrays().len() * 10); // this is where the (expensive) transposition happens: the outer loop is on rows, the inner on columns (0..rows).try_for_each(|_| { serializers .iter_mut() // `unwrap` is infalible because `array.len()` equals `Chunk::len` - .for_each(|iter| record.push_field(iter.next().unwrap())); - writer.write_byte_record(&record)?; - record.clear(); + .for_each(|iter| { + let field = iter.next().unwrap(); + row.extend_from_slice(field); + row.push(options.delimiter); + }); + // replace last delimiter with new line + let last_byte = row.len() - 1; + row[last_byte] = b'\n'; + writer.write_all(&row)?; + row.clear(); Result::Ok(()) })?; Ok(()) } /// Writes a CSV header to `writer` -pub fn write_header(writer: &mut Writer, names: &[T]) -> Result<()> +pub fn write_header( + writer: &mut W, + names: &[T], + options: &SerializeOptions, +) -> Result<()> where T: AsRef, { - writer.write_record(names.iter().map(|x| x.as_ref().as_bytes()))?; + let names = names.iter().map(|x| x.as_ref()).collect::>(); + writer.write_all( + names + .join(std::str::from_utf8(&[options.delimiter]).unwrap()) + .as_bytes(), + )?; + writer.write_all(&[b'\n'])?; Ok(()) } diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs index 660bde3da1b..647b95b0974 100644 --- a/src/io/csv/write/serialize.rs +++ b/src/io/csv/write/serialize.rs @@ -12,11 +12,12 @@ use crate::{ use super::super::super::iterator::{BufStreamingIterator, StreamingIterator}; use crate::array::{DictionaryArray, DictionaryKey, Offset}; +use csv_core::WriteResult; use std::any::Any; /// Options to serialize logical types to CSV /// The default is to format times and dates as `chrono` crate formats them. -#[derive(Debug, PartialEq, Eq, Hash, Clone, Default)] +#[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct SerializeOptions { /// used for [`DataType::Date32`] pub date32_format: Option, @@ -28,6 +29,21 @@ pub struct SerializeOptions { pub time64_format: Option, /// used for [`DataType::Timestamp`] pub timestamp_format: Option, + /// used as separator/delimiter + pub delimiter: u8, +} + +impl Default for SerializeOptions { + fn default() -> Self { + SerializeOptions { + date32_format: None, + date64_format: None, + time32_format: None, + time64_format: None, + timestamp_format: None, + delimiter: b',', + } + } } fn primitive_write<'a, T: NativeType + ToLexical>( @@ -337,11 +353,35 @@ pub fn new_serializer<'a>( } DataType::Utf8 => { let array = array.as_any().downcast_ref::>().unwrap(); + let mut local_buf = vec![0u8; 64]; + let mut ser_writer = csv_core::Writer::new(); + Box::new(BufStreamingIterator::new( array.iter(), - |x, buf| { - if let Some(x) = x { - buf.extend_from_slice(x.as_bytes()); + move |x, buf| { + match x { + // Empty strings are quoted. + // This will ensure a csv parser will not read them as missing + // in a delimited field + Some("") => buf.extend_from_slice(b"\"\""), + Some(s) => { + let bytes = s.as_bytes(); + buf.reserve(bytes.len() * 2); + + loop { + match ser_writer.field(s.as_bytes(), &mut local_buf) { + (WriteResult::OutputFull, _, _) => { + let additional = local_buf.len(); + local_buf.extend(std::iter::repeat(0u8).take(additional)) + } + (WriteResult::InputEmpty, _, n_out) => { + buf.extend_from_slice(&local_buf[..n_out]); + break; + } + } + } + } + _ => {} } }, vec![], @@ -349,11 +389,35 @@ pub fn new_serializer<'a>( } DataType::LargeUtf8 => { let array = array.as_any().downcast_ref::>().unwrap(); + let mut local_buf = vec![0u8; 64]; + let mut ser_writer = csv_core::Writer::new(); + Box::new(BufStreamingIterator::new( array.iter(), - |x, buf| { - if let Some(x) = x { - buf.extend_from_slice(x.as_bytes()); + move |x, buf| { + match x { + // Empty strings are quoted. + // This will ensure a csv parser will not read them as missing + // in a delimited field + Some("") => buf.extend_from_slice(b"\"\""), + Some(s) => { + let bytes = s.as_bytes(); + buf.reserve(bytes.len() * 2); + + loop { + match ser_writer.field(s.as_bytes(), &mut local_buf) { + (WriteResult::OutputFull, _, _) => { + let additional = local_buf.len(); + local_buf.extend(std::iter::repeat(0u8).take(additional)) + } + (WriteResult::InputEmpty, _, n_out) => { + buf.extend_from_slice(&local_buf[..n_out]); + break; + } + } + } + } + _ => {} } }, vec![], diff --git a/tests/it/io/csv/write.rs b/tests/it/io/csv/write.rs index 656a04f7839..6d8e27b9660 100644 --- a/tests/it/io/csv/write.rs +++ b/tests/it/io/csv/write.rs @@ -34,15 +34,18 @@ fn data() -> Chunk> { fn write_csv() -> Result<()> { let columns = data(); - let write = Cursor::new(Vec::::new()); - let mut writer = WriterBuilder::new().from_writer(write); - - write_header(&mut writer, &["c1", "c2", "c3", "c4", "c5", "c6", "c7"])?; + let mut writer = Cursor::new(Vec::::new()); let options = SerializeOptions::default(); + + write_header( + &mut writer, + &["c1", "c2", "c3", "c4", "c5", "c6", "c7"], + &options, + )?; write_chunk(&mut writer, &columns, &options)?; // check - let buffer = writer.into_inner().unwrap().into_inner(); + let buffer = writer.into_inner(); assert_eq!( r#"c1,c2,c3,c4,c5,c6,c7 a b,123.564532,3,true,,00:20:34,d @@ -59,18 +62,18 @@ d,-556132.25,1,,2019-04-18 02:45:55.555,23:46:03,c fn write_csv_custom_options() -> Result<()> { let batch = data(); - let write = Cursor::new(Vec::::new()); - let mut writer = WriterBuilder::new().delimiter(b'|').from_writer(write); + let mut writer = Cursor::new(Vec::::new()); let options = SerializeOptions { time32_format: Some("%r".to_string()), time64_format: Some("%r".to_string()), + delimiter: b'|', ..Default::default() }; write_chunk(&mut writer, &batch, &options)?; // check - let buffer = writer.into_inner().unwrap().into_inner(); + let buffer = writer.into_inner(); assert_eq!( r#"a b|123.564532|3|true||12:20:34 AM|d c||2|false|2019-04-18 10:54:47.378|06:51:20 AM|a b @@ -230,14 +233,13 @@ fn test_array( data: Vec<&'static str>, options: SerializeOptions, ) -> Result<()> { - let write = Cursor::new(Vec::::new()); - let mut writer = WriterBuilder::new().delimiter(b'|').from_writer(write); + let mut writer = Cursor::new(Vec::::new()); - write_header(&mut writer, &["c1"])?; + write_header(&mut writer, &["c1"], &options)?; write_chunk(&mut writer, &columns, &options)?; // check - let buffer = writer.into_inner().unwrap().into_inner(); + let buffer = writer.into_inner(); let mut expected = "c1\n".to_owned(); expected.push_str(&data.join("\n")); @@ -314,3 +316,20 @@ fn write_tz_timezone_formatted_tz() -> Result<()> { }, ) } + +#[test] +fn write_empty_and_missing() { + let a = Utf8Array::::from(&[Some(""), None]); + let b = Utf8Array::::from(&[None, Some("")]); + let columns = Chunk::new(vec![ + Arc::new(a) as Arc, + Arc::new(b) as Arc, + ]); + + let mut writer = vec![]; + let options = SerializeOptions::default(); + write_chunk(&mut writer, &columns, &options).unwrap(); + let csv = std::str::from_utf8(&writer).unwrap(); + + assert_eq!(csv, "\"\",\n,\"\"\n"); +} From d9cccffc22beacf7b966fcaadda5c6242e663614 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 25 Feb 2022 11:25:16 +0100 Subject: [PATCH 2/5] add quote options --- src/io/csv/write/serialize.rs | 110 ++++++++++++++-------------------- 1 file changed, 44 insertions(+), 66 deletions(-) diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs index 647b95b0974..9ac2581a20c 100644 --- a/src/io/csv/write/serialize.rs +++ b/src/io/csv/write/serialize.rs @@ -31,6 +31,8 @@ pub struct SerializeOptions { pub timestamp_format: Option, /// used as separator/delimiter pub delimiter: u8, + /// used as separator/delimiter + pub quote: u8, } impl Default for SerializeOptions { @@ -42,6 +44,7 @@ impl Default for SerializeOptions { time64_format: None, timestamp_format: None, delimiter: b',', + quote: b'"', } } } @@ -203,6 +206,45 @@ fn timestamp_with_tz<'a>( } } +fn new_utf8_serializer<'a, O: Offset>( + array: &'a Utf8Array, + options: &'a SerializeOptions, +) -> Box + 'a> { + let mut local_buf = vec![0u8; 64]; + let mut ser_writer = csv_core::WriterBuilder::new().quote(options.quote).build(); + + Box::new(BufStreamingIterator::new( + array.iter(), + move |x, buf| { + match x { + // Empty strings are quoted. + // This will ensure a csv parser will not read them as missing + // in a delimited field + Some("") => buf.extend_from_slice(b"\"\""), + Some(s) => { + let bytes = s.as_bytes(); + buf.reserve(bytes.len() * 2); + + loop { + match ser_writer.field(s.as_bytes(), &mut local_buf) { + (WriteResult::OutputFull, _, _) => { + let additional = local_buf.len(); + local_buf.extend(std::iter::repeat(0u8).take(additional)) + } + (WriteResult::InputEmpty, _, n_out) => { + buf.extend_from_slice(&local_buf[..n_out]); + break; + } + } + } + } + _ => {} + } + }, + vec![], + )) +} + /// Returns a [`StreamingIterator`] that yields `&[u8]` serialized from `array` according to `options`. /// For numeric types, this serializes as usual. For dates, times and timestamps, it uses `options` to /// Supported types: @@ -353,75 +395,11 @@ pub fn new_serializer<'a>( } DataType::Utf8 => { let array = array.as_any().downcast_ref::>().unwrap(); - let mut local_buf = vec![0u8; 64]; - let mut ser_writer = csv_core::Writer::new(); - - Box::new(BufStreamingIterator::new( - array.iter(), - move |x, buf| { - match x { - // Empty strings are quoted. - // This will ensure a csv parser will not read them as missing - // in a delimited field - Some("") => buf.extend_from_slice(b"\"\""), - Some(s) => { - let bytes = s.as_bytes(); - buf.reserve(bytes.len() * 2); - - loop { - match ser_writer.field(s.as_bytes(), &mut local_buf) { - (WriteResult::OutputFull, _, _) => { - let additional = local_buf.len(); - local_buf.extend(std::iter::repeat(0u8).take(additional)) - } - (WriteResult::InputEmpty, _, n_out) => { - buf.extend_from_slice(&local_buf[..n_out]); - break; - } - } - } - } - _ => {} - } - }, - vec![], - )) + new_utf8_serializer(array, options) } DataType::LargeUtf8 => { let array = array.as_any().downcast_ref::>().unwrap(); - let mut local_buf = vec![0u8; 64]; - let mut ser_writer = csv_core::Writer::new(); - - Box::new(BufStreamingIterator::new( - array.iter(), - move |x, buf| { - match x { - // Empty strings are quoted. - // This will ensure a csv parser will not read them as missing - // in a delimited field - Some("") => buf.extend_from_slice(b"\"\""), - Some(s) => { - let bytes = s.as_bytes(); - buf.reserve(bytes.len() * 2); - - loop { - match ser_writer.field(s.as_bytes(), &mut local_buf) { - (WriteResult::OutputFull, _, _) => { - let additional = local_buf.len(); - local_buf.extend(std::iter::repeat(0u8).take(additional)) - } - (WriteResult::InputEmpty, _, n_out) => { - buf.extend_from_slice(&local_buf[..n_out]); - break; - } - } - } - } - _ => {} - } - }, - vec![], - )) + new_utf8_serializer(array, options) } DataType::Binary => { let array = array.as_any().downcast_ref::>().unwrap(); From 2af74f0df446afe301bb5511d0c13b850017339c Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 25 Feb 2022 11:31:57 +0100 Subject: [PATCH 3/5] update examples --- examples/csv_write.rs | 6 +++--- examples/csv_write_parallel.rs | 13 ++++++------- src/io/csv/write/mod.rs | 33 +++++++++++++++++++++++---------- src/io/csv/write/serialize.rs | 2 +- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/examples/csv_write.rs b/examples/csv_write.rs index 70bc4f61835..6a40fb7b515 100644 --- a/examples/csv_write.rs +++ b/examples/csv_write.rs @@ -6,14 +6,14 @@ use arrow2::{ }; fn write_batch>(path: &str, columns: &[Chunk]) -> Result<()> { - let writer = &mut write::WriterBuilder::new().from_path(path)?; + let mut writer = std::fs::File::create(path)?; let options = write::SerializeOptions::default(); - write::write_header(writer, &["c1"], &options)?; + write::write_header(&mut writer, &["c1"], &options)?; columns .iter() - .try_for_each(|batch| write::write_chunk(writer, batch, &options)) + .try_for_each(|batch| write::write_chunk(&mut writer, batch, &options)) } fn main() -> Result<()> { diff --git a/examples/csv_write_parallel.rs b/examples/csv_write_parallel.rs index f616a1b8292..5611d39b1b3 100644 --- a/examples/csv_write_parallel.rs +++ b/examples/csv_write_parallel.rs @@ -1,3 +1,4 @@ +use std::io::Write; use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender}; use std::sync::Arc; @@ -14,8 +15,8 @@ fn parallel_write(path: &str, batches: [Chunk>; 2]) -> Result<()> let options = write::SerializeOptions::default(); // write a header - let writer = &mut write::WriterBuilder::new().from_path(path)?; - write::write_header(writer, &["c1"])?; + let mut writer = std::fs::File::create(path)?; + write::write_header(&mut writer, &["c1"], &options)?; // prepare a channel to send serialized records from threads let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel(); @@ -28,8 +29,8 @@ fn parallel_write(path: &str, batches: [Chunk>; 2]) -> Result<()> let options = options.clone(); let batch = batches[id].clone(); // note: this is cheap let child = thread::spawn(move || { - let records = write::serialize(&batch, &options).unwrap(); - thread_tx.send(records).unwrap(); + let rows = write::serialize(&batch, &options).unwrap(); + thread_tx.send(rows).unwrap(); }); children.push(child); @@ -38,9 +39,7 @@ fn parallel_write(path: &str, batches: [Chunk>; 2]) -> Result<()> for _ in 0..2 { // block: assumes that the order of batches matter. let records = rx.recv().unwrap(); - records - .iter() - .try_for_each(|record| writer.write_byte_record(record))? + records.iter().try_for_each(|row| writer.write_all(&row))? } for child in children { diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs index 72369db040c..2a1fb313ae2 100644 --- a/src/io/csv/write/mod.rs +++ b/src/io/csv/write/mod.rs @@ -26,24 +26,37 @@ fn new_serializers<'a, A: AsRef>( .collect() } -/// Serializes [`Chunk`] to a vector of `ByteRecord`. +/// Serializes [`Chunk`] to a vector of rows. /// The vector is guaranteed to have `columns.len()` entries. -/// Each `ByteRecord` is guaranteed to have `columns.array().len()` fields. +/// Each `row` is guaranteed to have `columns.array().len()` fields. pub fn serialize>( columns: &Chunk, options: &SerializeOptions, -) -> Result> { +) -> Result>> { let mut serializers = new_serializers(columns, options)?; - let rows = columns.len(); - let mut records = vec![ByteRecord::with_capacity(0, columns.arrays().len()); rows]; - records.iter_mut().for_each(|record| { + let mut rows = Vec::with_capacity(columns.len()); + let mut row = vec![]; + + // this is where the (expensive) transposition happens: the outer loop is on rows, the inner on columns + (0..columns.len()).try_for_each(|_| { serializers .iter_mut() - // `unwrap` is infalible because `array.len()` equals `len` in `Chunk::len` - .for_each(|iter| record.push_field(iter.next().unwrap())); - }); - Ok(records) + // `unwrap` is infalible because `array.len()` equals `Chunk::len` + .for_each(|iter| { + let field = iter.next().unwrap(); + row.extend_from_slice(field); + row.push(options.delimiter); + }); + // replace last delimiter with new line + let last_byte = row.len() - 1; + row[last_byte] = b'\n'; + rows.push(row.clone()); + row.clear(); + Result::Ok(()) + })?; + + Ok(rows) } /// Writes [`Chunk`] to `writer` according to the serialization options `options`. diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs index 9ac2581a20c..f1857f91025 100644 --- a/src/io/csv/write/serialize.rs +++ b/src/io/csv/write/serialize.rs @@ -31,7 +31,7 @@ pub struct SerializeOptions { pub timestamp_format: Option, /// used as separator/delimiter pub delimiter: u8, - /// used as separator/delimiter + /// quoting character pub quote: u8, } From 359a776a335213ff79b85e29d2f4059f9d780f8a Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 25 Feb 2022 15:48:38 +0100 Subject: [PATCH 4/5] fix bench --- benches/write_csv.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/benches/write_csv.rs b/benches/write_csv.rs index bfac8b61fec..faba0701c65 100644 --- a/benches/write_csv.rs +++ b/benches/write_csv.rs @@ -11,13 +11,13 @@ use arrow2::util::bench_util::*; type ChunkArc = Chunk>; fn write_batch(columns: &ChunkArc) -> Result<()> { - let writer = &mut write::WriterBuilder::new().from_writer(vec![]); + let mut writer = vec![]; assert_eq!(columns.arrays().len(), 1); let options = write::SerializeOptions::default(); - write::write_header(writer, &["a"], &options)?; + write::write_header(&mut writer, &["a"], &options)?; - write::write_chunk(writer, columns, &options) + write::write_chunk(&mut writer, columns, &options) } fn make_chunk(array: impl Array + 'static) -> Chunk> { From bbd900d0c1ecb289255ffeaeac2792a557b3ef30 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sat, 26 Feb 2022 09:22:38 +0100 Subject: [PATCH 5/5] minor changes --- src/io/csv/write/mod.rs | 12 +++++++----- src/io/csv/write/serialize.rs | 25 ++++++++++--------------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs index 2a1fb313ae2..275705c744e 100644 --- a/src/io/csv/write/mod.rs +++ b/src/io/csv/write/mod.rs @@ -48,11 +48,13 @@ pub fn serialize>( row.extend_from_slice(field); row.push(options.delimiter); }); - // replace last delimiter with new line - let last_byte = row.len() - 1; - row[last_byte] = b'\n'; - rows.push(row.clone()); - row.clear(); + if !row.is_empty() { + // replace last delimiter with new line + let last_byte = row.len() - 1; + row[last_byte] = b'\n'; + rows.push(row.clone()); + row.clear(); + } Result::Ok(()) })?; diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs index f1857f91025..22f26e5a27a 100644 --- a/src/io/csv/write/serialize.rs +++ b/src/io/csv/write/serialize.rs @@ -221,23 +221,18 @@ fn new_utf8_serializer<'a, O: Offset>( // This will ensure a csv parser will not read them as missing // in a delimited field Some("") => buf.extend_from_slice(b"\"\""), - Some(s) => { - let bytes = s.as_bytes(); - buf.reserve(bytes.len() * 2); - - loop { - match ser_writer.field(s.as_bytes(), &mut local_buf) { - (WriteResult::OutputFull, _, _) => { - let additional = local_buf.len(); - local_buf.extend(std::iter::repeat(0u8).take(additional)) - } - (WriteResult::InputEmpty, _, n_out) => { - buf.extend_from_slice(&local_buf[..n_out]); - break; - } + Some(s) => loop { + match ser_writer.field(s.as_bytes(), &mut local_buf) { + (WriteResult::OutputFull, _, _) => { + let additional = local_buf.len(); + local_buf.extend(std::iter::repeat(0u8).take(additional)) + } + (WriteResult::InputEmpty, _, n_out) => { + buf.extend_from_slice(&local_buf[..n_out]); + break; } } - } + }, _ => {} } },