Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PageStorage refine : Split the different version of PageStorage #3320

Merged
merged 11 commits into from
Nov 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ add_headers_and_sources(dbms src/Storages/DeltaMerge/Delta)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Storages/Transaction)
add_headers_and_sources(dbms src/Storages/Page)
add_headers_and_sources(dbms src/Storages/Page/mvcc)
add_headers_and_sources(dbms src/Storages/Page/VersionSet)
add_headers_and_sources(dbms src/Storages/Page/gc)
add_headers_and_sources(dbms src/Storages/Page/stable)
add_headers_and_sources(dbms src/Storages/Page/stable/mvcc)
add_headers_and_sources(dbms src/Storages/Page/stable/VersionSet)
add_headers_and_sources(dbms src/Storages/Page/V1)
add_headers_and_sources(dbms src/Storages/Page/V1/mvcc)
add_headers_and_sources(dbms src/Storages/Page/V1/VersionSet)
add_headers_and_sources(dbms src/Storages/Page/V2)
add_headers_and_sources(dbms src/Storages/Page/V2/mvcc)
add_headers_and_sources(dbms src/Storages/Page/V2/VersionSet)
add_headers_and_sources(dbms src/Storages/Page/V2/gc)
add_headers_and_sources(dbms src/Storages/Page/)
add_headers_and_sources(dbms src/TiDB)
add_headers_and_sources(dbms src/Client)
add_headers_only(dbms src/Flash/Coprocessor)
Expand Down
24 changes: 12 additions & 12 deletions dbms/src/Server/tests/gtest_server_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,21 +275,21 @@ dt_page_gc_low_write_prob = 0.2
auto verifyStoragePoolReloadConfig = [&global_ctx](std::unique_ptr<DM::StoragePool> & storage_pool) {
DB::Settings & settings = global_ctx.getSettingsRef();

EXPECT_NE(storage_pool->data().config.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num);
EXPECT_NE(storage_pool->data().config.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num);
EXPECT_NE(storage_pool->data().config.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes);
EXPECT_NE(storage_pool->data().config.gc_max_valid_rate, settings.dt_storage_pool_data_gc_max_valid_rate);
EXPECT_NE(storage_pool->data().config.open_file_max_idle_time, settings.dt_open_file_max_idle_seconds);
EXPECT_NE(storage_pool->data().config.prob_do_gc_when_write_is_low, settings.dt_page_gc_low_write_prob * 1000);
EXPECT_NE(storage_pool->data()->config.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num);
EXPECT_NE(storage_pool->data()->config.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num);
EXPECT_NE(storage_pool->data()->config.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes);
EXPECT_NE(storage_pool->data()->config.gc_max_valid_rate, settings.dt_storage_pool_data_gc_max_valid_rate);
EXPECT_NE(storage_pool->data()->config.open_file_max_idle_time, settings.dt_open_file_max_idle_seconds);
EXPECT_NE(storage_pool->data()->config.prob_do_gc_when_write_is_low, settings.dt_page_gc_low_write_prob * 1000);

storage_pool->gc(settings, DM::StoragePool::Seconds(0));

EXPECT_EQ(storage_pool->data().config.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num);
EXPECT_EQ(storage_pool->data().config.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num);
EXPECT_EQ(storage_pool->data().config.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes);
EXPECT_DOUBLE_EQ(storage_pool->data().config.gc_max_valid_rate, settings.dt_storage_pool_data_gc_max_valid_rate);
EXPECT_EQ(storage_pool->data().config.open_file_max_idle_time, settings.dt_open_file_max_idle_seconds);
EXPECT_EQ(storage_pool->data().config.prob_do_gc_when_write_is_low, settings.dt_page_gc_low_write_prob * 1000);
EXPECT_EQ(storage_pool->data()->config.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num);
EXPECT_EQ(storage_pool->data()->config.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num);
EXPECT_EQ(storage_pool->data()->config.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes);
EXPECT_DOUBLE_EQ(storage_pool->data()->config.gc_max_valid_rate, settings.dt_storage_pool_data_gc_max_valid_rate);
EXPECT_EQ(storage_pool->data()->config.open_file_max_idle_time, settings.dt_open_file_max_idle_seconds);
EXPECT_EQ(storage_pool->data()->config.prob_do_gc_when_write_is_low, settings.dt_page_gc_low_write_prob * 1000);
};

for (size_t i = 0; i < tests.size(); ++i)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ add_subdirectory (DeltaMerge/File/dtpb)
if (ENABLE_TESTS)
add_subdirectory (tests EXCLUDE_FROM_ALL)
add_subdirectory (Transaction/tests EXCLUDE_FROM_ALL)
add_subdirectory (Page/tests EXCLUDE_FROM_ALL)
add_subdirectory (Page/V2/tests EXCLUDE_FROM_ALL)
add_subdirectory (DeltaMerge/tests EXCLUDE_FROM_ALL)
endif ()

2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/CompactDelta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ bool DeltaValueSpace::compact(DMContext & context)
return true;
}

log_storage_snap = context.storage_pool.log().getSnapshot();
log_storage_snap = context.storage_pool.log()->getSnapshot();
snapshot_metrics.changeTo(1); // add metrics for snapshot
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaPackBlock.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <Storages/DeltaMerge/Delta/DeltaPack.h>

#include <Storages/Page/Page.h>
namespace DB
{
namespace DM
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaPackFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ DeltaPackPtr DeltaPackFile::deserializeMetadata(DMContext & context, //
readIntBinary(valid_rows, buf);
readIntBinary(valid_bytes, buf);

auto file_id = context.storage_pool.data().getNormalPageId(file_ref_id);
auto file_id = context.storage_pool.data()->getNormalPageId(file_ref_id);
auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id);

auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, file_ref_id, file_parent_path, DMFile::ReadMetaMode::all());
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void DeltaValueSpace::abandon(DMContext & context)

DeltaValueSpacePtr DeltaValueSpace::restore(DMContext & context, const RowKeyRange & segment_range, PageId id)
{
Page page = context.storage_pool.meta().read(id, nullptr);
Page page = context.storage_pool.meta()->read(id, nullptr);
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
auto packs = deserializePacks(context, segment_range, buf);
return std::make_shared<DeltaValueSpace>(id, packs);
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
}
}
};
storage_pool.data().registerExternalPagesCallbacks(dmfile_scanner, dmfile_remover);
storage_pool.data()->registerExternalPagesCallbacks(dmfile_scanner, dmfile_remover);

gc_handle = background_pool.addTask([this] { return storage_pool.gc(global_context.getSettingsRef()); });
background_task_handle = background_pool.addTask([this] { return handleBackgroundTask(false); });
Expand Down Expand Up @@ -2180,8 +2180,8 @@ DeltaMergeStoreStat DeltaMergeStore::getStat()
std::tie(stat.storage_stable_num_snapshots, //
stat.storage_stable_oldest_snapshot_lifetime,
stat.storage_stable_oldest_snapshot_thread_id)
= storage_pool.data().getSnapshotsStat();
PageStorage::SnapshotPtr stable_snapshot = storage_pool.data().getSnapshot();
= storage_pool.data()->getSnapshotsStat();
PageStorage::SnapshotPtr stable_snapshot = storage_pool.data()->getSnapshot();
stat.storage_stable_num_pages = stable_snapshot->version()->numPages();
stat.storage_stable_num_normal_pages = stable_snapshot->version()->numNormalPages();
stat.storage_stable_max_page_id = stable_snapshot->version()->maxId();
Expand All @@ -2190,8 +2190,8 @@ DeltaMergeStoreStat DeltaMergeStore::getStat()
std::tie(stat.storage_delta_num_snapshots, //
stat.storage_delta_oldest_snapshot_lifetime,
stat.storage_delta_oldest_snapshot_thread_id)
= storage_pool.log().getSnapshotsStat();
PageStorage::SnapshotPtr log_snapshot = storage_pool.log().getSnapshot();
= storage_pool.log()->getSnapshotsStat();
PageStorage::SnapshotPtr log_snapshot = storage_pool.log()->getSnapshot();
stat.storage_delta_num_pages = log_snapshot->version()->numPages();
stat.storage_delta_num_normal_pages = log_snapshot->version()->numNormalPages();
stat.storage_delta_max_page_id = log_snapshot->version()->maxId();
Expand All @@ -2200,8 +2200,8 @@ DeltaMergeStoreStat DeltaMergeStore::getStat()
std::tie(stat.storage_meta_num_snapshots, //
stat.storage_meta_oldest_snapshot_lifetime,
stat.storage_meta_oldest_snapshot_thread_id)
= storage_pool.meta().getSnapshotsStat();
PageStorage::SnapshotPtr meta_snapshot = storage_pool.meta().getSnapshot();
= storage_pool.meta()->getSnapshotsStat();
PageStorage::SnapshotPtr meta_snapshot = storage_pool.meta()->getSnapshot();
stat.storage_meta_num_pages = meta_snapshot->version()->numPages();
stat.storage_meta_num_normal_pages = meta_snapshot->version()->numNormalPages();
stat.storage_meta_max_page_id = meta_snapshot->version()->maxId();
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ DMFilePtr DMFile::create(UInt64 file_id, const String & parent_path, bool single
// Create a mark file to stop this dmfile from being removed by GC.
// We should create NGC file before creating the file under single file mode,
// or the file may be removed.
// FIXME : this should not use PageUtils.
PageUtil::touchFile(new_dmfile->ngcPath());
PageUtil::touchFile(path);
}
Expand All @@ -121,6 +122,7 @@ DMFilePtr DMFile::create(UInt64 file_id, const String & parent_path, bool single
// Create a mark file to stop this dmfile from being removed by GC.
// We should create NGC file after creating the directory under folder mode
// since the NGC file is a file under the folder.
// FIXME : this should not use PageUtils.
PageUtil::touchFile(new_dmfile->ngcPath());
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ SegmentPtr Segment::newSegment(

SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id)
{
Page page = context.storage_pool.meta().read(segment_id, nullptr); // not limit restore
Page page = context.storage_pool.meta()->read(segment_id, nullptr); // not limit restore

ReadBufferFromMemory buf(page.data.begin(), page.data.size());
SegmentFormat::Version version;
Expand Down
9 changes: 3 additions & 6 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
#include <Storages/DeltaMerge/SkippableBlockInputStream.h>
#include <Storages/DeltaMerge/StableValueSpace.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/WriteBatch.h>

namespace DB
{
class WriteBatch;
namespace DM
namespace DB::DM
{
class Segment;
struct SegmentSnapshot;
Expand Down Expand Up @@ -376,5 +374,4 @@ class Segment : private boost::noncopyable
Poco::Logger * log;
};

} // namespace DM
} // namespace DB
} // namespace DB::DM
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageId id)
{
auto stable = std::make_shared<StableValueSpace>(id);

Page page = context.storage_pool.meta().read(id, nullptr); // not limit restore
Page page = context.storage_pool.meta()->read(id, nullptr); // not limit restore
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
UInt64 version, valid_rows, valid_bytes, size;
readIntBinary(version, buf);
Expand All @@ -88,7 +88,7 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageId id)
{
readIntBinary(ref_id, buf);

auto file_id = context.storage_pool.data().getNormalPageId(ref_id);
auto file_id = context.storage_pool.data()->getNormalPageId(ref_id);
auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id);

auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, ref_id, file_parent_path, DMFile::ReadMetaMode::all());
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/SkippableBlockInputStream.h>
#include <Storages/Page/Page.h>
#include <Storages/Page/PageStorage.h>

#include <Storages/Page/WriteBatch.h>

namespace DB
{
Expand Down
55 changes: 27 additions & 28 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,22 @@ PageStorage::Config extractConfig(const Settings & settings, StorageType subtype

StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const Context & global_ctx, const Settings & settings)
: // The iops and bandwidth in log_storage are relatively high, use multi-disks if possible
log_storage(
name + ".log",
path_pool.getPSDiskDelegatorMulti("log"),
extractConfig(settings, StorageType::Log),
global_ctx.getFileProvider())
log_storage(PageStorage::create(name + ".log",
path_pool.getPSDiskDelegatorMulti("log"),
extractConfig(settings, StorageType::Log),
global_ctx.getFileProvider()))
,
// The iops in data_storage is low, only use the first disk for storing data
data_storage(name + ".data",
path_pool.getPSDiskDelegatorSingle("data"),
extractConfig(settings, StorageType::Data),
global_ctx.getFileProvider())
data_storage(PageStorage::create(name + ".data",
path_pool.getPSDiskDelegatorSingle("data"),
extractConfig(settings, StorageType::Data),
global_ctx.getFileProvider()))
,
// The iops in meta_storage is relatively high, use multi-disks if possible
meta_storage(name + ".meta",
path_pool.getPSDiskDelegatorMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider())
meta_storage(PageStorage::create(name + ".meta",
path_pool.getPSDiskDelegatorMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider()))
, max_log_page_id(0)
, max_data_page_id(0)
, max_meta_page_id(0)
Expand All @@ -78,20 +77,20 @@ StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const

void StoragePool::restore()
{
log_storage.restore();
data_storage.restore();
meta_storage.restore();
log_storage->restore();
data_storage->restore();
meta_storage->restore();

max_log_page_id = log_storage.getMaxId();
max_data_page_id = data_storage.getMaxId();
max_meta_page_id = meta_storage.getMaxId();
max_log_page_id = log_storage->getMaxId();
max_data_page_id = data_storage->getMaxId();
max_meta_page_id = meta_storage->getMaxId();
}

void StoragePool::drop()
{
meta_storage.drop();
data_storage.drop();
log_storage.drop();
meta_storage->drop();
data_storage->drop();
log_storage->drop();
}

PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who)
Expand Down Expand Up @@ -140,16 +139,16 @@ bool StoragePool::gc(const Settings & settings, const Seconds & try_gc_period)
auto write_limiter = global_context.getWriteLimiter();
auto read_limiter = global_context.getReadLimiter();
auto config = extractConfig(settings, StorageType::Meta);
meta_storage.reloadSettings(config);
done_anything |= meta_storage.gc(/*not_skip*/ false, write_limiter, read_limiter);
meta_storage->reloadSettings(config);
done_anything |= meta_storage->gc(/*not_skip*/ false, write_limiter, read_limiter);

config = extractConfig(settings, StorageType::Data);
data_storage.reloadSettings(config);
done_anything |= data_storage.gc(/*not_skip*/ false, write_limiter, read_limiter);
data_storage->reloadSettings(config);
done_anything |= data_storage->gc(/*not_skip*/ false, write_limiter, read_limiter);

config = extractConfig(settings, StorageType::Log);
log_storage.reloadSettings(config);
done_anything |= log_storage.gc(/*not_skip*/ false, write_limiter, read_limiter);
log_storage->reloadSettings(config);
done_anything |= log_storage->gc(/*not_skip*/ false, write_limiter, read_limiter);

return done_anything;
}
Expand Down
19 changes: 10 additions & 9 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,19 @@ class StoragePool : private boost::noncopyable

PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who);

PageStorage & log() { return log_storage; }
PageStorage & data() { return data_storage; }
PageStorage & meta() { return meta_storage; }
PageStoragePtr log() { return log_storage; }
PageStoragePtr data() { return data_storage; }
PageStoragePtr meta() { return meta_storage; }

// Caller must cancel gc tasks before drop
void drop();

bool gc(const Settings & settings, const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD);

private:
PageStorage log_storage;
PageStorage data_storage;
PageStorage meta_storage;
PageStoragePtr log_storage;
PageStoragePtr data_storage;
PageStoragePtr meta_storage;

std::atomic<PageId> max_log_page_id;
std::atomic<PageId> max_data_page_id;
Expand All @@ -65,9 +65,9 @@ class StoragePool : private boost::noncopyable
struct StorageSnapshot : private boost::noncopyable
{
StorageSnapshot(StoragePool & storage, ReadLimiterPtr read_limiter, bool snapshot_read = true)
: log_reader(storage.log(), snapshot_read ? storage.log().getSnapshot() : nullptr, read_limiter)
, data_reader(storage.data(), snapshot_read ? storage.data().getSnapshot() : nullptr, read_limiter)
, meta_reader(storage.meta(), snapshot_read ? storage.meta().getSnapshot() : nullptr, read_limiter)
: log_reader(storage.log(), snapshot_read ? storage.log()->getSnapshot() : nullptr, read_limiter)
, data_reader(storage.data(), snapshot_read ? storage.data()->getSnapshot() : nullptr, read_limiter)
, meta_reader(storage.meta(), snapshot_read ? storage.meta()->getSnapshot() : nullptr, read_limiter)
{}

PageReader log_reader;
Expand All @@ -76,5 +76,6 @@ struct StorageSnapshot : private boost::noncopyable
};
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;


} // namespace DM
} // namespace DB
16 changes: 8 additions & 8 deletions dbms/src/Storages/DeltaMerge/WriteBatches.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ struct WriteBatches : private boost::noncopyable
for (auto & w : data.getWrites())
data_write_pages.push_back(w.page_id);

storage_pool.log().write(std::move(log), write_limiter);
storage_pool.data().write(std::move(data), write_limiter);
storage_pool.log()->write(std::move(log), write_limiter);
storage_pool.data()->write(std::move(data), write_limiter);

for (auto page_id : log_write_pages)
writtenLog.push_back(page_id);
Expand Down Expand Up @@ -129,8 +129,8 @@ struct WriteBatches : private boost::noncopyable
check(data_wb, "data_wb", logger);
}

storage_pool.log().write(std::move(log_wb), write_limiter);
storage_pool.data().write(std::move(data_wb), write_limiter);
storage_pool.log()->write(std::move(log_wb), write_limiter);
storage_pool.data()->write(std::move(data_wb), write_limiter);

writtenLog.clear();
writtenData.clear();
Expand All @@ -156,7 +156,7 @@ struct WriteBatches : private boost::noncopyable
check(meta, "meta", logger);
}

storage_pool.meta().write(std::move(meta), write_limiter);
storage_pool.meta()->write(std::move(meta), write_limiter);
meta.clear();
}

Expand All @@ -182,9 +182,9 @@ struct WriteBatches : private boost::noncopyable
check(removed_meta, "removed_meta", logger);
}

storage_pool.log().write(std::move(removed_log), write_limiter);
storage_pool.data().write(std::move(removed_data), write_limiter);
storage_pool.meta().write(std::move(removed_meta), write_limiter);
storage_pool.log()->write(std::move(removed_log), write_limiter);
storage_pool.data()->write(std::move(removed_data), write_limiter);
storage_pool.meta()->write(std::move(removed_meta), write_limiter);

removed_log.clear();
removed_data.clear();
Expand Down
Loading