From 63c4ea8450d959242fa7b1f10d139ddd5e84512c Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Mon, 11 May 2020 22:26:18 +0800 Subject: [PATCH] feat(rocksdb): Support to config meta data read source (#532) --- src/server/meta_store.cpp | 24 ++++++++++++++++++------ src/server/meta_store.h | 8 +++++--- src/server/pegasus_server_impl.cpp | 20 +++++++------------- 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/src/server/meta_store.cpp b/src/server/meta_store.cpp index b8abcc3f74..a25b07a1b5 100644 --- a/src/server/meta_store.cpp +++ b/src/server/meta_store.cpp @@ -5,10 +5,19 @@ #include "meta_store.h" #include +#include namespace pegasus { namespace server { +DSN_DEFINE_string("pegasus.server", + get_meta_store_type, + "manifest", + "Where to get meta data, now support 'manifest' and 'metacf'"); +DSN_DEFINE_validator(get_meta_store_type, [](const char *type) { + return strcmp(type, "manifest") == 0 || strcmp(type, "metacf") == 0; +}); + const std::string meta_store::DATA_VERSION = "pegasus_data_version"; const std::string meta_store::LAST_FLUSHED_DECREE = "pegasus_last_flushed_decree"; const std::string meta_store::LAST_MANUAL_COMPACT_FINISH_TIME = @@ -21,11 +30,14 @@ meta_store::meta_store(pegasus_server_impl *server, { // disable write ahead logging as replication handles logging instead now _wt_opts.disableWAL = true; + _get_meta_store_type = + (strcmp(FLAGS_get_meta_store_type, "manifest") == 0 ? meta_store_type::kManifestOnly + : meta_store_type::kMetaCFOnly); } -uint64_t meta_store::get_last_flushed_decree(meta_store_type type) const +uint64_t meta_store::get_last_flushed_decree() const { - switch (type) { + switch (_get_meta_store_type) { case meta_store_type::kManifestOnly: return _db->GetLastFlushedDecree(); case meta_store_type::kMetaCFOnly: { @@ -39,9 +51,9 @@ uint64_t meta_store::get_last_flushed_decree(meta_store_type type) const } } -uint32_t meta_store::get_data_version(meta_store_type type) const +uint32_t meta_store::get_data_version() const { - switch (type) { + switch (_get_meta_store_type) { case meta_store_type::kManifestOnly: return _db->GetPegasusDataVersion(); case meta_store_type::kMetaCFOnly: { @@ -55,9 +67,9 @@ uint32_t meta_store::get_data_version(meta_store_type type) const } } -uint64_t meta_store::get_last_manual_compact_finish_time(meta_store_type type) const +uint64_t meta_store::get_last_manual_compact_finish_time() const { - switch (type) { + switch (_get_meta_store_type) { case meta_store_type::kManifestOnly: return _db->GetLastManualCompactFinishTime(); case meta_store_type::kMetaCFOnly: { diff --git a/src/server/meta_store.h b/src/server/meta_store.h index 2acbc44776..ea818fbae6 100644 --- a/src/server/meta_store.h +++ b/src/server/meta_store.h @@ -32,9 +32,9 @@ class meta_store : public dsn::replication::replica_base meta_store(pegasus_server_impl *server, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf); - uint64_t get_last_flushed_decree(meta_store_type type) const; - uint32_t get_data_version(meta_store_type type) const; - uint64_t get_last_manual_compact_finish_time(meta_store_type type) const; + uint64_t get_last_flushed_decree() const; + uint32_t get_data_version() const; + uint64_t get_last_manual_compact_finish_time() const; void set_last_flushed_decree(uint64_t decree) const; void set_data_version(uint32_t version) const; @@ -55,6 +55,8 @@ class meta_store : public dsn::replication::replica_base rocksdb::DB *_db; rocksdb::ColumnFamilyHandle *_meta_cf; rocksdb::WriteOptions _wt_opts; + + meta_store_type _get_meta_store_type; }; } // namespace server diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 5081ace2db..8857b8fd28 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1366,12 +1366,9 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv) // Create _meta_store which provide Pegasus meta data read and write. _meta_store = dsn::make_unique(this, _db, _meta_cf); - _last_committed_decree = - _meta_store->get_last_flushed_decree(meta_store::meta_store_type::kManifestOnly); - _pegasus_data_version = - _meta_store->get_data_version(meta_store::meta_store_type::kManifestOnly); - uint64_t last_manual_compact_finish_time = _meta_store->get_last_manual_compact_finish_time( - meta_store::meta_store_type::kManifestOnly); + _last_committed_decree = _meta_store->get_last_flushed_decree(); + _pegasus_data_version = _meta_store->get_data_version(); + uint64_t last_manual_compact_finish_time = _meta_store->get_last_manual_compact_finish_time(); if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) { derror_replica("open app failed, unsupported data version {}", _pegasus_data_version); release_db(); @@ -1594,8 +1591,7 @@ ::dsn::error_code pegasus_server_impl::sync_checkpoint() { ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock); dcheck_gt_replica(last_commit, last_durable_decree()); - int64_t last_flushed = static_cast( - _meta_store->get_last_flushed_decree(meta_store::meta_store_type::kManifestOnly)); + int64_t last_flushed = static_cast(_meta_store->get_last_flushed_decree()); dcheck_eq_replica(last_commit, last_flushed); if (!_checkpoints.empty()) { dcheck_gt_replica(last_commit, _checkpoints.back()); @@ -1620,8 +1616,7 @@ ::dsn::error_code pegasus_server_impl::async_checkpoint(bool flush_memtable) return ::dsn::ERR_WRONG_TIMING; int64_t last_durable = last_durable_decree(); - int64_t last_flushed = static_cast( - _meta_store->get_last_flushed_decree(meta_store::meta_store_type::kManifestOnly)); + int64_t last_flushed = static_cast(_meta_store->get_last_flushed_decree()); int64_t last_commit = last_committed_decree(); dcheck_le_replica(last_durable, last_flushed); @@ -2170,7 +2165,7 @@ void pegasus_server_impl::update_app_envs(const std::mapget_last_flushed_decree(meta_store::meta_store_type::kManifestOnly); + return _meta_store->get_last_flushed_decree(); } void pegasus_server_impl::update_app_envs_before_open_db( @@ -2529,8 +2524,7 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio // update rocksdb statistics immediately update_replica_rocksdb_statistics(); - return _meta_store->get_last_manual_compact_finish_time( - meta_store::meta_store_type::kManifestOnly); + return _meta_store->get_last_manual_compact_finish_time(); } bool pegasus_server_impl::release_storage_after_manual_compact()