Skip to content

Commit

Permalink
fix(source): fix csv parser (#7147)
Browse files Browse the repository at this point in the history
fix(source): fix csv parser (#7145)

- Fix the bug that the csv parser can't parse the last line when the last line is missing a line break.

Approved-By: huangjw806
  • Loading branch information
waruto210 authored Dec 30, 2022
1 parent f0eacd4 commit a82fca6
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 12 deletions.
6 changes: 6 additions & 0 deletions src/source/src/fs_connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ impl FsConnectorSourceReader {

// If a split is finished reading, recreate a parser
if content.len() + msg.offset >= msg.split_size {
// the last record in a file may be missing the terminator,
// so we need to pass an empty payload to inform the parser.
if let Err(e) = parser.parse(&mut buff, builder.row_writer()).await {
tracing::warn!("message parsing failed {}, skipping", e.to_string());
}

parser = ByteStreamSourceParserImpl::create(
&self.format,
&self.properties,
Expand Down
45 changes: 33 additions & 12 deletions src/source/src/parser/csv_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,20 @@ impl CsvParser {
let length = self.ends.len();
self.ends.resize(length * 2, 0);
}
csv_core::ReadRecordResult::End => {
break Ok(None);
}
// Success cases
csv_core::ReadRecordResult::Record => {
csv_core::ReadRecordResult::Record | csv_core::ReadRecordResult::End => {
// skip the header
if self.next_row_is_header {
self.next_row_is_header = false;
self.reset_cursor();
continue;
}
let ends_cursor = self.ends_cursor;
// caller provides an empty chunk, and there is no data
// in inner buffer
if ends_cursor <= 1 {
break Ok(None);
}
self.reset_cursor();

let string_columns = (1..ends_cursor)
Expand Down Expand Up @@ -121,11 +123,6 @@ impl CsvParser {
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<Option<WriteGuard>> {
let columns_string = match self.parse_columns_to_strings(payload)? {
// parse error, we should reset the internal state to skip this record
// Err(e) => {
// self.reset_cursor();
// return Err(e);
// }
None => return Ok(None),
Some(strings) => strings,
};
Expand Down Expand Up @@ -193,17 +190,41 @@ mod tests {

use super::*;
#[tokio::test]
async fn test_csv_parser() {
async fn test_csv_parser_without_last_line_break() {
let mut parser = CsvParser::new(b',', true).unwrap();
let data = b"name,age\npite,20\nalex,10\n";
let data = b"
name,age
pite,20
alex,10";
let mut part1 = &data[0..data.len() - 1];
let mut part2 = &data[data.len() - 1..data.len()];
let line1 = parser.parse_columns_to_strings(&mut part1).unwrap();
println!("{:?}", line1);
assert!(line1.is_some());
println!("{:?}", line1);
let line2 = parser.parse_columns_to_strings(&mut part1).unwrap();
assert!(line2.is_none());
let line2 = parser.parse_columns_to_strings(&mut part2).unwrap();
assert!(line2.is_none());
let line2 = parser.parse_columns_to_strings(&mut part2).unwrap();
assert!(line2.is_some());
println!("{:?}", line2);
}

#[tokio::test]
async fn test_csv_parser_with_last_line_break() {
let mut parser = CsvParser::new(b',', true).unwrap();
let data = b"
name,age
pite,20
alex,10
";
let mut part1 = &data[0..data.len() - 1];
let mut part2 = &data[data.len() - 1..data.len()];
let line1 = parser.parse_columns_to_strings(&mut part1).unwrap();
assert!(line1.is_some());
println!("{:?}", line1);
let line2 = parser.parse_columns_to_strings(&mut part1).unwrap();
assert!(line2.is_none());
let line2 = parser.parse_columns_to_strings(&mut part2).unwrap();
assert!(line2.is_some());
println!("{:?}", line2);
Expand Down

0 comments on commit a82fca6

Please sign in to comment.