Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSVRow::current_row_start(): track row start position of input stream #243

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
5 changes: 3 additions & 2 deletions include/internal/basic_csv_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ namespace csv {
case ParseFlags::NEWLINE:
this->data_pos++;

// Catches CRLF (or LFLF, CRCRLF, or any other non-sensical combination of newlines)
while (this->data_pos < in.size() && parse_flag(in[this->data_pos]) == ParseFlags::NEWLINE)
// Catches CRLF Only (not skip LFLF, CRCRLF, or any other non-sensical combination of newlines)
if (this->data_pos < in.size() && in[this->data_pos-1] == '\r' and in[this->data_pos] == '\n')
this->data_pos++;

// End of record -> Write record
Expand Down Expand Up @@ -235,6 +235,7 @@ namespace csv {
this->field_start = UNINITIALIZED_FIELD;
this->field_length = 0;
this->reset_data_ptr();
this->data_ptr->_stream_pos = this->mmap_pos;

// Create memory map
size_t length = std::min(this->source_size - this->mmap_pos, bytes);
Expand Down
4 changes: 4 additions & 0 deletions include/internal/basic_csv_parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,11 @@ namespace csv {
void next(size_t bytes = ITERATION_CHUNK_SIZE) override {
if (this->eof()) return;

// Reset parser state
this->field_start = UNINITIALIZED_FIELD;
this->field_length = 0;
this->reset_data_ptr();
this->data_ptr->_stream_pos = this->stream_pos;
this->data_ptr->_data = std::make_shared<std::string>();

if (source_size == 0) {
Expand Down
8 changes: 5 additions & 3 deletions include/internal/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,12 @@ namespace csv {
return false;
else {
// Reading thread is not active => start another one
if (this->read_csv_worker.joinable())
this->read_csv_worker.join();
// if (this->read_csv_worker.joinable())
// this->read_csv_worker.join();

// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv(internals::ITERATION_CHUNK_SIZE);

this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
}
}
else if (this->records->front().size() != this->n_cols &&
Expand Down
13 changes: 7 additions & 6 deletions include/internal/csv_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ namespace csv {
CSVReader& operator=(const CSVReader&) = delete; // No copy assignment
CSVReader& operator=(CSVReader&& other) = default;
~CSVReader() {
if (this->read_csv_worker.joinable()) {
this->read_csv_worker.join();
}
// if (this->read_csv_worker.joinable()) {
// this->read_csv_worker.join();
// }
}

/** @name Retrieving CSV Rows */
Expand Down Expand Up @@ -216,13 +216,14 @@ namespace csv {

/** @name Multi-Threaded File Reading: Flags and State */
///@{
std::thread read_csv_worker; /**< Worker thread for read_csv() */
// std::thread read_csv_worker; /**< Worker thread for read_csv() */
///@}

/** Read initial chunk to get metadata */
void initial_read() {
this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv_worker.join();
// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
// this->read_csv_worker.join();
this->read_csv(internals::ITERATION_CHUNK_SIZE);
}

void trim_header();
Expand Down
5 changes: 3 additions & 2 deletions include/internal/csv_reader_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ namespace csv {
/** Return an iterator to the first row in the reader */
CSV_INLINE CSVReader::iterator CSVReader::begin() {
if (this->records->empty()) {
this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv_worker.join();
// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
// this->read_csv_worker.join();
this->read_csv(internals::ITERATION_CHUNK_SIZE);

// Still empty => return end iterator
if (this->records->empty()) return this->end();
Expand Down
6 changes: 6 additions & 0 deletions include/internal/csv_row.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ namespace csv {
internals::ColNamesPtr col_names = nullptr;
internals::ParseFlagMap parse_flags;
internals::WhitespaceMap ws_flags;

/** where in Stream we start */
uint64_t _stream_pos = {};
};

using RawCSVDataPtr = std::shared_ptr<RawCSVData>;
Expand Down Expand Up @@ -324,6 +327,9 @@ namespace csv {
/** Return the number of fields in this row */
CONSTEXPR size_t size() const noexcept { return row_length; }

/** Where in the Stream we start */
size_t current_row_start() const { return data->_stream_pos + data_start; }

/** @name Value Retrieval */
///@{
CSVField operator[](size_t n) const;
Expand Down
Loading