From a82fca6db8b2072815e9613cd71049cfffe343ac Mon Sep 17 00:00:00 2001 From: waruto Date: Fri, 30 Dec 2022 15:55:16 +0800 Subject: [PATCH] fix(source): fix csv parser (#7147) fix(source): fix csv parser (#7145) - Fix the bug that the csv parser can't parse the last line when the last line is missing a line break. Approved-By: huangjw806 --- src/source/src/fs_connector_source.rs | 6 ++++ src/source/src/parser/csv_parser.rs | 45 ++++++++++++++++++++------- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/src/source/src/fs_connector_source.rs b/src/source/src/fs_connector_source.rs index cb7fe8ac47e75..28d3c91a40232 100644 --- a/src/source/src/fs_connector_source.rs +++ b/src/source/src/fs_connector_source.rs @@ -87,6 +87,12 @@ impl FsConnectorSourceReader { // If a split is finished reading, recreate a parser if content.len() + msg.offset >= msg.split_size { + // the last record in a file may be missing the terminator, + // so we need to pass an empty payload to inform the parser. + if let Err(e) = parser.parse(&mut buff, builder.row_writer()).await { + tracing::warn!("message parsing failed {}, skipping", e.to_string()); + } + parser = ByteStreamSourceParserImpl::create( &self.format, &self.properties, diff --git a/src/source/src/parser/csv_parser.rs b/src/source/src/parser/csv_parser.rs index 33bc8b836f64d..0064f6306be47 100644 --- a/src/source/src/parser/csv_parser.rs +++ b/src/source/src/parser/csv_parser.rs @@ -82,11 +82,8 @@ impl CsvParser { let length = self.ends.len(); self.ends.resize(length * 2, 0); } - csv_core::ReadRecordResult::End => { - break Ok(None); - } // Success cases - csv_core::ReadRecordResult::Record => { + csv_core::ReadRecordResult::Record | csv_core::ReadRecordResult::End => { // skip the header if self.next_row_is_header { self.next_row_is_header = false; @@ -94,6 +91,11 @@ impl CsvParser { continue; } let ends_cursor = self.ends_cursor; + // caller provides an empty chunk, and there is no data + // in inner buffer + if ends_cursor <= 1 { + break Ok(None); + } self.reset_cursor(); let string_columns = (1..ends_cursor) @@ -121,11 +123,6 @@ impl CsvParser { mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result> { let columns_string = match self.parse_columns_to_strings(payload)? { - // parse error, we should reset the internal state to skip this record - // Err(e) => { - // self.reset_cursor(); - // return Err(e); - // } None => return Ok(None), Some(strings) => strings, }; @@ -193,17 +190,41 @@ mod tests { use super::*; #[tokio::test] - async fn test_csv_parser() { + async fn test_csv_parser_without_last_line_break() { let mut parser = CsvParser::new(b',', true).unwrap(); - let data = b"name,age\npite,20\nalex,10\n"; + let data = b" +name,age +pite,20 +alex,10"; let mut part1 = &data[0..data.len() - 1]; let mut part2 = &data[data.len() - 1..data.len()]; let line1 = parser.parse_columns_to_strings(&mut part1).unwrap(); - println!("{:?}", line1); assert!(line1.is_some()); + println!("{:?}", line1); let line2 = parser.parse_columns_to_strings(&mut part1).unwrap(); assert!(line2.is_none()); + let line2 = parser.parse_columns_to_strings(&mut part2).unwrap(); + assert!(line2.is_none()); + let line2 = parser.parse_columns_to_strings(&mut part2).unwrap(); + assert!(line2.is_some()); println!("{:?}", line2); + } + + #[tokio::test] + async fn test_csv_parser_with_last_line_break() { + let mut parser = CsvParser::new(b',', true).unwrap(); + let data = b" +name,age +pite,20 +alex,10 +"; + let mut part1 = &data[0..data.len() - 1]; + let mut part2 = &data[data.len() - 1..data.len()]; + let line1 = parser.parse_columns_to_strings(&mut part1).unwrap(); + assert!(line1.is_some()); + println!("{:?}", line1); + let line2 = parser.parse_columns_to_strings(&mut part1).unwrap(); + assert!(line2.is_none()); let line2 = parser.parse_columns_to_strings(&mut part2).unwrap(); assert!(line2.is_some()); println!("{:?}", line2);