Skip to content

Commit

Permalink
Feature:Online Query and Dynamic Modification of Table-level RocksDB …
Browse files Browse the repository at this point in the history
…Options (#1511)

#1488

Complete the dynamic setting function of num_levels and write_buffer_size option.
I use rocksdb.write_buffer_size as a dynamically modifiable parameter and rocksdb.num_levels
as a non-dynamically modifiable parameter to test my idea.
  • Loading branch information
ruojieranyishen authored Aug 11, 2023
1 parent 10a5435 commit fc61628
Show file tree
Hide file tree
Showing 13 changed files with 520 additions and 66 deletions.
49 changes: 49 additions & 0 deletions src/base/pegasus_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

#include "pegasus_const.h"

#include <rocksdb/options.h>
#include <stddef.h>
#include <stdint.h>

#include "utils/string_conv.h"

namespace pegasus {

// should be same with items in dsn::backup_restore_constant
Expand Down Expand Up @@ -101,4 +107,47 @@ const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction");
const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size");

const std::string ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");

const std::string ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size");

const std::string ROCKSDB_NUM_LEVELS("rocksdb.num_levels");

const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS = {
ROCKSDB_WRITE_BUFFER_SIZE,
};
const std::set<std::string> ROCKSDB_STATIC_OPTIONS = {
ROCKSDB_NUM_LEVELS,
};

const std::unordered_map<std::string, cf_opts_setter> cf_opts_setters = {
{ROCKSDB_WRITE_BUFFER_SIZE,
[](const std::string &str, rocksdb::ColumnFamilyOptions &option) -> bool {
uint64_t val = 0;
if (!dsn::buf2uint64(str, val)) {
return false;
}
option.write_buffer_size = static_cast<size_t>(val);
return true;
}},
{ROCKSDB_NUM_LEVELS,
[](const std::string &str, rocksdb::ColumnFamilyOptions &option) -> bool {
int32_t val = 0;
if (!dsn::buf2int32(str, val)) {
return false;
}
option.num_levels = val;
return true;
}},
};

const std::unordered_map<std::string, cf_opts_getter> cf_opts_getters = {
{ROCKSDB_WRITE_BUFFER_SIZE,
[](const rocksdb::ColumnFamilyOptions &option, /*out*/ std::string &str) {
str = std::to_string(option.write_buffer_size);
}},
{ROCKSDB_NUM_LEVELS,
[](const rocksdb::ColumnFamilyOptions &option, /*out*/ std::string &str) {
str = std::to_string(option.num_levels);
}},
};
} // namespace pegasus
22 changes: 22 additions & 0 deletions src/base/pegasus_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@

#pragma once

#include <functional>
#include <set>
#include <string>
#include <unordered_map>

namespace rocksdb {
struct ColumnFamilyOptions;
} // namespace rocksdb

namespace pegasus {

Expand Down Expand Up @@ -72,4 +79,19 @@ extern const std::string USER_SPECIFIED_COMPACTION;
extern const std::string READ_SIZE_THROTTLING;

extern const std::string ROCKSDB_ALLOW_INGEST_BEHIND;

extern const std::string ROCKSDB_WRITE_BUFFER_SIZE;

extern const std::string ROCKSDB_NUM_LEVELS;

extern const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS;

extern const std::set<std::string> ROCKSDB_STATIC_OPTIONS;

using cf_opts_setter = std::function<bool(const std::string &, rocksdb::ColumnFamilyOptions &)>;
extern const std::unordered_map<std::string, cf_opts_setter> cf_opts_setters;

using cf_opts_getter =
std::function<void(const rocksdb::ColumnFamilyOptions &, /*out*/ std::string &)>;
extern const std::unordered_map<std::string, cf_opts_getter> cf_opts_getters;
} // namespace pegasus
6 changes: 6 additions & 0 deletions src/common/replica_envs.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <cstdint>
#include <string>
#include <set>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -64,6 +65,11 @@ class replica_envs
static const std::string USER_SPECIFIED_COMPACTION;
static const std::string ROCKSDB_ALLOW_INGEST_BEHIND;
static const std::string UPDATE_MAX_REPLICA_COUNT;
static const std::string ROCKSDB_WRITE_BUFFER_SIZE;
static const std::string ROCKSDB_NUM_LEVELS;

static const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS;
static const std::set<std::string> ROCKSDB_STATIC_OPTIONS;
};

} // namespace replication
Expand Down
9 changes: 9 additions & 0 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <algorithm>
#include <fstream>
#include <memory>
#include <set>

#include "common/gpid.h"
#include "common/replica_envs.h"
Expand Down Expand Up @@ -394,6 +395,14 @@ const std::string replica_envs::USER_SPECIFIED_COMPACTION("user_specified_compac
const std::string replica_envs::BACKUP_REQUEST_QPS_THROTTLING("replica.backup_request_throttling");
const std::string replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");
const std::string replica_envs::UPDATE_MAX_REPLICA_COUNT("max_replica_count.update");
const std::string replica_envs::ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size");
const std::string replica_envs::ROCKSDB_NUM_LEVELS("rocksdb.num_levels");

const std::set<std::string> replica_envs::ROCKSDB_DYNAMIC_OPTIONS = {
replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
};
const std::set<std::string> replica_envs::ROCKSDB_STATIC_OPTIONS = {
replica_envs::ROCKSDB_NUM_LEVELS,
};
} // namespace replication
} // namespace dsn
58 changes: 57 additions & 1 deletion src/meta/app_env_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <fmt/core.h>
#include <stdint.h>
#include <memory>
#include <set>
#include <utility>
#include <vector>

Expand All @@ -31,6 +32,26 @@

namespace dsn {
namespace replication {
bool validate_app_envs(const std::map<std::string, std::string> &envs)
{
// only check rocksdb app envs currently

for (const auto &it : envs) {
if (replica_envs::ROCKSDB_STATIC_OPTIONS.find(it.first) ==
replica_envs::ROCKSDB_STATIC_OPTIONS.end() &&
replica_envs::ROCKSDB_DYNAMIC_OPTIONS.find(it.first) ==
replica_envs::ROCKSDB_DYNAMIC_OPTIONS.end()) {
continue;
}
std::string hint_message;
if (!validate_app_env(it.first, it.second, hint_message)) {
LOG_WARNING(
"app env {}={} is invaild, hint_message:{}", it.first, it.second, hint_message);
return false;
}
}
return true;
}

bool validate_app_env(const std::string &env_name,
const std::string &env_value,
Expand Down Expand Up @@ -151,6 +172,36 @@ bool check_bool_value(const std::string &env_value, std::string &hint_message)
return true;
}

bool check_rocksdb_write_buffer_size(const std::string &env_value, std::string &hint_message)
{
uint64_t val = 0;

if (!dsn::buf2uint64(env_value, val)) {
hint_message = fmt::format("rocksdb.write_buffer_size cannot set this val: {}", env_value);
return false;
}
if (val < (16 << 20) || val > (512 << 20)) {
hint_message =
fmt::format("rocksdb.write_buffer_size suggest set val in range [16777216, 536870912]");
return false;
}
return true;
}
bool check_rocksdb_num_levels(const std::string &env_value, std::string &hint_message)
{
int32_t val = 0;

if (!dsn::buf2int32(env_value, val)) {
hint_message = fmt::format("rocksdb.num_levels cannot set this val: {}", env_value);
return false;
}
if (val < 1 || val > 10) {
hint_message = fmt::format("rocksdb.num_levels suggest set val in range [1 , 10]");
return false;
}
return true;
}

bool app_env_validator::validate_app_env(const std::string &env_name,
const std::string &env_value,
std::string &hint_message)
Expand Down Expand Up @@ -198,6 +249,10 @@ void app_env_validator::register_all_validators()
std::bind(&check_bool_value, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::DENY_CLIENT_REQUEST,
std::bind(&check_deny_client, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
std::bind(&check_rocksdb_write_buffer_size, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::ROCKSDB_NUM_LEVELS,
std::bind(&check_rocksdb_num_levels, std::placeholders::_1, std::placeholders::_2)},
// TODO(zhaoliwei): not implemented
{replica_envs::BUSINESS_INFO, nullptr},
{replica_envs::TABLE_LEVEL_DEFAULT_TTL, nullptr},
Expand All @@ -213,7 +268,8 @@ void app_env_validator::register_all_validators()
{replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, nullptr},
{replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, nullptr},
{replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr},
{replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr}};
{replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr},
};
}

} // namespace replication
Expand Down
2 changes: 2 additions & 0 deletions src/meta/app_env_validator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
namespace dsn {
namespace replication {

bool validate_app_envs(const std::map<std::string, std::string> &envs);

bool validate_app_env(const std::string &env_name,
const std::string &env_value,
std::string &hint_message);
Expand Down
3 changes: 3 additions & 0 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,9 @@ void server_state::create_app(dsn::message_ex *msg)
!validate_target_max_replica_count(request.options.replica_count)) {
response.err = ERR_INVALID_PARAMETERS;
will_create_app = false;
} else if (!validate_app_envs(request.options.envs)) {
response.err = ERR_INVALID_PARAMETERS;
will_create_app = false;
} else {
zauto_write_lock l(_lock);
app = get_app(request.app_name);
Expand Down
24 changes: 24 additions & 0 deletions src/meta/test/meta_app_envs_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <gtest/gtest.h>
#include <map>
#include <memory>
#include <set>
#include <string>

#include "common/replica_envs.h"
Expand Down Expand Up @@ -142,6 +143,17 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
{replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL, "80", ERR_OK, "", "80"},
{replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME, "90", ERR_OK, "", "90"},
{replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, "100", ERR_OK, "", "100"},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
"100",
ERR_INVALID_PARAMETERS,
"rocksdb.write_buffer_size suggest set val in range [16777216, 536870912]",
"67108864"},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
"636870912",
ERR_INVALID_PARAMETERS,
"rocksdb.write_buffer_size suggest set val in range [16777216, 536870912]",
"536870912"},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE, "67108864", ERR_OK, "", "67108864"},
{replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
"200",
ERR_OK,
Expand Down Expand Up @@ -192,6 +204,18 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
ASSERT_EQ(app->envs.at(test.env_key), test.expect_value);
}
}

{
// Make sure all rocksdb options of ROCKSDB_DYNAMIC_OPTIONS are tested.
// Hint: Mainly verify the update_rocksdb_dynamic_options function.
std::map<std::string, std::string> all_test_envs;
for (const auto &test : tests) {
all_test_envs[test.env_key] = test.env_value;
}
for (const auto &option : replica_envs::ROCKSDB_DYNAMIC_OPTIONS) {
ASSERT_TRUE(all_test_envs.find(option) != all_test_envs.end());
}
}
}

} // namespace replication
Expand Down
Loading

0 comments on commit fc61628

Please sign in to comment.