Commit
GH-39857: [C++] Improve error message for "chunker out of sync" condition
pitrou committed Feb 5, 2024
1 parent 85e2a68 commit 72c2695
Showing 3 changed files with 67 additions and 4 deletions.
cpp/src/arrow/csv/parser_test.cc: 20 additions & 0 deletions
@@ -175,6 +175,13 @@ void AssertParsePartial(BlockParser& parser, const std::string& str,
   ASSERT_EQ(parsed_size, expected_size);
 }
 
+void AssertParsePartial(BlockParser& parser, const std::vector<std::string_view>& data,
+                        uint32_t expected_size) {
+  uint32_t parsed_size = static_cast<uint32_t>(-1);
+  ASSERT_OK(parser.Parse(data, &parsed_size));
+  ASSERT_EQ(parsed_size, expected_size);
+}
+
 void AssertLastRowEq(const BlockParser& parser,
                      const std::vector<std::string>& expected) {
   std::vector<std::string> values;
@@ -376,6 +383,19 @@ TEST(BlockParser, TruncatedData) {
   }
 }
 
+TEST(BlockParser, TruncatedDataViews) {
+  // If non-last block is truncated, parsing stops
+  BlockParser parser(ParseOptions::Defaults(), /*num_cols=*/3);
+  AssertParsePartial(parser, Views({"a,b,", "c\n"}), 0);
+  // (XXX should we guarantee this one below?)
+  AssertParsePartial(parser, Views({"a,b,c\nd,", "e,f\n"}), 6);
+
+  // More sophisticated: non-last block ends on some newline inside a quoted string
+  // (terse reproducer of gh-39857)
+  AssertParsePartial(parser, Views({"a,b,\"c\n", "\"\n"}), 0);
+  AssertParsePartial(parser, Views({"a,b,c\n\"d\n", "\",e,f\n"}), 6);
+}
+
 TEST(BlockParser, Final) {
   // Tests for ParseFinal()
   BlockParser parser(ParseOptions::Defaults());
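A note on why the new TruncatedDataViews cases matter: when `newlines_in_values` is disabled, the chunker picks block boundaries at newlines without tracking quoting state, so a newline inside a quoted value can become a block boundary; the quote-aware parser then stops earlier than the chunker expected. The following Python sketch of a quote-unaware splitter (illustrative only, not Arrow's actual chunker code) shows how the reproducer data ends up split mid-field:

# Illustrative sketch only (not Arrow code): a quote-unaware splitter standing in
# for the chunker's behaviour when newlines_in_values is disabled.
def naive_chunk(data: bytes, block_size: int) -> list:
    blocks = []
    start = 0
    while start < len(data):
        end = min(start + block_size, len(data))
        # Extend the block to the next newline, ignoring quoting entirely.
        newline = data.find(b"\n", end - 1)
        end = len(data) if newline == -1 else newline + 1
        blocks.append(data[start:end])
        start = end
    return blocks

# The newline inside the quoted "f\n" value becomes a block boundary, so the
# first block ends with an unterminated quoted field.
print(naive_chunk(b'a,b,c\nd,e,"f\n"\ng,h,i\n', 8))
# [b'a,b,c\nd,e,"f\n', b'"\ng,h,i\n']

The parser consumes only the complete "a,b,c" row of that first block, while the chunker assumed the whole block was consumable; that mismatch is exactly what the new check in reader.cc reports.
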
cpp/src/arrow/csv/reader.cc: 25 additions & 4 deletions
@@ -262,10 +262,6 @@ class SerialBlockReader : public BlockReader {
                           next_buffer](int64_t nbytes) -> Status {
       DCHECK_GE(nbytes, 0);
       auto offset = nbytes - bytes_before_buffer;
-      if (offset < 0) {
-        // Should not happen
-        return Status::Invalid("CSV parser got out of sync with chunker");
-      }
       partial_ = SliceBuffer(buffer_, offset);
       buffer_ = next_buffer;
       return Status::OK();
@@ -400,6 +396,7 @@ class BlockParsingOperator {
         count_rows_(first_row >= 0),
         num_rows_seen_(first_row) {}
 
+  // TODO: this is almost entirely the same as ReaderMixin::Parse(). Refactor?
   Result<ParsedBlock> operator()(const CSVBlock& block) {
     constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
     auto parser = std::make_shared<BlockParser>(
@@ -427,9 +424,24 @@ } else {
     } else {
       RETURN_NOT_OK(parser->Parse(views, &parsed_size));
     }
+
+    // `partial + completion` should have been entirely consumed.
+    const int64_t bytes_before_buffer = block.partial->size() + block.completion->size();
+    if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
+      // This can happen if `newlines_in_values` is not enabled and
+      // `partial + completion` ends with a newline inside a quoted string.
+      // In this case, the BlockParser stops at the truncated data in the first
+      // block (see gh-39857).
+      return Status::Invalid(
+          "CSV parser got out of sync with chunker. This can mean the data file "
+          "contains cell values spanning multiple lines; please consider enabling "
+          "the option 'newlines_in_values'.");
+    }
+
     if (count_rows_) {
       num_rows_seen_ += parser->total_num_rows();
     }
+
     RETURN_NOT_OK(block.consume_bytes(parsed_size));
     return ParsedBlock{std::move(parser), block.block_index,
                        static_cast<int64_t>(parsed_size) + block.bytes_skipped};
@@ -738,6 +750,15 @@ class ReaderMixin {
     } else {
       RETURN_NOT_OK(parser->Parse(views, &parsed_size));
     }
+    // See BlockParsingOperator for explanation.
+    const int64_t bytes_before_buffer = partial->size() + completion->size();
+    if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
+      return Status::Invalid(
+          "CSV parser got out of sync with chunker. This can mean the data file "
+          "contains cell values spanning multiple lines; please consider enabling "
+          "the option 'newlines_in_values'.");
+    }
+
     if (count_rows_) {
       num_rows_seen_ += parser->total_num_rows();
     }
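For context, here is roughly how the improved message surfaces from Python once this lands; a minimal sketch assuming a block size small enough to force a boundary inside the quoted value (the same data the new Python test below uses):

import io
import pyarrow as pa
import pyarrow.csv as csv

# A quoted value containing a newline; a small block_size forces a block
# boundary inside that value.
rows = b'a,b,c\nd,e,"f\n"\ng,h,i\n'

try:
    csv.read_csv(io.BytesIO(rows), read_options=csv.ReadOptions(block_size=10))
except pa.ArrowInvalid as exc:
    print(exc)  # message now suggests enabling the option 'newlines_in_values'

# With newlines_in_values enabled, the chunker keeps quoted newlines intact.
table = csv.read_csv(
    io.BytesIO(rows),
    read_options=csv.ReadOptions(block_size=10),
    parse_options=csv.ParseOptions(newlines_in_values=True),
)
print(table.to_pydict())
# {'a': ['d', 'g'], 'b': ['e', 'h'], 'c': ['f\n', 'i']}
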
python/pyarrow/tests/test_csv.py: 22 additions & 0 deletions
@@ -667,6 +667,28 @@ def row_num(x):
'b': ["e", "j"],
}

def test_chunker_out_of_sync(self):
# GH-39892: if there are newlines in values, the parser may become
# out of sync with the chunker. In this case, we try to produce an
# informative error message.
rows = b"""a,b,c\nd,e,"f\n"\ng,h,i\n"""
expected = {
'a': ["d", "g"],
'b': ["e", "h"],
'c': ["f\n", "i"],
}
for block_size in range(8, 15):
# Sanity check: parsing works with newlines_in_values=True
d = self.read_bytes(
rows, parse_options=ParseOptions(newlines_in_values=True),
read_options=ReadOptions(block_size=block_size)).to_pydict()
assert d == expected
for block_size in range(8, 11):
with pytest.raises(ValueError,
match="cell values spanning multiple lines"):
self.read_bytes(
rows, read_options=ReadOptions(block_size=block_size))


class BaseCSVTableRead(BaseTestCSV):

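One possible application-side use of the more specific message (a sketch, not part of this change, with a hypothetical helper name): detect it and retry with `newlines_in_values` enabled, accepting the second pass and the documented performance cost of that option for multi-threaded reads.

import io
import pyarrow as pa
import pyarrow.csv as csv

def read_csv_bytes_tolerant(data: bytes, read_options=None):
    # Sketch of a fallback: retry with newlines_in_values=True only when the
    # parser reports the chunker out-of-sync condition.
    try:
        return csv.read_csv(io.BytesIO(data), read_options=read_options)
    except pa.ArrowInvalid as exc:
        if "cell values spanning multiple lines" not in str(exc):
            raise
        return csv.read_csv(
            io.BytesIO(data),
            read_options=read_options,
            parse_options=csv.ParseOptions(newlines_in_values=True),
        )

# Example: succeeds on the reproducer data despite the tiny block size.
read_csv_bytes_tolerant(b'a,b,c\nd,e,"f\n"\ng,h,i\n', csv.ReadOptions(block_size=10))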
