Skip to content

Commit

Permalink
Fix several issues related with disaggregate remote read (#7053)
Browse files Browse the repository at this point in the history
ref #6827
  • Loading branch information
breezewish authored Mar 13, 2023
1 parent 835ad98 commit a7642ef
Show file tree
Hide file tree
Showing 18 changed files with 264 additions and 105 deletions.
14 changes: 11 additions & 3 deletions dbms/src/Flash/Disaggregated/RNPagePreparer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,15 @@ RNPagePreparer::RNPagePreparer(
for (size_t index = 0; index < threads_num; ++index)
{
auto task = std::make_shared<std::packaged_task<void()>>([this, index] {
persistLoop(index);
try
{
prepareLoop(index);
}
catch (...)
{
auto ex = getCurrentExceptionMessage(true);
LOG_ERROR(Logger::get(), "PagePrepareThread#{} read loop meet exception and exited, ex={}", index, ex);
}
});
persist_threads.emplace_back(task->get_future());

Expand Down Expand Up @@ -82,9 +90,9 @@ RNPagePreparer::~RNPagePreparer() noexcept
}
}

void RNPagePreparer::persistLoop(size_t idx)
void RNPagePreparer::prepareLoop(size_t idx)
{
LoggerPtr log = exc_log->getChild(fmt::format("persist{}", idx));
LoggerPtr log = exc_log->getChild(fmt::format("PagePrepareThread#{}", idx));


bool meet_error = false;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Disaggregated/RNPagePreparer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class RNPagePreparer
~RNPagePreparer() noexcept;

private:
void persistLoop(size_t idx);
void prepareLoop(size_t idx);

bool consumeOneResult(const LoggerPtr & log);

Expand Down
45 changes: 38 additions & 7 deletions dbms/src/Flash/Disaggregated/RNPageReceiverContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Disaggregated/RNPageReceiverContext.h>
#include <Flash/Mpp/GRPCCompletionQueuePool.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/Remote/RNLocalPageCache.h>
#include <Storages/DeltaMerge/Remote/RNRemoteReadTask.h>
#include <Storages/Transaction/TMTContext.h>
#include <grpcpp/completion_queue.h>
Expand Down Expand Up @@ -99,9 +102,42 @@ FetchPagesRequest::FetchPagesRequest(DM::RNRemoteSegmentReadTaskPtr seg_task_)
*req->mutable_snapshot_id() = seg_task->snapshot_id.toMeta();
req->set_table_id(seg_task->table_id);
req->set_segment_id(seg_task->segment_id);
for (auto page_id : seg_task->cacheMissPageIds())

{
req->add_page_ids(page_id);
std::vector<DM::Remote::PageOID> cf_tiny_oids;
cf_tiny_oids.reserve(seg_task->delta_tinycf_page_ids.size());
for (const auto & page_id : seg_task->delta_tinycf_page_ids)
{
auto page_oid = DM::Remote::PageOID{
.store_id = seg_task->store_id,
.table_id = seg_task->table_id,
.page_id = page_id,
};
cf_tiny_oids.emplace_back(page_oid);
}

// Note: We must occupySpace segment by segment, because we need to read
// at least the complete data of one segment in order to drive everything forward.
// Currently we call occupySpace for each FetchPagesRequest, which is fine,
// because we send one request each seg_task. If we want to split
// FetchPagesRequest into multiples in future, then we need to change
// the moment of calling `occupySpace`.
auto page_cache = seg_task->dm_context->db_context.getSharedContextDisagg()->rn_page_cache;
auto occupy_result = page_cache->occupySpace(cf_tiny_oids, seg_task->delta_tinycf_page_sizes);
for (auto page_id : occupy_result.pages_not_in_cache)
req->add_page_ids(page_id.page_id);

auto cftiny_total = seg_task->delta_tinycf_page_ids.size();
auto cftiny_fetch = occupy_result.pages_not_in_cache.size();
LOG_INFO(
Logger::get(),
"read task local cache hit rate: {}, pages_not_in_cache={}",
cftiny_total == 0 ? "N/A" : fmt::format("{:.2f}%", 100.0 - 100.0 * cftiny_fetch / cftiny_total),
occupy_result.pages_not_in_cache);
GET_METRIC(tiflash_disaggregated_details, type_cftiny_read).Increment(cftiny_total);
GET_METRIC(tiflash_disaggregated_details, type_cftiny_fetch).Increment(cftiny_fetch);

seg_task->initColumnFileDataProvider(occupy_result.pages_guard);
}
}

Expand All @@ -127,11 +163,6 @@ void GRPCPagesReceiverContext::finishTaskReceive(const DM::RNRemoteSegmentReadTa
remote_read_tasks->updateTaskState(seg_task, DM::SegmentReadTaskState::DataReady, false);
}

void GRPCPagesReceiverContext::finishAllReceivingTasks(const String & err_msg)
{
remote_read_tasks->allDataReceive(err_msg);
}

void GRPCPagesReceiverContext::cancelDisaggTaskOnTiFlashStorageNode(LoggerPtr /*log*/)
{
// TODO cancel
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Flash/Disaggregated/RNPageReceiverContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ class GRPCPagesReceiverContext

void finishTaskReceive(const DM::RNRemoteSegmentReadTaskPtr & seg_task);

void finishAllReceivingTasks(const String & err_msg);

private:
// The remote segment task pool
DM::RNRemoteReadTaskPtr remote_read_tasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ class ColumnFileSetSnapshot : public std::enable_shared_from_this<ColumnFileSetS
friend struct Remote::Serializer;

private:
IColumnFileDataProviderPtr data_provider;

ColumnFiles column_files;
size_t rows{0};
size_t bytes{0};
Expand All @@ -71,6 +69,13 @@ class ColumnFileSetSnapshot : public std::enable_shared_from_this<ColumnFileSetS
size_t rowkey_column_size{0};

public:
/// This field is public writeable intentionally. It allows us to build a snapshot first,
/// then change how these data can be read later.
/// In disaggregated mode, we first restore the snapshot from remote proto without a specific data provider (NopProvider),
/// and then assign the correct data provider according to the data in the snapshot.
/// Why we don't know the data provider at that time? Because when we have remote proto, data is not yet received.
IColumnFileDataProviderPtr data_provider = nullptr;

explicit ColumnFileSetSnapshot(const IColumnFileDataProviderPtr & data_provider_)
: data_provider{data_provider_}
{}
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class ColumnFileTiny : public ColumnFilePersisted
/// The id of data page which stores the data of this pack.
PageIdU64 data_page_id;

/// HACK: Currently this field is only available when ColumnFileTiny is restored from remote proto.
/// It is not available when ColumnFileTiny is constructed or restored locally.
/// Maybe we should just drop this field, and store the data_page_size in somewhere else.
UInt64 data_page_size;

/// The members below are not serialized.

/// The cache data in memory.
Expand Down Expand Up @@ -102,6 +107,10 @@ class ColumnFileTiny : public ColumnFilePersisted

PageIdU64 getDataPageId() const { return data_page_id; }

/// WARNING: DO NOT USE THIS MEMBER FUNCTION UNLESS YOU KNOW WHAT YOU ARE DOING.
/// This function will be refined and dropped soon.
UInt64 getDataPageSize() const { return data_page_size; }

Block readBlockForMinorCompaction(const PageReader & page_reader) const;

static ColumnFileTinyPtr writeColumnFile(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const CachePtr & cache = nullptr);
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,15 @@ message ColumnFileInMemory {

message ColumnFileTiny {
uint64 page_id = 1;
uint64 page_size = 2;

// serialized schema
bytes schema = 2;
bytes schema = 5;

uint64 rows = 3;
uint64 bytes = 4;
uint64 rows = 6;
uint64 bytes = 7;
}

message ColumnFileBig {
uint64 page_id = 1;
uint64 file_id = 2;
Expand All @@ -85,6 +88,7 @@ message ColumnFileBig {
uint64 valid_rows = 10;
uint64 valid_bytes = 11;
}

message ColumnFileDeleteRange {
bytes key_range = 1;
}
Expand Down
44 changes: 44 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/RNDataProvider.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/DeltaMerge/Remote/RNDataProvider.h>
#include <Storages/DeltaMerge/Remote/RNLocalPageCache.h>

namespace DB::DM::Remote
{

Page ColumnFileDataProviderRNLocalPageCache::readTinyData(
PageId page_id,
const std::optional<std::vector<size_t>> & fields) const
{
RUNTIME_CHECK_MSG(
fields.has_value(),
"ColumnFileDataProviderRNLocalPageCache currently does not support read all data from a page");

auto oid = Remote::PageOID{
.store_id = store_id,
.table_id = table_id,
.page_id = page_id,
};
return page_cache->getPage(oid, *fields);
}

size_t ColumnFileDataProviderRNLocalPageCache::getTinyDataSize(PageId) const
{
RUNTIME_CHECK_MSG(
false,
"ColumnFileDataProviderRNLocalPageCache currently does not support getTinyDataSize");
}

} // namespace DB::DM::Remote
52 changes: 52 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/RNDataProvider.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider.h>
#include <Storages/DeltaMerge/Remote/RNDataProvider_fwd.h>
#include <Storages/DeltaMerge/Remote/RNLocalPageCache_fwd.h>

namespace DB::DM::Remote
{

class ColumnFileDataProviderRNLocalPageCache : public IColumnFileDataProvider
{
private:
RNLocalPageCachePtr page_cache;
RNLocalPageCacheGuardPtr pages_guard; // Only keep for maintaining lifetime for related keys

UInt64 store_id;
Int64 table_id;

public:
explicit ColumnFileDataProviderRNLocalPageCache(
RNLocalPageCachePtr page_cache_,
RNLocalPageCacheGuardPtr pages_guard_,
UInt64 store_id_,
Int64 table_id_)
: page_cache(page_cache_)
, pages_guard(pages_guard_)
, store_id(store_id_)
, table_id(table_id_)
{}

Page readTinyData(
PageId page_id,
const std::optional<std::vector<size_t>> & fields) const override;

size_t getTinyDataSize(PageId page_id) const override;
};

} // namespace DB::DM::Remote
22 changes: 22 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/RNDataProvider_fwd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

namespace DB::DM::Remote
{

class ColumnFileDataProviderLocalPageCache;

}
Loading

0 comments on commit a7642ef

Please sign in to comment.