Skip to content

Commit 702abbf

Browse files
hubgeterkaka11chen
andauthored
[Opt](orc)Optimize the merge io when orc reader read multiple tiny stripes. (#42004) (#44239)
bp #42004 Co-authored-by: kaka11chen <kaka11.chen@gmail.com>
1 parent 75f8323 commit 702abbf

File tree

15 files changed

+3042
-16
lines changed

15 files changed

+3042
-16
lines changed

be/src/io/fs/buffered_reader.cpp

+102
Original file line numberDiff line numberDiff line change
@@ -874,5 +874,107 @@ Status DelegateReader::create_file_reader(RuntimeProfile* profile,
874874
}
875875
return Status();
876876
}
877+
878+
Status LinearProbeRangeFinder::get_range_for(int64_t desired_offset,
879+
io::PrefetchRange& result_range) {
880+
while (index < _ranges.size()) {
881+
io::PrefetchRange& range = _ranges[index];
882+
if (range.end_offset > desired_offset) {
883+
if (range.start_offset > desired_offset) [[unlikely]] {
884+
return Status::InvalidArgument("Invalid desiredOffset");
885+
}
886+
result_range = range;
887+
return Status::OK();
888+
}
889+
++index;
890+
}
891+
return Status::InvalidArgument("Invalid desiredOffset");
892+
}
893+
894+
RangeCacheFileReader::RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
895+
std::shared_ptr<RangeFinder> range_finder)
896+
: _profile(profile),
897+
_inner_reader(std::move(inner_reader)),
898+
_range_finder(std::move(range_finder)) {
899+
_size = _inner_reader->size();
900+
uint64_t max_cache_size =
901+
std::max((uint64_t)4096, (uint64_t)_range_finder->get_max_range_size());
902+
_cache = OwnedSlice(max_cache_size);
903+
904+
if (_profile != nullptr) {
905+
const char* random_profile = "RangeCacheFileReader";
906+
ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
907+
_request_io =
908+
ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT, random_profile, 1);
909+
_request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES,
910+
random_profile, 1);
911+
_request_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RequestTime", random_profile, 1);
912+
_read_to_cache_time =
913+
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadToCacheTime", random_profile, 1);
914+
_cache_refresh_count = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "CacheRefreshCount",
915+
TUnit::UNIT, random_profile, 1);
916+
_read_to_cache_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ReadToCacheBytes",
917+
TUnit::BYTES, random_profile, 1);
918+
}
919+
}
920+
921+
Status RangeCacheFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
922+
const IOContext* io_ctx) {
923+
auto request_size = result.size;
924+
925+
_cache_statistics.request_io++;
926+
_cache_statistics.request_bytes += request_size;
927+
SCOPED_RAW_TIMER(&_cache_statistics.request_time);
928+
929+
PrefetchRange range;
930+
if (_range_finder->get_range_for(offset, range)) [[likely]] {
931+
if (_current_start_offset != range.start_offset) { // need read new range to cache.
932+
auto range_size = range.end_offset - range.start_offset;
933+
934+
_cache_statistics.cache_refresh_count++;
935+
_cache_statistics.read_to_cache_bytes += range_size;
936+
SCOPED_RAW_TIMER(&_cache_statistics.read_to_cache_time);
937+
938+
Slice cache_slice = {_cache.data(), range_size};
939+
RETURN_IF_ERROR(
940+
_inner_reader->read_at(range.start_offset, cache_slice, bytes_read, io_ctx));
941+
942+
if (*bytes_read != range_size) [[unlikely]] {
943+
return Status::InternalError(
944+
"RangeCacheFileReader use inner reader read bytes {} not eq expect size {}",
945+
*bytes_read, range_size);
946+
}
947+
948+
_current_start_offset = range.start_offset;
949+
}
950+
951+
int64_t buffer_offset = offset - _current_start_offset;
952+
memcpy(result.data, _cache.data() + buffer_offset, request_size);
953+
*bytes_read = request_size;
954+
955+
return Status::OK();
956+
} else {
957+
return Status::InternalError("RangeCacheFileReader read not in Ranges. Offset = {}",
958+
offset);
959+
// RETURN_IF_ERROR(_inner_reader->read_at(offset, result , bytes_read, io_ctx));
960+
// return Status::OK();
961+
// think return error is ok,otherwise it will cover up the error.
962+
}
963+
}
964+
965+
void RangeCacheFileReader::_collect_profile_before_close() {
966+
if (_profile != nullptr) {
967+
COUNTER_UPDATE(_request_io, _cache_statistics.request_io);
968+
COUNTER_UPDATE(_request_bytes, _cache_statistics.request_bytes);
969+
COUNTER_UPDATE(_request_time, _cache_statistics.request_time);
970+
COUNTER_UPDATE(_read_to_cache_time, _cache_statistics.read_to_cache_time);
971+
COUNTER_UPDATE(_cache_refresh_count, _cache_statistics.cache_refresh_count);
972+
COUNTER_UPDATE(_read_to_cache_bytes, _cache_statistics.read_to_cache_bytes);
973+
if (_inner_reader != nullptr) {
974+
_inner_reader->collect_profile_before_close();
975+
}
976+
}
977+
}
978+
877979
} // namespace io
878980
} // namespace doris

be/src/io/fs/buffered_reader.h

+143
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,149 @@ struct PrefetchRange {
5353
: start_offset(start_offset), end_offset(end_offset) {}
5454

5555
PrefetchRange() : start_offset(0), end_offset(0) {}
56+
57+
bool operator==(const PrefetchRange& other) const {
58+
return (start_offset == other.start_offset) && (end_offset == other.end_offset);
59+
}
60+
61+
bool operator!=(const PrefetchRange& other) const { return !(*this == other); }
62+
63+
PrefetchRange span(const PrefetchRange& other) const {
64+
return {std::min(start_offset, other.end_offset), std::max(start_offset, other.end_offset)};
65+
}
66+
PrefetchRange seq_span(const PrefetchRange& other) const {
67+
return {start_offset, other.end_offset};
68+
}
69+
70+
//Ranges needs to be sorted.
71+
static std::vector<PrefetchRange> merge_adjacent_seq_ranges(
72+
const std::vector<PrefetchRange>& seq_ranges, int64_t max_merge_distance_bytes,
73+
int64_t once_max_read_bytes) {
74+
if (seq_ranges.empty()) {
75+
return {};
76+
}
77+
// Merge overlapping ranges
78+
std::vector<PrefetchRange> result;
79+
PrefetchRange last = seq_ranges.front();
80+
for (size_t i = 1; i < seq_ranges.size(); ++i) {
81+
PrefetchRange current = seq_ranges[i];
82+
PrefetchRange merged = last.seq_span(current);
83+
if (merged.end_offset <= once_max_read_bytes + merged.start_offset &&
84+
last.end_offset + max_merge_distance_bytes >= current.start_offset) {
85+
last = merged;
86+
} else {
87+
result.push_back(last);
88+
last = current;
89+
}
90+
}
91+
result.push_back(last);
92+
return result;
93+
}
94+
};
95+
96+
class RangeFinder {
97+
public:
98+
virtual ~RangeFinder() = default;
99+
virtual Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) = 0;
100+
virtual size_t get_max_range_size() const = 0;
101+
};
102+
103+
class LinearProbeRangeFinder : public RangeFinder {
104+
public:
105+
LinearProbeRangeFinder(std::vector<io::PrefetchRange>&& ranges) : _ranges(std::move(ranges)) {}
106+
107+
Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) override;
108+
109+
size_t get_max_range_size() const override {
110+
size_t max_range_size = 0;
111+
for (const auto& range : _ranges) {
112+
max_range_size = std::max(max_range_size, range.end_offset - range.start_offset);
113+
}
114+
return max_range_size;
115+
}
116+
117+
~LinearProbeRangeFinder() override = default;
118+
119+
private:
120+
std::vector<io::PrefetchRange> _ranges;
121+
size_t index {0};
122+
};
123+
124+
/**
125+
* The reader provides a solution to read one range at a time. You can customize RangeFinder to meet your scenario.
126+
* For me, since there will be tiny stripes when reading orc files, in order to reduce the requests to hdfs,
127+
* I first merge the access to the orc files to be read (of course there is a problem of read amplification,
128+
* but in my scenario, compared with reading hdfs multiple times, it is faster to read more data on hdfs at one time),
129+
* and then because the actual reading of orc files is in order from front to back, I provide LinearProbeRangeFinder.
130+
*/
131+
class RangeCacheFileReader : public io::FileReader {
132+
struct RangeCacheReaderStatistics {
133+
int64_t request_io = 0;
134+
int64_t request_bytes = 0;
135+
int64_t request_time = 0;
136+
int64_t read_to_cache_time = 0;
137+
int64_t cache_refresh_count = 0;
138+
int64_t read_to_cache_bytes = 0;
139+
};
140+
141+
public:
142+
RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
143+
std::shared_ptr<RangeFinder> range_finder);
144+
145+
~RangeCacheFileReader() override = default;
146+
147+
Status close() override {
148+
if (!_closed) {
149+
_closed = true;
150+
}
151+
return Status::OK();
152+
}
153+
154+
const io::Path& path() const override { return _inner_reader->path(); }
155+
156+
size_t size() const override { return _size; }
157+
158+
bool closed() const override { return _closed; }
159+
160+
std::shared_ptr<io::FileSystem> fs() const override { return _inner_reader->fs(); }
161+
162+
protected:
163+
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
164+
const IOContext* io_ctx) override;
165+
166+
void _collect_profile_before_close() override;
167+
168+
private:
169+
RuntimeProfile* _profile = nullptr;
170+
io::FileReaderSPtr _inner_reader;
171+
std::shared_ptr<RangeFinder> _range_finder;
172+
173+
OwnedSlice _cache;
174+
int64_t _current_start_offset = -1;
175+
176+
size_t _size;
177+
bool _closed = false;
178+
179+
RuntimeProfile::Counter* _request_io = nullptr;
180+
RuntimeProfile::Counter* _request_bytes = nullptr;
181+
RuntimeProfile::Counter* _request_time = nullptr;
182+
RuntimeProfile::Counter* _read_to_cache_time = nullptr;
183+
RuntimeProfile::Counter* _cache_refresh_count = nullptr;
184+
RuntimeProfile::Counter* _read_to_cache_bytes = nullptr;
185+
RangeCacheReaderStatistics _cache_statistics;
186+
/**
187+
* `RangeCacheFileReader`:
188+
* 1. `CacheRefreshCount`: how many IOs are merged
189+
* 2. `ReadToCacheBytes`: how much data is actually read after merging
190+
* 3. `ReadToCacheTime`: how long it takes to read data after merging
191+
* 4. `RequestBytes`: how many bytes does the apache-orc library actually need to read the orc file
192+
* 5. `RequestIO`: how many times the apache-orc library calls this read interface
193+
* 6. `RequestTime`: how long it takes the apache-orc library to call this read interface
194+
*
195+
* It should be noted that `RangeCacheFileReader` is a wrapper of the reader that actually reads data,such as
196+
* the hdfs reader, so strictly speaking, `CacheRefreshCount` is not equal to how many IOs are initiated to hdfs,
197+
* because each time the hdfs reader is requested, the hdfs reader may not be able to read all the data at once.
198+
*/
56199
};
57200

58201
/**

be/src/vec/exec/format/orc/vorc_reader.cpp

+67-13
Original file line numberDiff line numberDiff line change
@@ -857,28 +857,79 @@ Status OrcReader::set_fill_columns(
857857
if (_colname_to_value_range == nullptr || !_init_search_argument(_colname_to_value_range)) {
858858
_lazy_read_ctx.can_lazy_read = false;
859859
}
860+
try {
861+
_row_reader_options.range(_range_start_offset, _range_size);
862+
_row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
863+
_row_reader_options.include(_read_cols);
864+
_row_reader_options.setEnableLazyDecoding(true);
860865

861-
if (!_lazy_read_ctx.can_lazy_read) {
862-
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
863-
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
866+
uint64_t number_of_stripes = _reader->getNumberOfStripes();
867+
auto all_stripes_needed = _reader->getNeedReadStripes(_row_reader_options);
868+
869+
int64_t range_end_offset = _range_start_offset + _range_size;
870+
871+
// If you set "orc_tiny_stripe_threshold_bytes" = 0, the use tiny stripes merge io optimization will not be used.
872+
int64_t orc_tiny_stripe_threshold_bytes = 8L * 1024L * 1024L;
873+
int64_t orc_once_max_read_bytes = 8L * 1024L * 1024L;
874+
int64_t orc_max_merge_distance_bytes = 1L * 1024L * 1024L;
875+
876+
if (_state != nullptr) {
877+
orc_tiny_stripe_threshold_bytes =
878+
_state->query_options().orc_tiny_stripe_threshold_bytes;
879+
orc_once_max_read_bytes = _state->query_options().orc_once_max_read_bytes;
880+
orc_max_merge_distance_bytes = _state->query_options().orc_max_merge_distance_bytes;
864881
}
865-
for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
866-
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
882+
883+
bool all_tiny_stripes = true;
884+
std::vector<io::PrefetchRange> tiny_stripe_ranges;
885+
886+
for (uint64_t i = 0; i < number_of_stripes; i++) {
887+
std::unique_ptr<orc::StripeInformation> strip_info = _reader->getStripe(i);
888+
uint64_t strip_start_offset = strip_info->getOffset();
889+
uint64_t strip_end_offset = strip_start_offset + strip_info->getLength();
890+
891+
if (strip_start_offset >= range_end_offset || strip_end_offset < _range_start_offset ||
892+
!all_stripes_needed[i]) {
893+
continue;
894+
}
895+
if (strip_info->getLength() > orc_tiny_stripe_threshold_bytes) {
896+
all_tiny_stripes = false;
897+
break;
898+
}
899+
900+
tiny_stripe_ranges.emplace_back(strip_start_offset, strip_end_offset);
867901
}
868-
}
902+
if (all_tiny_stripes && number_of_stripes > 0) {
903+
std::vector<io::PrefetchRange> prefetch_merge_ranges =
904+
io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
905+
orc_max_merge_distance_bytes,
906+
orc_once_max_read_bytes);
907+
auto range_finder =
908+
std::make_shared<io::LinearProbeRangeFinder>(std::move(prefetch_merge_ranges));
869909

870-
_fill_all_columns = true;
910+
auto* orc_input_stream_ptr = static_cast<ORCFileInputStream*>(_reader->getStream());
911+
orc_input_stream_ptr->set_all_tiny_stripes();
912+
auto& orc_file_reader = orc_input_stream_ptr->get_file_reader();
913+
auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader();
914+
orc_file_reader = std::make_shared<io::RangeCacheFileReader>(_profile, orc_inner_reader,
915+
range_finder);
916+
}
871917

872-
// create orc row reader
873-
try {
874-
_row_reader_options.range(_range_start_offset, _range_size);
875-
_row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
876-
_row_reader_options.include(_read_cols);
918+
if (!_lazy_read_ctx.can_lazy_read) {
919+
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
920+
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
921+
}
922+
for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
923+
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
924+
}
925+
}
926+
927+
_fill_all_columns = true;
928+
// create orc row reader
877929
if (_lazy_read_ctx.can_lazy_read) {
878930
_row_reader_options.filter(_lazy_read_ctx.predicate_orc_columns);
879931
_orc_filter = std::unique_ptr<ORCFilterImpl>(new ORCFilterImpl(this));
880932
}
881-
_row_reader_options.setEnableLazyDecoding(true);
882933
if (!_lazy_read_ctx.conjuncts.empty()) {
883934
_string_dict_filter = std::make_unique<StringDictFilterImpl>(this);
884935
}
@@ -2416,6 +2467,9 @@ MutableColumnPtr OrcReader::_convert_dict_column_to_string_column(
24162467
void ORCFileInputStream::beforeReadStripe(
24172468
std::unique_ptr<orc::StripeInformation> current_strip_information,
24182469
std::vector<bool> selected_columns) {
2470+
if (_is_all_tiny_stripes) {
2471+
return;
2472+
}
24192473
if (_file_reader != nullptr) {
24202474
_file_reader->collect_profile_before_close();
24212475
}

0 commit comments

Comments
 (0)