From 0e38d2069252b9642eec95e10edf149de2e9b0ed Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 6 Feb 2024 16:23:07 +0100 Subject: [PATCH] GH-39962: [C++] Small CSV reader refactoring --- cpp/src/arrow/csv/reader.cc | 146 ++++++++++++------------------------ 1 file changed, 47 insertions(+), 99 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 1ac25e290a814..27619a28ad59a 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -445,16 +445,22 @@ class BlockParsingOperator { num_rows_seen_ += parser->total_num_rows(); } - RETURN_NOT_OK(block.consume_bytes(parsed_size)); + if (block.consume_bytes) { + RETURN_NOT_OK(block.consume_bytes(parsed_size)); + } return ParsedBlock{std::move(parser), block.block_index, static_cast(parsed_size) + block.bytes_skipped}; } + int64_t num_rows_seen() const { return num_rows_seen_; } + + int num_csv_cols() const { return num_csv_cols_; } + private: io::IOContext io_context_; - ParseOptions parse_options_; - int num_csv_cols_; - bool count_rows_; + const ParseOptions parse_options_; + const int num_csv_cols_; + const bool count_rows_; int64_t num_rows_seen_; }; @@ -570,7 +576,6 @@ class ReaderMixin { parse_options_(parse_options), convert_options_(convert_options), count_rows_(count_rows), - num_rows_seen_(count_rows_ ? 1 : -1), input_(std::move(input)) {} protected: @@ -581,6 +586,7 @@ class ReaderMixin { const uint8_t* data = buf->data(); const auto data_end = data + buf->size(); DCHECK_GT(data_end - data, 0); + int64_t num_rows_seen = 1; if (read_options_.skip_rows) { // Skip initial rows (potentially invalid CSV data) @@ -593,14 +599,14 @@ class ReaderMixin { "either file is too short or header is larger than block size"); } if (count_rows_) { - num_rows_seen_ += num_skipped_rows; + num_rows_seen += num_skipped_rows; } } if (read_options_.column_names.empty()) { // Parse one row (either to read column names or to know the number of columns) - BlockParser parser(io_context_.pool(), parse_options_, num_csv_cols_, - num_rows_seen_, 1); + BlockParser parser(io_context_.pool(), parse_options_, /*num_cols=*/-1, + /*first_row=*/num_rows_seen, /*max_num_rows=*/1); uint32_t parsed_size = 0; RETURN_NOT_OK(parser.Parse( std::string_view(reinterpret_cast(data), data_end - data), @@ -627,7 +633,7 @@ class ReaderMixin { // Skip parsed header row data += parsed_size; if (count_rows_) { - ++num_rows_seen_; + ++num_rows_seen; } } } else { @@ -636,14 +642,17 @@ class ReaderMixin { if (count_rows_) { // increase rows seen to skip past rows which will be skipped - num_rows_seen_ += read_options_.skip_rows_after_names; + num_rows_seen += read_options_.skip_rows_after_names; } auto bytes_consumed = data - buf->data(); *rest = SliceBuffer(buf, bytes_consumed); - num_csv_cols_ = static_cast(column_names_.size()); - DCHECK_GT(num_csv_cols_, 0); + int32_t num_csv_cols = static_cast(column_names_.size()); + DCHECK_GT(num_csv_cols, 0); + // Since we know the number of columns, we can instantiate the BlockParsingOperator + parsing_operator_.emplace(io_context_, parse_options_, num_csv_cols, + count_rows_ ? num_rows_seen : -1); RETURN_NOT_OK(MakeConversionSchema()); return bytes_consumed; @@ -691,7 +700,7 @@ class ReaderMixin { if (convert_options_.include_columns.empty()) { // Include all columns in CSV file order - for (int32_t col_index = 0; col_index < num_csv_cols_; ++col_index) { + for (int32_t col_index = 0; col_index < num_csv_cols(); ++col_index) { append_csv_column(column_names_[col_index], col_index); } } else { @@ -719,66 +728,25 @@ class ReaderMixin { return Status::OK(); } - struct ParseResult { - std::shared_ptr parser; - int64_t parsed_bytes; - }; - - Result Parse(const std::shared_ptr& partial, - const std::shared_ptr& completion, - const std::shared_ptr& block, int64_t block_index, - bool is_final) { - static constexpr int32_t max_num_rows = std::numeric_limits::max(); - auto parser = std::make_shared( - io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); - - std::shared_ptr straddling; - std::vector views; - if (partial->size() != 0 || completion->size() != 0) { - if (partial->size() == 0) { - straddling = completion; - } else if (completion->size() == 0) { - straddling = partial; - } else { - ARROW_ASSIGN_OR_RAISE( - straddling, ConcatenateBuffers({partial, completion}, io_context_.pool())); - } - views = {std::string_view(*straddling), std::string_view(*block)}; - } else { - views = {std::string_view(*block)}; - } - uint32_t parsed_size; - if (is_final) { - RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size)); - } else { - RETURN_NOT_OK(parser->Parse(views, &parsed_size)); - } - // See BlockParsingOperator for explanation. - const int64_t bytes_before_buffer = partial->size() + completion->size(); - if (static_cast(parsed_size) < bytes_before_buffer) { - return Status::Invalid( - "CSV parser got out of sync with chunker. This can mean the data file " - "contains cell values spanning multiple lines; please consider enabling " - "the option 'newlines_in_values'."); - } + Result Parse(const CSVBlock& block) { + DCHECK(parsing_operator_.has_value()); + return (*parsing_operator_)(block); + } - if (count_rows_) { - num_rows_seen_ += parser->total_num_rows(); - } - return ParseResult{std::move(parser), static_cast(parsed_size)}; + int num_csv_cols() const { + DCHECK(parsing_operator_.has_value()); + return parsing_operator_->num_csv_cols(); } io::IOContext io_context_; - ReadOptions read_options_; - ParseOptions parse_options_; - ConvertOptions convert_options_; - - // Number of columns in the CSV file - int32_t num_csv_cols_ = -1; - // Whether num_rows_seen_ tracks the number of rows seen in the CSV being parsed - bool count_rows_; - // Number of rows seen in the csv. Not used if count_rows is false - int64_t num_rows_seen_; + const ReadOptions read_options_; + const ParseOptions parse_options_; + const ConvertOptions convert_options_; + // Whether to track the number of rows seen in the CSV being parsed + const bool count_rows_; + + std::optional parsing_operator_; + // Column names in the CSV file std::vector column_names_; ConversionSchema conversion_schema_; @@ -822,14 +790,10 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader { return Status::OK(); } - Result ParseAndInsert(const std::shared_ptr& partial, - const std::shared_ptr& completion, - const std::shared_ptr& block, - int64_t block_index, bool is_final) { - ARROW_ASSIGN_OR_RAISE(auto result, - Parse(partial, completion, block, block_index, is_final)); - RETURN_NOT_OK(ProcessData(result.parser, block_index)); - return result.parsed_bytes; + Status ParseAndInsert(const CSVBlock& block) { + ARROW_ASSIGN_OR_RAISE(auto result, Parse(block)); + RETURN_NOT_OK(ProcessData(result.parser, result.block_index)); + return Status::OK(); } // Trigger conversion of parsed block data @@ -921,8 +885,6 @@ class StreamingReaderImpl : public ReaderMixin, ProcessHeader(first_buffer, &after_header)); bytes_decoded_->fetch_add(header_bytes_consumed); - auto parser_op = - BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, num_rows_seen_); ARROW_ASSIGN_OR_RAISE( auto decoder_op, BlockDecodingOperator::Make(io_context_, convert_options_, conversion_schema_)); @@ -930,8 +892,7 @@ class StreamingReaderImpl : public ReaderMixin, auto block_gen = SerialBlockReader::MakeAsyncIterator( std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header), read_options_.skip_rows_after_names); - auto parsed_block_gen = - MakeMappedGenerator(std::move(block_gen), std::move(parser_op)); + auto parsed_block_gen = MakeMappedGenerator(std::move(block_gen), *parsing_operator_); auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op)); auto self = shared_from_this(); @@ -1035,11 +996,7 @@ class SerialTableReader : public BaseTableReader { // EOF break; } - ARROW_ASSIGN_OR_RAISE( - int64_t parsed_bytes, - ParseAndInsert(maybe_block.partial, maybe_block.completion, maybe_block.buffer, - maybe_block.block_index, maybe_block.is_final)); - RETURN_NOT_OK(maybe_block.consume_bytes(parsed_bytes)); + RETURN_NOT_OK(ParseAndInsert(maybe_block)); } // Finish conversion, create schema and table RETURN_NOT_OK(task_group_->Finish()); @@ -1110,13 +1067,8 @@ class AsyncThreadedTableReader DCHECK(!maybe_block.consume_bytes); // Launch parse task - self->task_group_->Append([self, maybe_block] { - return self - ->ParseAndInsert(maybe_block.partial, maybe_block.completion, - maybe_block.buffer, maybe_block.block_index, - maybe_block.is_final) - .status(); - }); + self->task_group_->Append( + [self, maybe_block] { return self->ParseAndInsert(maybe_block); }); return Status::OK(); }; @@ -1239,12 +1191,8 @@ class CSVRowCounter : public ReaderMixin, // IterationEnd. std::function>(const CSVBlock&)> count_cb = [self](const CSVBlock& maybe_block) -> Result> { - ARROW_ASSIGN_OR_RAISE( - auto parser, - self->Parse(maybe_block.partial, maybe_block.completion, maybe_block.buffer, - maybe_block.block_index, maybe_block.is_final)); - RETURN_NOT_OK(maybe_block.consume_bytes(parser.parsed_bytes)); - int32_t total_row_count = parser.parser->total_num_rows(); + ARROW_ASSIGN_OR_RAISE(auto parsed_block, self->Parse(maybe_block)); + int32_t total_row_count = parsed_block.parser->total_num_rows(); self->row_count_ += total_row_count; return total_row_count; };