apacheGH-39857: [C++] Improve error message for "chunker out of sync" condition (apache#39892)

### Rationale for this change

When writing the CSV reader, we assumed that the parser could never find different line boundaries than the chunker; hence the terse "chunker out of sync" error message.

It turns out that, if the input contains multiline cell values and the `newlines_in_values` option is not enabled, the chunker can happily delimit a block on a newline that is inside a quoted string. The parser then sees truncated data and stops parsing, yielding a parsed size smaller than the first block (see the comment added in the code).
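For illustration (this sketch is not part of the commit's diff), the symptom and its workaround can be reproduced from Python roughly as follows; the `block_size=8` value is an assumption chosen so that a chunk boundary falls on the quoted newline, mirroring the test added below:

```python
import io
import pyarrow.csv as csv

# The third cell of the first data row contains a quoted newline.
rows = b'a,b,c\nd,e,"f\n"\ng,h,i\n'

# A tiny block size makes the chunker split a block on the newline
# inside the quoted cell, so the parser sees truncated data.
try:
    csv.read_csv(io.BytesIO(rows),
                 read_options=csv.ReadOptions(block_size=8))
except ValueError as exc:
    print(exc)  # with this change, the message hints at 'newlines_in_values'

# With newlines_in_values=True, the chunker respects quoted newlines.
table = csv.read_csv(
    io.BytesIO(rows),
    read_options=csv.ReadOptions(block_size=8),
    parse_options=csv.ParseOptions(newlines_in_values=True))
print(table.to_pydict())
# {'a': ['d', 'g'], 'b': ['e', 'h'], 'c': ['f\n', 'i']}
```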

### What changes are included in this PR?

* Add some parser tests that showcase the condition encountered in apacheGH-39857
* Improve error message to guide users towards the solution

### Are these changes tested?

There's no functional change. The added C++ parser tests exercise the underlying condition, and the new Python test checks that the improved error message is raised.

### Are there any user-facing changes?

No.

* Closes: apache#39857

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
pitrou authored and thisisnic committed Mar 8, 2024
1 parent e714539 commit db71a29
Showing 3 changed files with 76 additions and 5 deletions.
22 changes: 22 additions & 0 deletions cpp/src/arrow/csv/parser_test.cc
@@ -175,6 +175,13 @@ void AssertParsePartial(BlockParser& parser, const std::string& str,
  ASSERT_EQ(parsed_size, expected_size);
}

void AssertParsePartial(BlockParser& parser, const std::vector<std::string_view>& data,
                        uint32_t expected_size) {
  uint32_t parsed_size = static_cast<uint32_t>(-1);
  ASSERT_OK(parser.Parse(data, &parsed_size));
  ASSERT_EQ(parsed_size, expected_size);
}

void AssertLastRowEq(const BlockParser& parser,
                     const std::vector<std::string>& expected) {
  std::vector<std::string> values;
@@ -376,6 +383,21 @@ TEST(BlockParser, TruncatedData) {
  }
}

TEST(BlockParser, TruncatedDataViews) {
  // The BlockParser API mandates that, when passing a vector of views,
  // only the last view may be a truncated CSV block.
  // In the current implementation, receiving a truncated non-last view
  // simply stops parsing after that view.
  BlockParser parser(ParseOptions::Defaults(), /*num_cols=*/3);
  AssertParsePartial(parser, Views({"a,b,", "c\n"}), 0);
  AssertParsePartial(parser, Views({"a,b,c\nd,", "e,f\n"}), 6);

  // More sophisticated: non-last block ends on some newline inside a quoted string
  // (terse reproducer of gh-39857)
  AssertParsePartial(parser, Views({"a,b,\"c\n", "\"\n"}), 0);
  AssertParsePartial(parser, Views({"a,b,c\n\"d\n", "\",e,f\n"}), 6);
}

TEST(BlockParser, Final) {
  // Tests for ParseFinal()
  BlockParser parser(ParseOptions::Defaults());
34 changes: 29 additions & 5 deletions cpp/src/arrow/csv/reader.cc
@@ -261,11 +261,10 @@ class SerialBlockReader : public BlockReader {
    auto consume_bytes = [this, bytes_before_buffer,
                          next_buffer](int64_t nbytes) -> Status {
      DCHECK_GE(nbytes, 0);
-     auto offset = nbytes - bytes_before_buffer;
-     if (offset < 0) {
-       // Should not happen
-       return Status::Invalid("CSV parser got out of sync with chunker");
-     }
+     int64_t offset = nbytes - bytes_before_buffer;
+     // All data before the buffer should have been consumed.
+     // This is checked in Parse() and BlockParsingOperator::operator().
+     DCHECK_GE(offset, 0);
      partial_ = SliceBuffer(buffer_, offset);
      buffer_ = next_buffer;
      return Status::OK();
@@ -400,6 +399,7 @@ class BlockParsingOperator {
        count_rows_(first_row >= 0),
        num_rows_seen_(first_row) {}

  // TODO: this is almost entirely the same as ReaderMixin::Parse(). Refactor?
  Result<ParsedBlock> operator()(const CSVBlock& block) {
    constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
    auto parser = std::make_shared<BlockParser>(
@@ -427,9 +427,24 @@
    } else {
      RETURN_NOT_OK(parser->Parse(views, &parsed_size));
    }

    // `partial + completion` should have been entirely consumed.
    const int64_t bytes_before_buffer =
        block.partial->size() + block.completion->size();
    if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
      // This can happen if `newlines_in_values` is not enabled and
      // `partial + completion` ends with a newline inside a quoted string.
      // In this case, the BlockParser stops at the truncated data in the first
      // block (see gh-39857).
      return Status::Invalid(
          "CSV parser got out of sync with chunker. This can mean the data file "
          "contains cell values spanning multiple lines; please consider enabling "
          "the option 'newlines_in_values'.");
    }

    if (count_rows_) {
      num_rows_seen_ += parser->total_num_rows();
    }

    RETURN_NOT_OK(block.consume_bytes(parsed_size));
    return ParsedBlock{std::move(parser), block.block_index,
                       static_cast<int64_t>(parsed_size) + block.bytes_skipped};
@@ -738,6 +753,15 @@ class ReaderMixin {
    } else {
      RETURN_NOT_OK(parser->Parse(views, &parsed_size));
    }
    // See BlockParsingOperator for explanation.
    const int64_t bytes_before_buffer = partial->size() + completion->size();
    if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
      return Status::Invalid(
          "CSV parser got out of sync with chunker. This can mean the data file "
          "contains cell values spanning multiple lines; please consider enabling "
          "the option 'newlines_in_values'.");
    }

    if (count_rows_) {
      num_rows_seen_ += parser->total_num_rows();
    }
25 changes: 25 additions & 0 deletions python/pyarrow/tests/test_csv.py
@@ -667,6 +667,31 @@ def row_num(x):
            'b': ["e", "j"],
        }

    def test_chunker_out_of_sync(self):
        # GH-39892: if there are newlines in values, the parser may become
        # out of sync with the chunker. In this case, we try to produce an
        # informative error message.
        rows = b"""a,b,c\nd,e,"f\n"\ng,h,i\n"""
        expected = {
            'a': ["d", "g"],
            'b': ["e", "h"],
            'c': ["f\n", "i"],
        }
        for block_size in range(8, 15):
            # Sanity check: parsing works with newlines_in_values=True
            d = self.read_bytes(
                rows, parse_options=ParseOptions(newlines_in_values=True),
                read_options=ReadOptions(block_size=block_size)).to_pydict()
            assert d == expected
        # With these block sizes, a block would end on the physical newline
        # inside the quoted cell value, leading to a mismatch between
        # CSV chunker and parser.
        for block_size in range(8, 11):
            with pytest.raises(ValueError,
                               match="cell values spanning multiple lines"):
                self.read_bytes(
                    rows, read_options=ReadOptions(block_size=block_size))


class BaseCSVTableRead(BaseTestCSV):

