Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Opt](orc)Optimize the merge io when orc reader read multiple tiny stripes. #42004

Merged
merged 17 commits into from
Nov 7, 2024
2 changes: 1 addition & 1 deletion be/src/apache-orc
102 changes: 102 additions & 0 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -869,5 +869,107 @@ Result<io::FileReaderSPtr> DelegateReader::create_file_reader(
return reader;
});
}

// Locates the first range whose end lies beyond desired_offset, probing
// forward from the last hit. The probe index is a member and never rewinds,
// so callers must present non-decreasing offsets.
Status LinearProbeRangeFinder::get_range_for(int64_t desired_offset,
                                             io::PrefetchRange& result_range) {
    for (; index < _ranges.size(); ++index) {
        const io::PrefetchRange& candidate = _ranges[index];
        if (candidate.end_offset <= desired_offset) {
            continue; // range ends at or before the offset: skip past it.
        }
        // First range extending beyond desired_offset; it only covers the
        // offset if its start does not lie past it.
        if (candidate.start_offset > desired_offset) [[unlikely]] {
            return Status::InvalidArgument("Invalid desiredOffset");
        }
        result_range = candidate;
        return Status::OK();
    }
    return Status::InvalidArgument("Invalid desiredOffset");
}

// Wraps `inner_reader` so reads are served from one cached range chosen by
// `range_finder`. `profile` may be nullptr, in which case no counters are
// registered and only the internal statistics struct is maintained.
RangeCacheFileReader::RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
                                           std::shared_ptr<RangeFinder> range_finder)
        : _profile(profile),
          _inner_reader(std::move(inner_reader)),
          _range_finder(std::move(range_finder)) {
    _size = _inner_reader->size();
    // The cache buffer must fit the largest range the finder can return;
    // keep a 4096-byte floor so tiny ranges still get a sane allocation.
    uint64_t max_cache_size =
            std::max((uint64_t)4096, (uint64_t)_range_finder->get_max_range_size());
    _cache = OwnedSlice(max_cache_size);

    if (_profile != nullptr) {
        const char* random_profile = "RangeCacheFileReader";
        // Parent profile node; the counters below register as its children.
        ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
        _request_io =
                ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT, random_profile, 1);
        _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES,
                                                      random_profile, 1);
        _request_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RequestTime", random_profile, 1);
        _read_to_cache_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadToCacheTime", random_profile, 1);
        _cache_refresh_count = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "CacheRefreshCount",
                                                            TUnit::UNIT, random_profile, 1);
        _read_to_cache_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ReadToCacheBytes",
                                                            TUnit::BYTES, random_profile, 1);
    }
}

// Serves a read from the cached range. On a cache miss (the requested offset
// falls in a range other than the currently cached one) the entire range is
// fetched from the inner reader into `_cache`, then the requested bytes are
// copied out. Returns an error if the offset lies outside every known range.
// NOTE(review): assumes a request never crosses the end of its range — the
// memcpy below is not clamped to the cached range size. TODO confirm callers
// (the ORC library) always read within a single merged range.
Status RangeCacheFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                          const IOContext* io_ctx) {
    auto request_size = result.size;

    _cache_statistics.request_io++;
    _cache_statistics.request_bytes += request_size;
    SCOPED_RAW_TIMER(&_cache_statistics.request_time);

    PrefetchRange range;
    if (_range_finder->get_range_for(offset, range)) [[likely]] {
        if (_current_start_offset != range.start_offset) { // need read new range to cache.
            auto range_size = range.end_offset - range.start_offset;

            _cache_statistics.cache_refresh_count++;
            _cache_statistics.read_to_cache_bytes += range_size;
            SCOPED_RAW_TIMER(&_cache_statistics.read_to_cache_time);

            Slice cache_slice = {_cache.data(), range_size};
            RETURN_IF_ERROR(
                    _inner_reader->read_at(range.start_offset, cache_slice, bytes_read, io_ctx));

            // A short read would leave the cache partially filled; fail loudly
            // rather than serve truncated data.
            if (*bytes_read != range_size) [[unlikely]] {
                return Status::InternalError(
                        "RangeCacheFileReader use inner reader read bytes {} not eq expect size {}",
                        *bytes_read, range_size);
            }

            _current_start_offset = range.start_offset;
        }

        int64_t buffer_offset = offset - _current_start_offset;
        memcpy(result.data, _cache.data() + buffer_offset, request_size);
        *bytes_read = request_size;

        return Status::OK();
    } else {
        // Deliberately fail instead of falling back to the inner reader: a
        // silent fallback would mask bugs in the range planning upstream.
        return Status::InternalError("RangeCacheFileReader read not in Ranges. Offset = {}",
                                     offset);
        // RETURN_IF_ERROR(_inner_reader->read_at(offset, result , bytes_read, io_ctx));
        // return Status::OK();
    }
}

// Flushes the accumulated statistics into the profile counters and forwards
// the call to the inner reader. A no-op when no profile was supplied.
void RangeCacheFileReader::_collect_profile_before_close() {
    if (_profile == nullptr) {
        return;
    }
    COUNTER_UPDATE(_request_io, _cache_statistics.request_io);
    COUNTER_UPDATE(_request_bytes, _cache_statistics.request_bytes);
    COUNTER_UPDATE(_request_time, _cache_statistics.request_time);
    COUNTER_UPDATE(_read_to_cache_time, _cache_statistics.read_to_cache_time);
    COUNTER_UPDATE(_cache_refresh_count, _cache_statistics.cache_refresh_count);
    COUNTER_UPDATE(_read_to_cache_bytes, _cache_statistics.read_to_cache_bytes);
    if (_inner_reader != nullptr) {
        _inner_reader->collect_profile_before_close();
    }
}

} // namespace io
} // namespace doris
141 changes: 141 additions & 0 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,147 @@ struct PrefetchRange {
: start_offset(start_offset), end_offset(end_offset) {}

// Default-constructs an empty range [0, 0).
PrefetchRange() : start_offset(0), end_offset(0) {}

// Two ranges are equal when both endpoints match.
bool operator==(const PrefetchRange& other) const {
    return start_offset == other.start_offset && end_offset == other.end_offset;
}

// Negation of operator==.
bool operator!=(const PrefetchRange& other) const { return !(other == *this); }

// Returns the smallest range covering both this range and `other` (the
// union span): minimum of the two starts, maximum of the two ends.
// Fixes a bug where the result mixed this->start_offset with
// other.end_offset and never consulted this->end_offset or
// other.start_offset, yielding a malformed range.
PrefetchRange span(const PrefetchRange& other) const {
    return {std::min(start_offset, other.start_offset),
            std::max(end_offset, other.end_offset)};
}
// Span of two ranges already known to be in sequential order (this range
// before `other`): keeps this start and other's end. Assumes callers pass
// sorted ranges so other.end_offset >= start_offset — TODO confirm.
PrefetchRange seq_span(const PrefetchRange& other) const {
    return {start_offset, other.end_offset};
}

//Ranges needs to be sorted.
static std::vector<PrefetchRange> merge_adjacent_seq_ranges(
const std::vector<PrefetchRange>& seq_ranges, int64_t max_merge_distance_bytes,
int64_t once_max_read_bytes) {
if (seq_ranges.empty()) {
return {};
}
// Merge overlapping ranges
std::vector<PrefetchRange> result;
PrefetchRange last = seq_ranges.front();
for (size_t i = 1; i < seq_ranges.size(); ++i) {
PrefetchRange current = seq_ranges[i];
PrefetchRange merged = last.seq_span(current);
if (merged.end_offset <= once_max_read_bytes + merged.start_offset &&
last.end_offset + max_merge_distance_bytes >= current.start_offset) {
last = merged;
} else {
result.push_back(last);
last = current;
}
}
result.push_back(last);
return result;
}
};

// Strategy interface mapping a desired read offset to the prefetch range
// that should be loaded to serve it.
class RangeFinder {
public:
    virtual ~RangeFinder() = default;
    // Finds the range containing desired_offset; returns a non-OK Status
    // when no range covers the offset.
    virtual Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) = 0;
    // Size of the largest range this finder can return; used to size the
    // read cache buffer.
    virtual size_t get_max_range_size() const = 0;
};

// RangeFinder for strictly forward reads: probes linearly through the range
// list and never rewinds, so successive desired offsets must be
// non-decreasing (matches ORC's front-to-back stripe access pattern).
class LinearProbeRangeFinder : public RangeFinder {
public:
    LinearProbeRangeFinder(std::vector<io::PrefetchRange>&& ranges) : _ranges(std::move(ranges)) {}

    Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) override;

    // Linear scan for the widest range; the result sizes the read cache.
    size_t get_max_range_size() const override {
        size_t max_range_size = 0;
        for (const auto& range : _ranges) {
            max_range_size = std::max(max_range_size, range.end_offset - range.start_offset);
        }
        return max_range_size;
    }

    ~LinearProbeRangeFinder() override = default;

private:
    // Ranges expected sorted by offset — the forward probe relies on it.
    std::vector<io::PrefetchRange> _ranges;
    // Current probe position; only ever advances.
    size_t index {0};
};

/**
 * A reader that reads one merged range at a time; customize RangeFinder to fit your scenario.
 * When reading ORC files that contain many tiny stripes, the accesses are first merged to
 * reduce the number of requests issued to HDFS. This introduces some read amplification,
 * but issuing one larger read to HDFS is faster than issuing many small ones. Because ORC
 * files are read in order from front to back, LinearProbeRangeFinder is provided for that
 * access pattern.
 */
class RangeCacheFileReader : public io::FileReader {
    // Statistics accumulated locally during reads and flushed to the
    // RuntimeProfile counters in _collect_profile_before_close().
    struct RangeCacheReaderStatistics {
        int64_t request_io = 0;
        int64_t request_bytes = 0;
        int64_t request_time = 0;
        int64_t read_to_cache_time = 0;
        int64_t cache_refresh_count = 0;
        int64_t read_to_cache_bytes = 0;
    };

public:
    // `profile` may be nullptr; `range_finder` decides which range is cached
    // for a given read offset.
    RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
                         std::shared_ptr<RangeFinder> range_finder);

    ~RangeCacheFileReader() override = default;

    // Only flips the closed flag; the inner reader's lifetime is managed by
    // its own shared pointer.
    Status close() override {
        if (!_closed) {
            _closed = true;
        }
        return Status::OK();
    }

    const io::Path& path() const override { return _inner_reader->path(); }

    size_t size() const override { return _size; }

    bool closed() const override { return _closed; }

protected:
    // Serves reads from the cached range; see the .cpp for the miss path.
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                        const IOContext* io_ctx) override;

    void _collect_profile_before_close() override;

private:
    RuntimeProfile* _profile = nullptr;
    io::FileReaderSPtr _inner_reader;
    std::shared_ptr<RangeFinder> _range_finder;

    // Buffer holding the currently cached range's bytes.
    OwnedSlice _cache;
    // Start offset of the cached range; -1 means nothing cached yet.
    int64_t _current_start_offset = -1;

    size_t _size;
    bool _closed = false;

    RuntimeProfile::Counter* _request_io = nullptr;
    RuntimeProfile::Counter* _request_bytes = nullptr;
    RuntimeProfile::Counter* _request_time = nullptr;
    RuntimeProfile::Counter* _read_to_cache_time = nullptr;
    RuntimeProfile::Counter* _cache_refresh_count = nullptr;
    RuntimeProfile::Counter* _read_to_cache_bytes = nullptr;
    RangeCacheReaderStatistics _cache_statistics;
    /**
     * `RangeCacheFileReader`:
     * 1. `CacheRefreshCount`: how many IOs are merged
     * 2. `ReadToCacheBytes`: how much data is actually read after merging
     * 3. `ReadToCacheTime`: how long it takes to read data after merging
     * 4. `RequestBytes`: how many bytes does the apache-orc library actually need to read the orc file
     * 5. `RequestIO`: how many times the apache-orc library calls this read interface
     * 6. `RequestTime`: how long it takes the apache-orc library to call this read interface
     *
     * It should be noted that `RangeCacheFileReader` is a wrapper of the reader that actually reads data, such as
     * the hdfs reader, so strictly speaking, `CacheRefreshCount` is not equal to how many IOs are initiated to hdfs,
     * because each time the hdfs reader is requested, the hdfs reader may not be able to read all the data at once.
     */
};

/**
Expand Down
80 changes: 67 additions & 13 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -857,28 +857,79 @@ Status OrcReader::set_fill_columns(
if (_colname_to_value_range == nullptr || !_init_search_argument(_colname_to_value_range)) {
_lazy_read_ctx.can_lazy_read = false;
}
try {
_row_reader_options.range(_range_start_offset, _range_size);
_row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
_row_reader_options.include(_read_cols);
_row_reader_options.setEnableLazyDecoding(true);

if (!_lazy_read_ctx.can_lazy_read) {
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
uint64_t number_of_stripes = _reader->getNumberOfStripes();
auto all_stripes_needed = _reader->getNeedReadStripes(_row_reader_options);

int64_t range_end_offset = _range_start_offset + _range_size;

// If "orc_tiny_stripe_threshold_bytes" is set to 0, the tiny-stripe merge IO optimization is disabled.
int64_t orc_tiny_stripe_threshold_bytes = 8L * 1024L * 1024L;
int64_t orc_once_max_read_bytes = 8L * 1024L * 1024L;
int64_t orc_max_merge_distance_bytes = 1L * 1024L * 1024L;

if (_state != nullptr) {
orc_tiny_stripe_threshold_bytes =
_state->query_options().orc_tiny_stripe_threshold_bytes;
orc_once_max_read_bytes = _state->query_options().orc_once_max_read_bytes;
orc_max_merge_distance_bytes = _state->query_options().orc_max_merge_distance_bytes;
}
for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);

bool all_tiny_stripes = true;
std::vector<io::PrefetchRange> tiny_stripe_ranges;

for (uint64_t i = 0; i < number_of_stripes; i++) {
std::unique_ptr<orc::StripeInformation> strip_info = _reader->getStripe(i);
uint64_t strip_start_offset = strip_info->getOffset();
uint64_t strip_end_offset = strip_start_offset + strip_info->getLength();

if (strip_start_offset >= range_end_offset || strip_end_offset < _range_start_offset ||
!all_stripes_needed[i]) {
continue;
}
if (strip_info->getLength() > orc_tiny_stripe_threshold_bytes) {
all_tiny_stripes = false;
break;
}

tiny_stripe_ranges.emplace_back(strip_start_offset, strip_end_offset);
}
}
if (all_tiny_stripes && number_of_stripes > 0) {
std::vector<io::PrefetchRange> prefetch_merge_ranges =
io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
orc_max_merge_distance_bytes,
orc_once_max_read_bytes);
auto range_finder =
std::make_shared<io::LinearProbeRangeFinder>(std::move(prefetch_merge_ranges));

_fill_all_columns = true;
auto* orc_input_stream_ptr = static_cast<ORCFileInputStream*>(_reader->getStream());
orc_input_stream_ptr->set_all_tiny_stripes();
auto& orc_file_reader = orc_input_stream_ptr->get_file_reader();
auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader();
orc_file_reader = std::make_shared<io::RangeCacheFileReader>(_profile, orc_inner_reader,
range_finder);
}

// create orc row reader
try {
_row_reader_options.range(_range_start_offset, _range_size);
_row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
_row_reader_options.include(_read_cols);
if (!_lazy_read_ctx.can_lazy_read) {
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
}
for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
}
}

_fill_all_columns = true;
// create orc row reader
if (_lazy_read_ctx.can_lazy_read) {
_row_reader_options.filter(_lazy_read_ctx.predicate_orc_columns);
_orc_filter = std::unique_ptr<ORCFilterImpl>(new ORCFilterImpl(this));
}
_row_reader_options.setEnableLazyDecoding(true);
if (!_lazy_read_ctx.conjuncts.empty()) {
_string_dict_filter = std::make_unique<StringDictFilterImpl>(this);
}
Expand Down Expand Up @@ -2415,6 +2466,9 @@ MutableColumnPtr OrcReader::_convert_dict_column_to_string_column(
void ORCFileInputStream::beforeReadStripe(
std::unique_ptr<orc::StripeInformation> current_strip_information,
std::vector<bool> selected_columns) {
if (_is_all_tiny_stripes) {
return;
}
if (_file_reader != nullptr) {
_file_reader->collect_profile_before_close();
}
Expand Down
Loading
Loading