From 72c2695201957a4eda72e7f67ffb509b27ec007a Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Thu, 1 Feb 2024 18:54:24 +0100
Subject: [PATCH] GH-39857: [C++] Improve error message for "chunker out of
 sync" condition

---
 cpp/src/arrow/csv/parser_test.cc | 20 ++++++++++++++++++++
 cpp/src/arrow/csv/reader.cc      | 29 +++++++++++++++++++++++++----
 python/pyarrow/tests/test_csv.py | 22 ++++++++++++++++++++++
 3 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/cpp/src/arrow/csv/parser_test.cc b/cpp/src/arrow/csv/parser_test.cc
index 960a69c59db5d..41c315165b97d 100644
--- a/cpp/src/arrow/csv/parser_test.cc
+++ b/cpp/src/arrow/csv/parser_test.cc
@@ -175,6 +175,13 @@ void AssertParsePartial(BlockParser& parser, const std::string& str,
   ASSERT_EQ(parsed_size, expected_size);
 }
 
+void AssertParsePartial(BlockParser& parser, const std::vector<std::string_view>& data,
+                        uint32_t expected_size) {
+  uint32_t parsed_size = static_cast<uint32_t>(-1);
+  ASSERT_OK(parser.Parse(data, &parsed_size));
+  ASSERT_EQ(parsed_size, expected_size);
+}
+
 void AssertLastRowEq(const BlockParser& parser,
                      const std::vector<std::string>& expected) {
   std::vector<std::string> values;
@@ -376,6 +383,19 @@ TEST(BlockParser, TruncatedData) {
   }
 }
 
+TEST(BlockParser, TruncatedDataViews) {
+  // If non-last block is truncated, parsing stops
+  BlockParser parser(ParseOptions::Defaults(), /*num_cols=*/3);
+  AssertParsePartial(parser, Views({"a,b,", "c\n"}), 0);
+  // (XXX should we guarantee this one below?)
+  AssertParsePartial(parser, Views({"a,b,c\nd,", "e,f\n"}), 6);
+
+  // More sophisticated: non-last block ends on some newline inside a quoted string
+  // (terse reproducer of gh-39857)
+  AssertParsePartial(parser, Views({"a,b,\"c\n", "\"\n"}), 0);
+  AssertParsePartial(parser, Views({"a,b,c\n\"d\n", "\",e,f\n"}), 6);
+}
+
 TEST(BlockParser, Final) {
   // Tests for ParseFinal()
   BlockParser parser(ParseOptions::Defaults());
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index 332fad054fea3..fc761223da820 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -262,10 +262,6 @@ class SerialBlockReader : public BlockReader {
                           next_buffer](int64_t nbytes) -> Status {
       DCHECK_GE(nbytes, 0);
       auto offset = nbytes - bytes_before_buffer;
-      if (offset < 0) {
-        // Should not happen
-        return Status::Invalid("CSV parser got out of sync with chunker");
-      }
       partial_ = SliceBuffer(buffer_, offset);
       buffer_ = next_buffer;
       return Status::OK();
@@ -400,6 +396,7 @@ class BlockParsingOperator {
         count_rows_(first_row >= 0),
         num_rows_seen_(first_row) {}
 
+  // TODO: this is almost entirely the same as ReaderMixin::Parse(). Refactor?
   Result<ParsedBlock> operator()(const CSVBlock& block) {
     constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
     auto parser = std::make_shared<BlockParser>(
@@ -427,9 +424,24 @@ class BlockParsingOperator {
     } else {
       RETURN_NOT_OK(parser->Parse(views, &parsed_size));
     }
+
+    // `partial + completion` should have been entirely consumed.
+    const int64_t bytes_before_buffer = block.partial->size() + block.completion->size();
+    if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
+      // This can happen if `newlines_in_values` is not enabled and
+      // `partial + completion` ends with a newline inside a quoted string.
+      // In this case, the BlockParser stops at the truncated data in the first
+      // block (see gh-39857).
+      return Status::Invalid(
+          "CSV parser got out of sync with chunker. This can mean the data file "
+          "contains cell values spanning multiple lines; please consider enabling "
+          "the option 'newlines_in_values'.");
+    }
+
     if (count_rows_) {
       num_rows_seen_ += parser->total_num_rows();
     }
+
     RETURN_NOT_OK(block.consume_bytes(parsed_size));
     return ParsedBlock{std::move(parser), block.block_index,
                        static_cast<int64_t>(parsed_size) + block.bytes_skipped};
   }
@@ -738,6 +750,15 @@ class ReaderMixin {
     } else {
       RETURN_NOT_OK(parser->Parse(views, &parsed_size));
     }
+    // See BlockParsingOperator for explanation.
+    const int64_t bytes_before_buffer = partial->size() + completion->size();
+    if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
+      return Status::Invalid(
+          "CSV parser got out of sync with chunker. This can mean the data file "
+          "contains cell values spanning multiple lines; please consider enabling "
+          "the option 'newlines_in_values'.");
+    }
+
     if (count_rows_) {
       num_rows_seen_ += parser->total_num_rows();
     }
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index 31f24187e3b37..8ebe748e88d4b 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -667,6 +667,28 @@ def row_num(x):
             'b': ["e", "j"],
         }
 
+    def test_chunker_out_of_sync(self):
+        # GH-39892: if there are newlines in values, the parser may become
+        # out of sync with the chunker. In this case, we try to produce an
+        # informative error message.
+        rows = b"""a,b,c\nd,e,"f\n"\ng,h,i\n"""
+        expected = {
+            'a': ["d", "g"],
+            'b': ["e", "h"],
+            'c': ["f\n", "i"],
+        }
+        for block_size in range(8, 15):
+            # Sanity check: parsing works with newlines_in_values=True
+            d = self.read_bytes(
+                rows, parse_options=ParseOptions(newlines_in_values=True),
+                read_options=ReadOptions(block_size=block_size)).to_pydict()
+            assert d == expected
+        for block_size in range(8, 11):
+            with pytest.raises(ValueError,
+                               match="cell values spanning multiple lines"):
+                self.read_bytes(
+                    rows, read_options=ReadOptions(block_size=block_size))
+
 
 class BaseCSVTableRead(BaseTestCSV):