From a7642efc6816a7c2f670ac56f394fc975ddbe510 Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Mon, 13 Mar 2023 12:26:37 +0800 Subject: [PATCH] Fix several issues related with disaggregate remote read (#7053) ref pingcap/tiflash#6827 --- .../Flash/Disaggregated/RNPagePreparer.cpp | 14 ++- dbms/src/Flash/Disaggregated/RNPagePreparer.h | 2 +- .../Disaggregated/RNPageReceiverContext.cpp | 45 +++++++-- .../Disaggregated/RNPageReceiverContext.h | 2 - .../ColumnFile/ColumnFileSetSnapshot.h | 9 +- .../DeltaMerge/ColumnFile/ColumnFileTiny.h | 9 ++ .../DeltaMerge/Remote/Proto/remote.proto | 10 +- .../DeltaMerge/Remote/RNDataProvider.cpp | 44 +++++++++ .../DeltaMerge/Remote/RNDataProvider.h | 52 ++++++++++ .../DeltaMerge/Remote/RNDataProvider_fwd.h | 22 +++++ .../DeltaMerge/Remote/RNRemoteReadTask.cpp | 95 +++++++------------ .../DeltaMerge/Remote/RNRemoteReadTask.h | 30 +++--- .../Storages/DeltaMerge/Remote/Serializer.cpp | 14 ++- .../Storages/DeltaMerge/Remote/Serializer.h | 3 +- .../V3/CheckpointFile/Proto/data_file.proto | 4 +- .../CheckpointFile/Proto/manifest_file.proto | 8 +- dbms/src/Storages/S3/S3Common.cpp | 1 + dbms/src/Storages/Transaction/TiKVKeyValue.h | 5 +- 18 files changed, 264 insertions(+), 105 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNDataProvider.cpp create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNDataProvider.h create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNDataProvider_fwd.h diff --git a/dbms/src/Flash/Disaggregated/RNPagePreparer.cpp b/dbms/src/Flash/Disaggregated/RNPagePreparer.cpp index 739acb3b57c..e9db1ba5ab4 100644 --- a/dbms/src/Flash/Disaggregated/RNPagePreparer.cpp +++ b/dbms/src/Flash/Disaggregated/RNPagePreparer.cpp @@ -53,7 +53,15 @@ RNPagePreparer::RNPagePreparer( for (size_t index = 0; index < threads_num; ++index) { auto task = std::make_shared>([this, index] { - persistLoop(index); + try + { + prepareLoop(index); + } + catch (...) + { + auto ex = getCurrentExceptionMessage(true); + LOG_ERROR(Logger::get(), "PagePrepareThread#{} read loop meet exception and exited, ex={}", index, ex); + } }); persist_threads.emplace_back(task->get_future()); @@ -82,9 +90,9 @@ RNPagePreparer::~RNPagePreparer() noexcept } } -void RNPagePreparer::persistLoop(size_t idx) +void RNPagePreparer::prepareLoop(size_t idx) { - LoggerPtr log = exc_log->getChild(fmt::format("persist{}", idx)); + LoggerPtr log = exc_log->getChild(fmt::format("PagePrepareThread#{}", idx)); bool meet_error = false; diff --git a/dbms/src/Flash/Disaggregated/RNPagePreparer.h b/dbms/src/Flash/Disaggregated/RNPagePreparer.h index 796a7fb31e1..aae4c1eb89d 100644 --- a/dbms/src/Flash/Disaggregated/RNPagePreparer.h +++ b/dbms/src/Flash/Disaggregated/RNPagePreparer.h @@ -47,7 +47,7 @@ class RNPagePreparer ~RNPagePreparer() noexcept; private: - void persistLoop(size_t idx); + void prepareLoop(size_t idx); bool consumeOneResult(const LoggerPtr & log); diff --git a/dbms/src/Flash/Disaggregated/RNPageReceiverContext.cpp b/dbms/src/Flash/Disaggregated/RNPageReceiverContext.cpp index 95721817b10..eebbc5d2ef3 100644 --- a/dbms/src/Flash/Disaggregated/RNPageReceiverContext.cpp +++ b/dbms/src/Flash/Disaggregated/RNPageReceiverContext.cpp @@ -16,6 +16,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -99,9 +102,42 @@ FetchPagesRequest::FetchPagesRequest(DM::RNRemoteSegmentReadTaskPtr seg_task_) *req->mutable_snapshot_id() = seg_task->snapshot_id.toMeta(); req->set_table_id(seg_task->table_id); req->set_segment_id(seg_task->segment_id); - for (auto page_id : seg_task->cacheMissPageIds()) + { - req->add_page_ids(page_id); + std::vector cf_tiny_oids; + cf_tiny_oids.reserve(seg_task->delta_tinycf_page_ids.size()); + for (const auto & page_id : seg_task->delta_tinycf_page_ids) + { + auto page_oid = DM::Remote::PageOID{ + .store_id = seg_task->store_id, + .table_id = seg_task->table_id, + .page_id = page_id, + }; + cf_tiny_oids.emplace_back(page_oid); + } + + // Note: We must occupySpace segment by segment, because we need to read + // at least the complete data of one segment in order to drive everything forward. + // Currently we call occupySpace for each FetchPagesRequest, which is fine, + // because we send one request each seg_task. If we want to split + // FetchPagesRequest into multiples in future, then we need to change + // the moment of calling `occupySpace`. + auto page_cache = seg_task->dm_context->db_context.getSharedContextDisagg()->rn_page_cache; + auto occupy_result = page_cache->occupySpace(cf_tiny_oids, seg_task->delta_tinycf_page_sizes); + for (auto page_id : occupy_result.pages_not_in_cache) + req->add_page_ids(page_id.page_id); + + auto cftiny_total = seg_task->delta_tinycf_page_ids.size(); + auto cftiny_fetch = occupy_result.pages_not_in_cache.size(); + LOG_INFO( + Logger::get(), + "read task local cache hit rate: {}, pages_not_in_cache={}", + cftiny_total == 0 ? "N/A" : fmt::format("{:.2f}%", 100.0 - 100.0 * cftiny_fetch / cftiny_total), + occupy_result.pages_not_in_cache); + GET_METRIC(tiflash_disaggregated_details, type_cftiny_read).Increment(cftiny_total); + GET_METRIC(tiflash_disaggregated_details, type_cftiny_fetch).Increment(cftiny_fetch); + + seg_task->initColumnFileDataProvider(occupy_result.pages_guard); } } @@ -127,11 +163,6 @@ void GRPCPagesReceiverContext::finishTaskReceive(const DM::RNRemoteSegmentReadTa remote_read_tasks->updateTaskState(seg_task, DM::SegmentReadTaskState::DataReady, false); } -void GRPCPagesReceiverContext::finishAllReceivingTasks(const String & err_msg) -{ - remote_read_tasks->allDataReceive(err_msg); -} - void GRPCPagesReceiverContext::cancelDisaggTaskOnTiFlashStorageNode(LoggerPtr /*log*/) { // TODO cancel diff --git a/dbms/src/Flash/Disaggregated/RNPageReceiverContext.h b/dbms/src/Flash/Disaggregated/RNPageReceiverContext.h index c33953886b4..cb80b5a2566 100644 --- a/dbms/src/Flash/Disaggregated/RNPageReceiverContext.h +++ b/dbms/src/Flash/Disaggregated/RNPageReceiverContext.h @@ -85,8 +85,6 @@ class GRPCPagesReceiverContext void finishTaskReceive(const DM::RNRemoteSegmentReadTaskPtr & seg_task); - void finishAllReceivingTasks(const String & err_msg); - private: // The remote segment task pool DM::RNRemoteReadTaskPtr remote_read_tasks; diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h index 2c7ec1d6d04..781af5a6dc5 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h @@ -60,8 +60,6 @@ class ColumnFileSetSnapshot : public std::enable_shared_from_this +#include + +namespace DB::DM::Remote +{ + +Page ColumnFileDataProviderRNLocalPageCache::readTinyData( + PageId page_id, + const std::optional> & fields) const +{ + RUNTIME_CHECK_MSG( + fields.has_value(), + "ColumnFileDataProviderRNLocalPageCache currently does not support read all data from a page"); + + auto oid = Remote::PageOID{ + .store_id = store_id, + .table_id = table_id, + .page_id = page_id, + }; + return page_cache->getPage(oid, *fields); +} + +size_t ColumnFileDataProviderRNLocalPageCache::getTinyDataSize(PageId) const +{ + RUNTIME_CHECK_MSG( + false, + "ColumnFileDataProviderRNLocalPageCache currently does not support getTinyDataSize"); +} + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNDataProvider.h b/dbms/src/Storages/DeltaMerge/Remote/RNDataProvider.h new file mode 100644 index 00000000000..2b71cfd98c2 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNDataProvider.h @@ -0,0 +1,52 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB::DM::Remote +{ + +class ColumnFileDataProviderRNLocalPageCache : public IColumnFileDataProvider +{ +private: + RNLocalPageCachePtr page_cache; + RNLocalPageCacheGuardPtr pages_guard; // Only keep for maintaining lifetime for related keys + + UInt64 store_id; + Int64 table_id; + +public: + explicit ColumnFileDataProviderRNLocalPageCache( + RNLocalPageCachePtr page_cache_, + RNLocalPageCacheGuardPtr pages_guard_, + UInt64 store_id_, + Int64 table_id_) + : page_cache(page_cache_) + , pages_guard(pages_guard_) + , store_id(store_id_) + , table_id(table_id_) + {} + + Page readTinyData( + PageId page_id, + const std::optional> & fields) const override; + + size_t getTinyDataSize(PageId page_id) const override; +}; + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNDataProvider_fwd.h b/dbms/src/Storages/DeltaMerge/Remote/RNDataProvider_fwd.h new file mode 100644 index 00000000000..5a64296a535 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNDataProvider_fwd.h @@ -0,0 +1,22 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +namespace DB::DM::Remote +{ + +class ColumnFileDataProviderLocalPageCache; + +} diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.cpp index c3d2b180094..c00b01165c6 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.cpp @@ -23,12 +23,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -54,8 +56,6 @@ RNRemoteReadTask::RNRemoteReadTask(std::vector && task : num_segments(0) , log(Logger::get()) { - size_t total_num_cftiny = 0; - size_t total_num_cftiny_to_fetch = 0; for (const auto & table_task : tasks_) { if (!table_task) @@ -72,27 +72,10 @@ RNRemoteReadTask::RNRemoteReadTask(std::vector && task // blocks on write node's mem-table, then we // can simply skip the fetch page pharse and // push it into ready queue - total_num_cftiny += task->totalCFTinys(); - total_num_cftiny_to_fetch += task->cacheMissPageIds().size(); - - if (auto iter = ready_segment_tasks.find(task->state); iter != ready_segment_tasks.end()) - { - iter->second.push_back(task); - } - else - { - ready_segment_tasks.emplace(task->state, std::list{task}); - } + ready_segment_tasks[task->state].push_back(task); } } curr_store = tasks.begin(); - - LOG_INFO( - log, - "read task local cache hit rate: {}", - total_num_cftiny == 0 ? "N/A" : fmt::format("{:.2f}%", 100.0 - 100.0 * total_num_cftiny_to_fetch / total_num_cftiny)); - GET_METRIC(tiflash_disaggregated_details, type_cftiny_read).Increment(total_num_cftiny); - GET_METRIC(tiflash_disaggregated_details, type_cftiny_fetch).Increment(total_num_cftiny_to_fetch); } RNRemoteReadTask::~RNRemoteReadTask() @@ -174,12 +157,6 @@ void RNRemoteReadTask::allDataReceive(const String & end_err_msg) if (err_msg.empty() && !end_err_msg.empty()) err_msg = end_err_msg; - if (auto state_iter = ready_segment_tasks.find(SegmentReadTaskState::DataReady); - state_iter == ready_segment_tasks.end()) - { - ready_segment_tasks.emplace(SegmentReadTaskState::DataReady, std::list{}); - } - for (auto iter = ready_segment_tasks.begin(); iter != ready_segment_tasks.end(); /* empty */) { const auto state = iter->first; @@ -390,9 +367,6 @@ RNRemoteSegmentReadTask::RNRemoteSegmentReadTask( , table_id(table_id_) , segment_id(segment_id_) , address(std::move(address_)) - , total_num_cftiny(0) - , num_msg_to_consume(0) - , num_msg_consumed(0) , log(std::move(log_)) { } @@ -454,52 +428,51 @@ RNRemoteSegmentReadTaskPtr RNRemoteSegmentReadTask::buildFrom( table_id, proto); + // Note: At this moment, we still cannot read from `task->segment_snap`, + // because they are constructed using ColumnFileDataProviderNop. + { - size_t total_persisted_size = 0; auto persisted_cfs = task->segment_snap->delta->getPersistedFileSetSnapshot(); - std::vector all_persisted_ids; - all_persisted_ids.reserve(persisted_cfs->getColumnFileCount()); - std::vector page_sizes; - page_sizes.reserve(persisted_cfs->getColumnFileCount()); + std::vector persisted_ids; + std::vector persisted_sizes; + persisted_ids.reserve(persisted_cfs->getColumnFileCount()); + persisted_sizes.reserve(persisted_cfs->getColumnFileCount()); for (const auto & cfs : persisted_cfs->getColumnFiles()) { if (auto * tiny = cfs->tryToTinyFile(); tiny) { - auto page_oid = Remote::PageOID{ - .store_id = store_id, - .table_id = table_id, - .page_id = tiny->getDataPageId(), - }; - all_persisted_ids.emplace_back(page_oid); - page_sizes.emplace_back(tiny->getBytes()); - task->total_num_cftiny += 1; - total_persisted_size += tiny->getBytes(); + persisted_ids.emplace_back(tiny->getDataPageId()); + persisted_sizes.emplace_back(tiny->getDataPageSize()); } } - // FIXME: this could block for a long time, refine it later - auto occupy_space_res = db_context.getSharedContextDisagg()->rn_page_cache->occupySpace(all_persisted_ids, page_sizes); - task->page_ids_cache_miss.reserve(occupy_space_res.pages_not_in_cache.size()); - for (const auto & oid : occupy_space_res.pages_not_in_cache) - { - task->page_ids_cache_miss.emplace_back(oid.page_id); - } - task->local_cache_guard = occupy_space_res.pages_guard; - LOG_INFO( - log, - "mem-table cfs: {}, persisted cfs: {} (size={}), local cache hit rate: {}, cache_miss_oids: {}, all_oids: {}", - task->segment_snap->delta->getMemTableSetSnapshot()->getColumnFileCount(), - task->segment_snap->delta->getPersistedFileSetSnapshot()->getColumnFileCount(), - total_persisted_size, - (all_persisted_ids.empty() ? "N/A" : fmt::format("{:.2f}%", 100.0 - 100.0 * task->page_ids_cache_miss.size() / all_persisted_ids.size())), - task->cacheMissPageIds(), - all_persisted_ids); - } + task->delta_tinycf_page_ids = persisted_ids; + task->delta_tinycf_page_sizes = persisted_sizes; + LOG_INFO(log, + "Build RemoteSegmentReadTask, store_id={} table_id={} memtable_cfs={} persisted_cfs={}", + task->store_id, + task->table_id, + task->segment_snap->delta->getMemTableSetSnapshot()->getColumnFileCount(), + task->segment_snap->delta->getPersistedFileSetSnapshot()->getColumnFileCount()); + } return task; } +void RNRemoteSegmentReadTask::initColumnFileDataProvider(Remote::RNLocalPageCacheGuardPtr pages_guard) +{ + auto & data_provider = segment_snap->delta->getPersistedFileSetSnapshot()->data_provider; + RUNTIME_CHECK(std::dynamic_pointer_cast(data_provider)); + + auto page_cache = dm_context->db_context.getSharedContextDisagg()->rn_page_cache; + data_provider = std::make_shared( + page_cache, + pages_guard, + store_id, + table_id); +} + void RNRemoteSegmentReadTask::receivePage(RemotePb::RemotePage && remote_page) { std::lock_guard lock(mtx_queue); diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.h b/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.h index 1a9d5b92a7c..b4894cc68fc 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.h +++ b/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.h @@ -34,6 +34,7 @@ namespace DB { class Context; +struct FetchPagesRequest; namespace DM { namespace tests @@ -100,6 +101,7 @@ class RNRemoteReadTask const String & getErrorMessage() const; friend class tests::RemoteReadTaskTest; + friend struct DB::FetchPagesRequest; private: void insertTask(const RNRemoteSegmentReadTaskPtr & seg_task, std::unique_lock &); @@ -199,11 +201,6 @@ class RNRemoteSegmentReadTask const String & address, const LoggerPtr & log); - // The page ids that is absent from local cache - const PageIdU64s & cacheMissPageIds() const { return page_ids_cache_miss; } - - size_t totalCFTinys() const { return total_num_cftiny; } - RowKeyRanges getReadRanges() const { return read_ranges; } BlockInputStreamPtr getInputStream( @@ -214,13 +211,19 @@ class RNRemoteSegmentReadTask size_t expected_block_size); void addPendingMsg() { num_msg_to_consume += 1; } + + /// Returns true if there are more pending messages. bool addConsumedMsg() { num_msg_consumed += 1; + RUNTIME_CHECK(num_msg_consumed <= num_msg_to_consume); + // return there are more pending msg or not - return num_msg_consumed == num_msg_to_consume; + return num_msg_consumed < num_msg_to_consume; } + void initColumnFileDataProvider(Remote::RNLocalPageCacheGuardPtr pages_guard); + void receivePage(RemotePb::RemotePage && remote_page); void receiveMemTable(Block && block) @@ -233,6 +236,8 @@ class RNRemoteSegmentReadTask void prepare(); friend class tests::RemoteReadTaskTest; + friend struct DB::FetchPagesRequest; + friend class RNRemoteReadTask; // Only used by buildFrom RNRemoteSegmentReadTask( @@ -252,8 +257,8 @@ class RNRemoteSegmentReadTask const String address; private: - // The snapshot of reading ids acquired from write node - std::vector delta_page_ids; + std::vector delta_tinycf_page_ids; + std::vector delta_tinycf_page_sizes; std::vector stable_files; DMContextPtr dm_context; @@ -261,14 +266,9 @@ class RNRemoteSegmentReadTask RowKeyRanges read_ranges; SegmentSnapshotPtr segment_snap; - // The page ids need to fetch from write node - std::vector page_ids_cache_miss; - size_t total_num_cftiny; - Remote::RNLocalPageCacheGuardPtr local_cache_guard; - public: - std::atomic num_msg_to_consume; - std::atomic num_msg_consumed; + std::atomic num_msg_to_consume{0}; + std::atomic num_msg_consumed{0}; private: std::mutex mtx_queue; diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp index 7b8cf4355c6..926caf21d70 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp @@ -131,6 +131,9 @@ SegmentSnapshotPtr Serializer::deserializeSegmentSnapshotFrom( table_id, segment_range); + // Note: At this moment, we still cannot read from `delta_snap->mem_table_snap` and `delta_snap->persisted_files_snap`, + // because they are constructed using ColumnFileDataProviderNop. + auto delta_index_cache = dm_context.db_context.getSharedContextDisagg()->rn_delta_index_cache; if (delta_index_cache) { @@ -184,7 +187,7 @@ Serializer::serializeTo(const ColumnFileSetSnapshotPtr & snap) } else if (auto * cf_tiny = file->tryToTinyFile(); cf_tiny) { - ret.Add(serializeTo(*cf_tiny)); + ret.Add(serializeTo(*cf_tiny, snap->getDataProvider())); } else if (auto * cf_delete_range = file->tryToDeleteRange(); cf_delete_range) { @@ -317,11 +320,13 @@ ColumnFileInMemoryPtr Serializer::deserializeCFInMemory(const RemotePb::ColumnFi return std::make_shared(schema, cache); } -RemotePb::ColumnFileRemote Serializer::serializeTo(const ColumnFileTiny & cf_tiny) +RemotePb::ColumnFileRemote Serializer::serializeTo(const ColumnFileTiny & cf_tiny, IColumnFileDataProviderPtr data_provider) { RemotePb::ColumnFileRemote ret; auto * remote_tiny = ret.mutable_tiny(); remote_tiny->set_page_id(cf_tiny.data_page_id); + // Note: We cannot use cf_tiny.data_page_size, because it is only available after restored. + remote_tiny->set_page_size(data_provider->getTinyDataSize(cf_tiny.data_page_id)); { auto wb = WriteBufferFromString(*remote_tiny->mutable_schema()); serializeSchema(wb, cf_tiny.schema->getSchema()); // defined in ColumnFilePersisted.h @@ -342,7 +347,10 @@ ColumnFileTinyPtr Serializer::deserializeCFTiny(const RemotePb::ColumnFileTiny & // We do not try to reuse the CFSchema from `SharedBlockSchemas`, because the ColumnFile will be freed immediately after the request. auto schema = std::make_shared(*block_schema); - return std::make_shared(schema, proto.rows(), proto.bytes(), proto.page_id()); + auto cf = std::make_shared(schema, proto.rows(), proto.bytes(), proto.page_id()); + cf->data_page_size = proto.page_size(); + + return cf; } RemotePb::ColumnFileRemote Serializer::serializeTo(const ColumnFileDeleteRange & cf_delete_range) diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.h b/dbms/src/Storages/DeltaMerge/Remote/Serializer.h index 8df1ec38a8f..540b6ecdbc2 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.h +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -92,7 +93,7 @@ struct Serializer static RemotePb::ColumnFileRemote serializeTo(const ColumnFileInMemory & cf_in_mem); static ColumnFileInMemoryPtr deserializeCFInMemory(const RemotePb::ColumnFileInMemory & proto); - static RemotePb::ColumnFileRemote serializeTo(const ColumnFileTiny & cf_tiny); + static RemotePb::ColumnFileRemote serializeTo(const ColumnFileTiny & cf_tiny, IColumnFileDataProviderPtr data_provider); static ColumnFileTinyPtr deserializeCFTiny(const RemotePb::ColumnFileTiny & proto); static RemotePb::ColumnFileRemote serializeTo(const ColumnFileDeleteRange & cf_delete_range); diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/Proto/data_file.proto b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/data_file.proto index c53aa015f4d..d9740982316 100644 --- a/dbms/src/Storages/Page/V3/CheckpointFile/Proto/data_file.proto +++ b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/data_file.proto @@ -31,7 +31,7 @@ message DataFilePrefix { // The first Checkpoint Manifest File ID that contains this data file. // The referred manifest file has identical `local_sequence` and `writer_info` apparently. - string manifest_file_id = 12; + bytes manifest_file_id = 12; // When data file is too large, one data file will be separated into multiples. // This records which one it is. @@ -54,7 +54,7 @@ message DataFileSuffix { } message EntryEditRecord { - string page_id = 1; + bytes page_id = 1; uint64 version_sequence = 2; uint64 version_epoch = 3; diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto index 491df9dc858..dd9cdd20b88 100644 --- a/dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto +++ b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto @@ -60,7 +60,7 @@ enum EditType { message EntryDataLocation { // There could be heavy duplicates. We rely on compression to reduce file size. - string data_file_id = 1; + bytes data_file_id = 1; uint64 offset_in_file = 2; uint64 size_in_file = 3; @@ -68,8 +68,8 @@ message EntryDataLocation { message EditRecord { EditType type = 1; - string page_id = 2; - string ori_page_id = 3; + bytes page_id = 2; + bytes ori_page_id = 3; uint64 version_sequence = 4; uint64 version_epoch = 5; @@ -83,6 +83,6 @@ message EditRecord { message LockFile { // There could be heavy duplicates. We rely on compression to reduce file size. - string name = 1; + bytes name = 1; } diff --git a/dbms/src/Storages/S3/S3Common.cpp b/dbms/src/Storages/S3/S3Common.cpp index f7a5cabcb32..e63f0cc6150 100644 --- a/dbms/src/Storages/S3/S3Common.cpp +++ b/dbms/src/Storages/S3/S3Common.cpp @@ -386,6 +386,7 @@ void ensureLifecycleRuleExist(const Aws::S3::S3Client & client, const String & b Aws::Vector old_rules; { Aws::S3::Model::GetBucketLifecycleConfigurationRequest req; + req.SetBucket(bucket); auto outcome = client.GetBucketLifecycleConfiguration(req); if (!outcome.IsSuccess()) { diff --git a/dbms/src/Storages/Transaction/TiKVKeyValue.h b/dbms/src/Storages/Transaction/TiKVKeyValue.h index 5b86d2219d0..1c0dfee81fc 100644 --- a/dbms/src/Storages/Transaction/TiKVKeyValue.h +++ b/dbms/src/Storages/Transaction/TiKVKeyValue.h @@ -111,7 +111,7 @@ struct DecodedTiKVKey : std::string static_assert(sizeof(DecodedTiKVKey) == sizeof(std::string)); -struct RawTiDBPK : std::shared_ptr +struct RawTiDBPK : private std::shared_ptr { using Base = std::shared_ptr; @@ -123,6 +123,9 @@ struct RawTiDBPK : std::shared_ptr bool operator==(const RawTiDBPK & y) const { return (**this) == (*y); } bool operator!=(const RawTiDBPK & y) const { return !((*this) == y); } bool operator<(const RawTiDBPK & y) const { return (**this) < (*y); } + bool operator>(const RawTiDBPK & y) const { return (**this) > (*y); } + const std::string * operator->() const { return get(); } + const std::string & operator*() const { return *get(); } RawTiDBPK(const Base & o) : Base(o)