Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-39962: [C++] Small CSV reader refactoring #39963

Merged
merged 2 commits into from
Feb 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 45 additions & 99 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -445,16 +445,20 @@ class BlockParsingOperator {
num_rows_seen_ += parser->total_num_rows();
}

RETURN_NOT_OK(block.consume_bytes(parsed_size));
if (block.consume_bytes) {
RETURN_NOT_OK(block.consume_bytes(parsed_size));
}
return ParsedBlock{std::move(parser), block.block_index,
static_cast<int64_t>(parsed_size) + block.bytes_skipped};
}

int num_csv_cols() const { return num_csv_cols_; }

private:
io::IOContext io_context_;
ParseOptions parse_options_;
int num_csv_cols_;
bool count_rows_;
const ParseOptions parse_options_;
const int num_csv_cols_;
const bool count_rows_;
int64_t num_rows_seen_;
};

Expand Down Expand Up @@ -570,7 +574,6 @@ class ReaderMixin {
parse_options_(parse_options),
convert_options_(convert_options),
count_rows_(count_rows),
num_rows_seen_(count_rows_ ? 1 : -1),
input_(std::move(input)) {}

protected:
Expand All @@ -581,6 +584,7 @@ class ReaderMixin {
const uint8_t* data = buf->data();
const auto data_end = data + buf->size();
DCHECK_GT(data_end - data, 0);
int64_t num_rows_seen = 1;

if (read_options_.skip_rows) {
// Skip initial rows (potentially invalid CSV data)
Expand All @@ -593,14 +597,14 @@ class ReaderMixin {
"either file is too short or header is larger than block size");
}
if (count_rows_) {
num_rows_seen_ += num_skipped_rows;
num_rows_seen += num_skipped_rows;
}
}

if (read_options_.column_names.empty()) {
// Parse one row (either to read column names or to know the number of columns)
BlockParser parser(io_context_.pool(), parse_options_, num_csv_cols_,
num_rows_seen_, 1);
BlockParser parser(io_context_.pool(), parse_options_, /*num_cols=*/-1,
/*first_row=*/num_rows_seen, /*max_num_rows=*/1);
uint32_t parsed_size = 0;
RETURN_NOT_OK(parser.Parse(
std::string_view(reinterpret_cast<const char*>(data), data_end - data),
Expand All @@ -627,7 +631,7 @@ class ReaderMixin {
// Skip parsed header row
data += parsed_size;
if (count_rows_) {
++num_rows_seen_;
++num_rows_seen;
}
}
} else {
Expand All @@ -636,14 +640,17 @@ class ReaderMixin {

if (count_rows_) {
// increase rows seen to skip past rows which will be skipped
num_rows_seen_ += read_options_.skip_rows_after_names;
num_rows_seen += read_options_.skip_rows_after_names;
}

auto bytes_consumed = data - buf->data();
*rest = SliceBuffer(buf, bytes_consumed);

num_csv_cols_ = static_cast<int32_t>(column_names_.size());
DCHECK_GT(num_csv_cols_, 0);
int32_t num_csv_cols = static_cast<int32_t>(column_names_.size());
DCHECK_GT(num_csv_cols, 0);
// Since we know the number of columns, we can instantiate the BlockParsingOperator
parsing_operator_.emplace(io_context_, parse_options_, num_csv_cols,
count_rows_ ? num_rows_seen : -1);

RETURN_NOT_OK(MakeConversionSchema());
return bytes_consumed;
Expand Down Expand Up @@ -691,7 +698,7 @@ class ReaderMixin {

if (convert_options_.include_columns.empty()) {
// Include all columns in CSV file order
for (int32_t col_index = 0; col_index < num_csv_cols_; ++col_index) {
for (int32_t col_index = 0; col_index < num_csv_cols(); ++col_index) {
append_csv_column(column_names_[col_index], col_index);
}
} else {
Expand Down Expand Up @@ -719,66 +726,25 @@ class ReaderMixin {
return Status::OK();
}

struct ParseResult {
std::shared_ptr<BlockParser> parser;
int64_t parsed_bytes;
};

Result<ParseResult> Parse(const std::shared_ptr<Buffer>& partial,
const std::shared_ptr<Buffer>& completion,
const std::shared_ptr<Buffer>& block, int64_t block_index,
bool is_final) {
static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
auto parser = std::make_shared<BlockParser>(
io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);

std::shared_ptr<Buffer> straddling;
std::vector<std::string_view> views;
if (partial->size() != 0 || completion->size() != 0) {
if (partial->size() == 0) {
straddling = completion;
} else if (completion->size() == 0) {
straddling = partial;
} else {
ARROW_ASSIGN_OR_RAISE(
straddling, ConcatenateBuffers({partial, completion}, io_context_.pool()));
}
views = {std::string_view(*straddling), std::string_view(*block)};
} else {
views = {std::string_view(*block)};
}
uint32_t parsed_size;
if (is_final) {
RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
} else {
RETURN_NOT_OK(parser->Parse(views, &parsed_size));
}
// See BlockParsingOperator for explanation.
const int64_t bytes_before_buffer = partial->size() + completion->size();
if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
return Status::Invalid(
"CSV parser got out of sync with chunker. This can mean the data file "
"contains cell values spanning multiple lines; please consider enabling "
"the option 'newlines_in_values'.");
}
Result<ParsedBlock> Parse(const CSVBlock& block) {
DCHECK(parsing_operator_.has_value());
bkietz marked this conversation as resolved.
Show resolved Hide resolved
return (*parsing_operator_)(block);
}

if (count_rows_) {
num_rows_seen_ += parser->total_num_rows();
}
return ParseResult{std::move(parser), static_cast<int64_t>(parsed_size)};
int num_csv_cols() const {
DCHECK(parsing_operator_.has_value());
return parsing_operator_->num_csv_cols();
}

io::IOContext io_context_;
ReadOptions read_options_;
ParseOptions parse_options_;
ConvertOptions convert_options_;

// Number of columns in the CSV file
int32_t num_csv_cols_ = -1;
// Whether num_rows_seen_ tracks the number of rows seen in the CSV being parsed
bool count_rows_;
// Number of rows seen in the csv. Not used if count_rows is false
int64_t num_rows_seen_;
const ReadOptions read_options_;
const ParseOptions parse_options_;
const ConvertOptions convert_options_;
// Whether to track the number of rows seen in the CSV being parsed
const bool count_rows_;

std::optional<BlockParsingOperator> parsing_operator_;

// Column names in the CSV file
std::vector<std::string> column_names_;
ConversionSchema conversion_schema_;
Expand Down Expand Up @@ -822,14 +788,10 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {
return Status::OK();
}

Result<int64_t> ParseAndInsert(const std::shared_ptr<Buffer>& partial,
const std::shared_ptr<Buffer>& completion,
const std::shared_ptr<Buffer>& block,
int64_t block_index, bool is_final) {
ARROW_ASSIGN_OR_RAISE(auto result,
Parse(partial, completion, block, block_index, is_final));
RETURN_NOT_OK(ProcessData(result.parser, block_index));
return result.parsed_bytes;
Status ParseAndInsert(const CSVBlock& block) {
ARROW_ASSIGN_OR_RAISE(auto result, Parse(block));
RETURN_NOT_OK(ProcessData(result.parser, result.block_index));
return Status::OK();
}

// Trigger conversion of parsed block data
Expand Down Expand Up @@ -921,17 +883,14 @@ class StreamingReaderImpl : public ReaderMixin,
ProcessHeader(first_buffer, &after_header));
bytes_decoded_->fetch_add(header_bytes_consumed);

auto parser_op =
BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, num_rows_seen_);
ARROW_ASSIGN_OR_RAISE(
auto decoder_op,
BlockDecodingOperator::Make(io_context_, convert_options_, conversion_schema_));

auto block_gen = SerialBlockReader::MakeAsyncIterator(
std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header),
read_options_.skip_rows_after_names);
auto parsed_block_gen =
MakeMappedGenerator(std::move(block_gen), std::move(parser_op));
auto parsed_block_gen = MakeMappedGenerator(std::move(block_gen), *parsing_operator_);
auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op));

auto self = shared_from_this();
Expand Down Expand Up @@ -1035,11 +994,7 @@ class SerialTableReader : public BaseTableReader {
// EOF
break;
}
ARROW_ASSIGN_OR_RAISE(
int64_t parsed_bytes,
ParseAndInsert(maybe_block.partial, maybe_block.completion, maybe_block.buffer,
maybe_block.block_index, maybe_block.is_final));
RETURN_NOT_OK(maybe_block.consume_bytes(parsed_bytes));
RETURN_NOT_OK(ParseAndInsert(maybe_block));
}
// Finish conversion, create schema and table
RETURN_NOT_OK(task_group_->Finish());
Expand Down Expand Up @@ -1110,13 +1065,8 @@ class AsyncThreadedTableReader
DCHECK(!maybe_block.consume_bytes);

// Launch parse task
self->task_group_->Append([self, maybe_block] {
return self
->ParseAndInsert(maybe_block.partial, maybe_block.completion,
maybe_block.buffer, maybe_block.block_index,
maybe_block.is_final)
.status();
});
self->task_group_->Append(
[self, maybe_block] { return self->ParseAndInsert(maybe_block); });
return Status::OK();
};

Expand Down Expand Up @@ -1239,12 +1189,8 @@ class CSVRowCounter : public ReaderMixin,
// IterationEnd.
std::function<Result<std::optional<int64_t>>(const CSVBlock&)> count_cb =
[self](const CSVBlock& maybe_block) -> Result<std::optional<int64_t>> {
ARROW_ASSIGN_OR_RAISE(
auto parser,
self->Parse(maybe_block.partial, maybe_block.completion, maybe_block.buffer,
maybe_block.block_index, maybe_block.is_final));
RETURN_NOT_OK(maybe_block.consume_bytes(parser.parsed_bytes));
int32_t total_row_count = parser.parser->total_num_rows();
ARROW_ASSIGN_OR_RAISE(auto parsed_block, self->Parse(maybe_block));
int32_t total_row_count = parsed_block.parser->total_num_rows();
self->row_count_ += total_row_count;
return total_row_count;
};
Expand Down
Loading