Skip to content

Commit

Permalink
feat(rocksdb): Support to config meta data read source (#532)
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 authored May 11, 2020
1 parent e9e20c4 commit 63c4ea8
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
24 changes: 18 additions & 6 deletions src/server/meta_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,19 @@
#include "meta_store.h"

#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>

namespace pegasus {
namespace server {

DSN_DEFINE_string("pegasus.server",
get_meta_store_type,
"manifest",
"Where to get meta data, now support 'manifest' and 'metacf'");
DSN_DEFINE_validator(get_meta_store_type, [](const char *type) {
return strcmp(type, "manifest") == 0 || strcmp(type, "metacf") == 0;
});

const std::string meta_store::DATA_VERSION = "pegasus_data_version";
const std::string meta_store::LAST_FLUSHED_DECREE = "pegasus_last_flushed_decree";
const std::string meta_store::LAST_MANUAL_COMPACT_FINISH_TIME =
Expand All @@ -21,11 +30,14 @@ meta_store::meta_store(pegasus_server_impl *server,
{
// disable write ahead logging as replication handles logging instead now
_wt_opts.disableWAL = true;
_get_meta_store_type =
(strcmp(FLAGS_get_meta_store_type, "manifest") == 0 ? meta_store_type::kManifestOnly
: meta_store_type::kMetaCFOnly);
}

uint64_t meta_store::get_last_flushed_decree(meta_store_type type) const
uint64_t meta_store::get_last_flushed_decree() const
{
switch (type) {
switch (_get_meta_store_type) {
case meta_store_type::kManifestOnly:
return _db->GetLastFlushedDecree();
case meta_store_type::kMetaCFOnly: {
Expand All @@ -39,9 +51,9 @@ uint64_t meta_store::get_last_flushed_decree(meta_store_type type) const
}
}

uint32_t meta_store::get_data_version(meta_store_type type) const
uint32_t meta_store::get_data_version() const
{
switch (type) {
switch (_get_meta_store_type) {
case meta_store_type::kManifestOnly:
return _db->GetPegasusDataVersion();
case meta_store_type::kMetaCFOnly: {
Expand All @@ -55,9 +67,9 @@ uint32_t meta_store::get_data_version(meta_store_type type) const
}
}

uint64_t meta_store::get_last_manual_compact_finish_time(meta_store_type type) const
uint64_t meta_store::get_last_manual_compact_finish_time() const
{
switch (type) {
switch (_get_meta_store_type) {
case meta_store_type::kManifestOnly:
return _db->GetLastManualCompactFinishTime();
case meta_store_type::kMetaCFOnly: {
Expand Down
8 changes: 5 additions & 3 deletions src/server/meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ class meta_store : public dsn::replication::replica_base

meta_store(pegasus_server_impl *server, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf);

uint64_t get_last_flushed_decree(meta_store_type type) const;
uint32_t get_data_version(meta_store_type type) const;
uint64_t get_last_manual_compact_finish_time(meta_store_type type) const;
uint64_t get_last_flushed_decree() const;
uint32_t get_data_version() const;
uint64_t get_last_manual_compact_finish_time() const;

void set_last_flushed_decree(uint64_t decree) const;
void set_data_version(uint32_t version) const;
Expand All @@ -55,6 +55,8 @@ class meta_store : public dsn::replication::replica_base
rocksdb::DB *_db;
rocksdb::ColumnFamilyHandle *_meta_cf;
rocksdb::WriteOptions _wt_opts;

meta_store_type _get_meta_store_type;
};

} // namespace server
Expand Down
20 changes: 7 additions & 13 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1366,12 +1366,9 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
// Create _meta_store which provide Pegasus meta data read and write.
_meta_store = dsn::make_unique<meta_store>(this, _db, _meta_cf);

_last_committed_decree =
_meta_store->get_last_flushed_decree(meta_store::meta_store_type::kManifestOnly);
_pegasus_data_version =
_meta_store->get_data_version(meta_store::meta_store_type::kManifestOnly);
uint64_t last_manual_compact_finish_time = _meta_store->get_last_manual_compact_finish_time(
meta_store::meta_store_type::kManifestOnly);
_last_committed_decree = _meta_store->get_last_flushed_decree();
_pegasus_data_version = _meta_store->get_data_version();
uint64_t last_manual_compact_finish_time = _meta_store->get_last_manual_compact_finish_time();
if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
derror_replica("open app failed, unsupported data version {}", _pegasus_data_version);
release_db();
Expand Down Expand Up @@ -1594,8 +1591,7 @@ ::dsn::error_code pegasus_server_impl::sync_checkpoint()
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
dcheck_gt_replica(last_commit, last_durable_decree());
int64_t last_flushed = static_cast<int64_t>(
_meta_store->get_last_flushed_decree(meta_store::meta_store_type::kManifestOnly));
int64_t last_flushed = static_cast<int64_t>(_meta_store->get_last_flushed_decree());
dcheck_eq_replica(last_commit, last_flushed);
if (!_checkpoints.empty()) {
dcheck_gt_replica(last_commit, _checkpoints.back());
Expand All @@ -1620,8 +1616,7 @@ ::dsn::error_code pegasus_server_impl::async_checkpoint(bool flush_memtable)
return ::dsn::ERR_WRONG_TIMING;

int64_t last_durable = last_durable_decree();
int64_t last_flushed = static_cast<int64_t>(
_meta_store->get_last_flushed_decree(meta_store::meta_store_type::kManifestOnly));
int64_t last_flushed = static_cast<int64_t>(_meta_store->get_last_flushed_decree());
int64_t last_commit = last_committed_decree();

dcheck_le_replica(last_durable, last_flushed);
Expand Down Expand Up @@ -2170,7 +2165,7 @@ void pegasus_server_impl::update_app_envs(const std::map<std::string, std::strin

int64_t pegasus_server_impl::last_flushed_decree() const
{
return _meta_store->get_last_flushed_decree(meta_store::meta_store_type::kManifestOnly);
return _meta_store->get_last_flushed_decree();
}

void pegasus_server_impl::update_app_envs_before_open_db(
Expand Down Expand Up @@ -2529,8 +2524,7 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio
// update rocksdb statistics immediately
update_replica_rocksdb_statistics();

return _meta_store->get_last_manual_compact_finish_time(
meta_store::meta_store_type::kManifestOnly);
return _meta_store->get_last_manual_compact_finish_time();
}

bool pegasus_server_impl::release_storage_after_manual_compact()
Expand Down

0 comments on commit 63c4ea8

Please sign in to comment.