Skip to content

Commit 3e64a93

Browse files
hubgeterkaka11chen
andcommitted
[Opt](orc)Optimize the merge io when orc reader read multiple tiny stripes. (apache#42004)
### What problem does this PR solve? When reading orc files, we may encounter a scenario where the stripe byte size is very small but the number of stripes is very large. This pr introduces three session variables `orc_tiny_stripe_threshold_bytes`, `orc_once_max_read_bytes`, and `orc_max_merge_distance_bytes` to optimize io reading for the above scenarios. If a stripe byte size is less than `orc_tiny_stripe_threshold_bytes`, we will consider it as a tiny stripe. For multiple tiny stripes, we will perform IO merge reading according to the `orc_once_max_read_bytes` and `orc_max_merge_distance_bytes` parameters. Among them, `orc_once_max_read_bytes` indicates the maximum size of the merged IO. You should not set `orc_once_max_read_bytes` less than `orc_tiny_stripe_threshold_bytes`, although we will not force an error. When using tiny stripe reading optimization, since tiny stripes are not necessarily continuous, when the distance between two tiny stripes is greater than `orc_max_merge_distance_bytes`, we will not merge them into one IO. If you don't want to use this optimization, you can `set orc_tiny_stripe_threshold_bytes = 0`. Default parameters: ```mysql orc_tiny_stripe_threshold_bytes = 8388608 (8M) orc_once_max_read_bytes = 8388608 (8M) orc_max_merge_distance_bytes = 1048576 (1M) ``` We also add relevant profiles for this purpose so that parameters can be adjusted to optimize reading. `RangeCacheFileReader`: 1. `CacheRefreshCount`: how many IOs are merged 2. `ReadToCacheBytes`: how much data is actually read after merging 3. `ReadToCacheTime`: how long it takes to read data after merging 4. `RequestBytes`: how many bytes does the apache-orc library actually need to read the orc file 5. `RequestIO`: how many times the apache-orc library calls this read interface 6. `RequestTime`: how long it takes the apache-orc library to call this read interface It should be noted that `RangeCacheFileReader` is a wrapper of the reader that actually reads data, such as the hdfs reader, so strictly speaking, `CacheRefreshCount` is not equal to how many IOs are initiated to hdfs, because each time the hdfs reader is requested, the hdfs reader may not be able to read all the data at once. This pr also involves changes to the apache-orc third-party library: apache/doris-thirdparty#244. Reference implementation: https://github.com/trinodb/trino/blob/master/lib/trino-orc/src/main/java/io/trino/orc/OrcDataSourceUtils.java#L36 #### Summary: ```mysql set orc_tiny_stripe_threshold_bytes = xxx; set orc_once_max_read_bytes = xxx; set orc_max_merge_distance_bytes = xxx; # xxx is the size in bytes ``` ### Release note Introduces three session variables `orc_tiny_stripe_threshold_bytes`, `orc_once_max_read_bytes`, and `orc_max_merge_distance_bytes` to optimize io reading of scenarios where the orc stripe byte size is very small but the number of stripes is very large. Co-authored-by: kaka11chen <kaka11.chen@gmail.com> Co-authored-by: daidai <changyuwei@selectdb.com>
1 parent b63b97a commit 3e64a93

File tree

15 files changed

+3038
-16
lines changed

15 files changed

+3038
-16
lines changed

be/src/io/fs/buffered_reader.cpp

+102
Original file line numberDiff line numberDiff line change
@@ -874,5 +874,107 @@ Status DelegateReader::create_file_reader(RuntimeProfile* profile,
874874
}
875875
return Status();
876876
}
877+
878+
Status LinearProbeRangeFinder::get_range_for(int64_t desired_offset,
879+
io::PrefetchRange& result_range) {
880+
while (index < _ranges.size()) {
881+
io::PrefetchRange& range = _ranges[index];
882+
if (range.end_offset > desired_offset) {
883+
if (range.start_offset > desired_offset) [[unlikely]] {
884+
return Status::InvalidArgument("Invalid desiredOffset");
885+
}
886+
result_range = range;
887+
return Status::OK();
888+
}
889+
++index;
890+
}
891+
return Status::InvalidArgument("Invalid desiredOffset");
892+
}
893+
894+
RangeCacheFileReader::RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
895+
std::shared_ptr<RangeFinder> range_finder)
896+
: _profile(profile),
897+
_inner_reader(std::move(inner_reader)),
898+
_range_finder(std::move(range_finder)) {
899+
_size = _inner_reader->size();
900+
uint64_t max_cache_size =
901+
std::max((uint64_t)4096, (uint64_t)_range_finder->get_max_range_size());
902+
_cache = OwnedSlice(max_cache_size);
903+
904+
if (_profile != nullptr) {
905+
const char* random_profile = "RangeCacheFileReader";
906+
ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
907+
_request_io =
908+
ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT, random_profile, 1);
909+
_request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES,
910+
random_profile, 1);
911+
_request_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RequestTime", random_profile, 1);
912+
_read_to_cache_time =
913+
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadToCacheTime", random_profile, 1);
914+
_cache_refresh_count = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "CacheRefreshCount",
915+
TUnit::UNIT, random_profile, 1);
916+
_read_to_cache_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ReadToCacheBytes",
917+
TUnit::BYTES, random_profile, 1);
918+
}
919+
}
920+
921+
Status RangeCacheFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
922+
const IOContext* io_ctx) {
923+
auto request_size = result.size;
924+
925+
_cache_statistics.request_io++;
926+
_cache_statistics.request_bytes += request_size;
927+
SCOPED_RAW_TIMER(&_cache_statistics.request_time);
928+
929+
PrefetchRange range;
930+
if (_range_finder->get_range_for(offset, range)) [[likely]] {
931+
if (_current_start_offset != range.start_offset) { // need read new range to cache.
932+
auto range_size = range.end_offset - range.start_offset;
933+
934+
_cache_statistics.cache_refresh_count++;
935+
_cache_statistics.read_to_cache_bytes += range_size;
936+
SCOPED_RAW_TIMER(&_cache_statistics.read_to_cache_time);
937+
938+
Slice cache_slice = {_cache.data(), range_size};
939+
RETURN_IF_ERROR(
940+
_inner_reader->read_at(range.start_offset, cache_slice, bytes_read, io_ctx));
941+
942+
if (*bytes_read != range_size) [[unlikely]] {
943+
return Status::InternalError(
944+
"RangeCacheFileReader use inner reader read bytes {} not eq expect size {}",
945+
*bytes_read, range_size);
946+
}
947+
948+
_current_start_offset = range.start_offset;
949+
}
950+
951+
int64_t buffer_offset = offset - _current_start_offset;
952+
memcpy(result.data, _cache.data() + buffer_offset, request_size);
953+
*bytes_read = request_size;
954+
955+
return Status::OK();
956+
} else {
957+
return Status::InternalError("RangeCacheFileReader read not in Ranges. Offset = {}",
958+
offset);
959+
// RETURN_IF_ERROR(_inner_reader->read_at(offset, result , bytes_read, io_ctx));
960+
// return Status::OK();
961+
// think return error is ok,otherwise it will cover up the error.
962+
}
963+
}
964+
965+
void RangeCacheFileReader::_collect_profile_before_close() {
966+
if (_profile != nullptr) {
967+
COUNTER_UPDATE(_request_io, _cache_statistics.request_io);
968+
COUNTER_UPDATE(_request_bytes, _cache_statistics.request_bytes);
969+
COUNTER_UPDATE(_request_time, _cache_statistics.request_time);
970+
COUNTER_UPDATE(_read_to_cache_time, _cache_statistics.read_to_cache_time);
971+
COUNTER_UPDATE(_cache_refresh_count, _cache_statistics.cache_refresh_count);
972+
COUNTER_UPDATE(_read_to_cache_bytes, _cache_statistics.read_to_cache_bytes);
973+
if (_inner_reader != nullptr) {
974+
_inner_reader->collect_profile_before_close();
975+
}
976+
}
977+
}
978+
877979
} // namespace io
878980
} // namespace doris

be/src/io/fs/buffered_reader.h

+141
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,147 @@ struct PrefetchRange {
5353
: start_offset(start_offset), end_offset(end_offset) {}
5454

5555
PrefetchRange() : start_offset(0), end_offset(0) {}
56+
57+
bool operator==(const PrefetchRange& other) const {
58+
return (start_offset == other.start_offset) && (end_offset == other.end_offset);
59+
}
60+
61+
bool operator!=(const PrefetchRange& other) const { return !(*this == other); }
62+
63+
PrefetchRange span(const PrefetchRange& other) const {
64+
return {std::min(start_offset, other.end_offset), std::max(start_offset, other.end_offset)};
65+
}
66+
PrefetchRange seq_span(const PrefetchRange& other) const {
67+
return {start_offset, other.end_offset};
68+
}
69+
70+
//Ranges needs to be sorted.
71+
static std::vector<PrefetchRange> merge_adjacent_seq_ranges(
72+
const std::vector<PrefetchRange>& seq_ranges, int64_t max_merge_distance_bytes,
73+
int64_t once_max_read_bytes) {
74+
if (seq_ranges.empty()) {
75+
return {};
76+
}
77+
// Merge overlapping ranges
78+
std::vector<PrefetchRange> result;
79+
PrefetchRange last = seq_ranges.front();
80+
for (size_t i = 1; i < seq_ranges.size(); ++i) {
81+
PrefetchRange current = seq_ranges[i];
82+
PrefetchRange merged = last.seq_span(current);
83+
if (merged.end_offset <= once_max_read_bytes + merged.start_offset &&
84+
last.end_offset + max_merge_distance_bytes >= current.start_offset) {
85+
last = merged;
86+
} else {
87+
result.push_back(last);
88+
last = current;
89+
}
90+
}
91+
result.push_back(last);
92+
return result;
93+
}
94+
};
95+
96+
class RangeFinder {
97+
public:
98+
virtual ~RangeFinder() = default;
99+
virtual Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) = 0;
100+
virtual size_t get_max_range_size() const = 0;
101+
};
102+
103+
class LinearProbeRangeFinder : public RangeFinder {
104+
public:
105+
LinearProbeRangeFinder(std::vector<io::PrefetchRange>&& ranges) : _ranges(std::move(ranges)) {}
106+
107+
Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) override;
108+
109+
size_t get_max_range_size() const override {
110+
size_t max_range_size = 0;
111+
for (const auto& range : _ranges) {
112+
max_range_size = std::max(max_range_size, range.end_offset - range.start_offset);
113+
}
114+
return max_range_size;
115+
}
116+
117+
~LinearProbeRangeFinder() override = default;
118+
119+
private:
120+
std::vector<io::PrefetchRange> _ranges;
121+
size_t index {0};
122+
};
123+
124+
/**
125+
* The reader provides a solution to read one range at a time. You can customize RangeFinder to meet your scenario.
126+
* For me, since there will be tiny stripes when reading orc files, in order to reduce the requests to hdfs,
127+
* I first merge the access to the orc files to be read (of course there is a problem of read amplification,
128+
* but in my scenario, compared with reading hdfs multiple times, it is faster to read more data on hdfs at one time),
129+
* and then because the actual reading of orc files is in order from front to back, I provide LinearProbeRangeFinder.
130+
*/
131+
class RangeCacheFileReader : public io::FileReader {
132+
struct RangeCacheReaderStatistics {
133+
int64_t request_io = 0;
134+
int64_t request_bytes = 0;
135+
int64_t request_time = 0;
136+
int64_t read_to_cache_time = 0;
137+
int64_t cache_refresh_count = 0;
138+
int64_t read_to_cache_bytes = 0;
139+
};
140+
141+
public:
142+
RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
143+
std::shared_ptr<RangeFinder> range_finder);
144+
145+
~RangeCacheFileReader() override = default;
146+
147+
Status close() override {
148+
if (!_closed) {
149+
_closed = true;
150+
}
151+
return Status::OK();
152+
}
153+
154+
const io::Path& path() const override { return _inner_reader->path(); }
155+
156+
size_t size() const override { return _size; }
157+
158+
bool closed() const override { return _closed; }
159+
160+
protected:
161+
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
162+
const IOContext* io_ctx) override;
163+
164+
void _collect_profile_before_close() override;
165+
166+
private:
167+
RuntimeProfile* _profile = nullptr;
168+
io::FileReaderSPtr _inner_reader;
169+
std::shared_ptr<RangeFinder> _range_finder;
170+
171+
OwnedSlice _cache;
172+
int64_t _current_start_offset = -1;
173+
174+
size_t _size;
175+
bool _closed = false;
176+
177+
RuntimeProfile::Counter* _request_io = nullptr;
178+
RuntimeProfile::Counter* _request_bytes = nullptr;
179+
RuntimeProfile::Counter* _request_time = nullptr;
180+
RuntimeProfile::Counter* _read_to_cache_time = nullptr;
181+
RuntimeProfile::Counter* _cache_refresh_count = nullptr;
182+
RuntimeProfile::Counter* _read_to_cache_bytes = nullptr;
183+
RangeCacheReaderStatistics _cache_statistics;
184+
/**
185+
* `RangeCacheFileReader`:
186+
* 1. `CacheRefreshCount`: how many IOs are merged
187+
* 2. `ReadToCacheBytes`: how much data is actually read after merging
188+
* 3. `ReadToCacheTime`: how long it takes to read data after merging
189+
* 4. `RequestBytes`: how many bytes does the apache-orc library actually need to read the orc file
190+
* 5. `RequestIO`: how many times the apache-orc library calls this read interface
191+
* 6. `RequestTime`: how long it takes the apache-orc library to call this read interface
192+
*
193+
* It should be noted that `RangeCacheFileReader` is a wrapper of the reader that actually reads data,such as
194+
* the hdfs reader, so strictly speaking, `CacheRefreshCount` is not equal to how many IOs are initiated to hdfs,
195+
* because each time the hdfs reader is requested, the hdfs reader may not be able to read all the data at once.
196+
*/
56197
};
57198

58199
/**

be/src/vec/exec/format/orc/vorc_reader.cpp

+67-13
Original file line numberDiff line numberDiff line change
@@ -857,28 +857,79 @@ Status OrcReader::set_fill_columns(
857857
if (_colname_to_value_range == nullptr || !_init_search_argument(_colname_to_value_range)) {
858858
_lazy_read_ctx.can_lazy_read = false;
859859
}
860+
try {
861+
_row_reader_options.range(_range_start_offset, _range_size);
862+
_row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
863+
_row_reader_options.include(_read_cols);
864+
_row_reader_options.setEnableLazyDecoding(true);
860865

861-
if (!_lazy_read_ctx.can_lazy_read) {
862-
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
863-
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
866+
uint64_t number_of_stripes = _reader->getNumberOfStripes();
867+
auto all_stripes_needed = _reader->getNeedReadStripes(_row_reader_options);
868+
869+
int64_t range_end_offset = _range_start_offset + _range_size;
870+
871+
// If you set "orc_tiny_stripe_threshold_bytes" = 0, the use tiny stripes merge io optimization will not be used.
872+
int64_t orc_tiny_stripe_threshold_bytes = 8L * 1024L * 1024L;
873+
int64_t orc_once_max_read_bytes = 8L * 1024L * 1024L;
874+
int64_t orc_max_merge_distance_bytes = 1L * 1024L * 1024L;
875+
876+
if (_state != nullptr) {
877+
orc_tiny_stripe_threshold_bytes =
878+
_state->query_options().orc_tiny_stripe_threshold_bytes;
879+
orc_once_max_read_bytes = _state->query_options().orc_once_max_read_bytes;
880+
orc_max_merge_distance_bytes = _state->query_options().orc_max_merge_distance_bytes;
864881
}
865-
for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
866-
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
882+
883+
bool all_tiny_stripes = true;
884+
std::vector<io::PrefetchRange> tiny_stripe_ranges;
885+
886+
for (uint64_t i = 0; i < number_of_stripes; i++) {
887+
std::unique_ptr<orc::StripeInformation> strip_info = _reader->getStripe(i);
888+
uint64_t strip_start_offset = strip_info->getOffset();
889+
uint64_t strip_end_offset = strip_start_offset + strip_info->getLength();
890+
891+
if (strip_start_offset >= range_end_offset || strip_end_offset < _range_start_offset ||
892+
!all_stripes_needed[i]) {
893+
continue;
894+
}
895+
if (strip_info->getLength() > orc_tiny_stripe_threshold_bytes) {
896+
all_tiny_stripes = false;
897+
break;
898+
}
899+
900+
tiny_stripe_ranges.emplace_back(strip_start_offset, strip_end_offset);
867901
}
868-
}
902+
if (all_tiny_stripes && number_of_stripes > 0) {
903+
std::vector<io::PrefetchRange> prefetch_merge_ranges =
904+
io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
905+
orc_max_merge_distance_bytes,
906+
orc_once_max_read_bytes);
907+
auto range_finder =
908+
std::make_shared<io::LinearProbeRangeFinder>(std::move(prefetch_merge_ranges));
869909

870-
_fill_all_columns = true;
910+
auto* orc_input_stream_ptr = static_cast<ORCFileInputStream*>(_reader->getStream());
911+
orc_input_stream_ptr->set_all_tiny_stripes();
912+
auto& orc_file_reader = orc_input_stream_ptr->get_file_reader();
913+
auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader();
914+
orc_file_reader = std::make_shared<io::RangeCacheFileReader>(_profile, orc_inner_reader,
915+
range_finder);
916+
}
871917

872-
// create orc row reader
873-
try {
874-
_row_reader_options.range(_range_start_offset, _range_size);
875-
_row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
876-
_row_reader_options.include(_read_cols);
918+
if (!_lazy_read_ctx.can_lazy_read) {
919+
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
920+
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
921+
}
922+
for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
923+
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
924+
}
925+
}
926+
927+
_fill_all_columns = true;
928+
// create orc row reader
877929
if (_lazy_read_ctx.can_lazy_read) {
878930
_row_reader_options.filter(_lazy_read_ctx.predicate_orc_columns);
879931
_orc_filter = std::unique_ptr<ORCFilterImpl>(new ORCFilterImpl(this));
880932
}
881-
_row_reader_options.setEnableLazyDecoding(true);
882933
if (!_lazy_read_ctx.conjuncts.empty()) {
883934
_string_dict_filter = std::make_unique<StringDictFilterImpl>(this);
884935
}
@@ -2416,6 +2467,9 @@ MutableColumnPtr OrcReader::_convert_dict_column_to_string_column(
24162467
void ORCFileInputStream::beforeReadStripe(
24172468
std::unique_ptr<orc::StripeInformation> current_strip_information,
24182469
std::vector<bool> selected_columns) {
2470+
if (_is_all_tiny_stripes) {
2471+
return;
2472+
}
24192473
if (_file_reader != nullptr) {
24202474
_file_reader->collect_profile_before_close();
24212475
}

0 commit comments

Comments
 (0)