Skip to content

Commit

Permalink
refactor:Feature:Online Query and Dynamic Modification of Table-level…
Browse files Browse the repository at this point in the history
… RocksDB Options (#1488)

#1488

Complete the dynamic setting function for the num_levels and write_buffer_size options. I use rocksdb.write_buffer_size as a dynamically modifiable parameter and rocksdb.num_levels as a non-dynamically modifiable parameter to test my idea.

Pegasus shell case:
create lpf -e rocksdb.num_levels=12,rocksdb.write_buffer_size=100
create lpf -e rocksdb.num_levels=5,rocksdb.write_buffer_size=100
create lpf -e rocksdb.num_levels=5,rocksdb.write_buffer_size=33554432
set_app_envs rocksdb.write_buffer_size  67108864
set_app_envs rocksdb.num_levels 4
get_app_envs
  • Loading branch information
ruojieranyishen committed Jun 5, 2023
1 parent 86181aa commit 78080ac
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 2 deletions.
11 changes: 11 additions & 0 deletions src/base/pegasus_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,15 @@ const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction");
const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size");

const std::string ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");

// App-env key that carries the RocksDB `write_buffer_size` option of a table.
const std::string ROCKSDB_WRITE_BUFFER_SIZE{"rocksdb.write_buffer_size"};

// App-env key that carries the RocksDB `num_levels` option of a table.
const std::string ROCKSDB_NUM_LEVELS{"rocksdb.num_levels"};

// Keys of the RocksDB options grouped as "dynamic".
// NOTE: must stay below the key constants it is built from, so that they are
// already initialized (initialization follows definition order in this file).
const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS{ROCKSDB_WRITE_BUFFER_SIZE};

// Keys of the RocksDB options grouped as "static".
const std::set<std::string> ROCKSDB_STATIC_OPTIONS{ROCKSDB_NUM_LEVELS};
} // namespace pegasus
9 changes: 9 additions & 0 deletions src/base/pegasus_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include <set>
#include <string>

namespace pegasus {
Expand Down Expand Up @@ -72,4 +73,12 @@ extern const std::string USER_SPECIFIED_COMPACTION;
extern const std::string READ_SIZE_THROTTLING;

extern const std::string ROCKSDB_ALLOW_INGEST_BEHIND;

// App-env key for the RocksDB write_buffer_size option.
extern const std::string ROCKSDB_WRITE_BUFFER_SIZE;

// App-env key for the RocksDB num_levels option.
extern const std::string ROCKSDB_NUM_LEVELS;

// Set of app-env keys naming the RocksDB options grouped as "dynamic".
extern const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS;

// Set of app-env keys naming the RocksDB options grouped as "static".
extern const std::set<std::string> ROCKSDB_STATIC_OPTIONS;
} // namespace pegasus
7 changes: 7 additions & 0 deletions src/common/replica_envs.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <cstdint>
#include <string>
#include <set>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -64,6 +65,12 @@ class replica_envs
static const std::string USER_SPECIFIED_COMPACTION;
static const std::string ROCKSDB_ALLOW_INGEST_BEHIND;
static const std::string UPDATE_MAX_REPLICA_COUNT;
// App-env keys for table-level RocksDB options.
static const std::string ROCKSDB_WRITE_BUFFER_SIZE;
static const std::string ROCKSDB_NUM_LEVELS;
// App-env key for the value version.
static const std::string VALUE_VERSION;

// Groups of RocksDB option keys: "dynamic" and "static".
// NOTE(review): presumably "static" options may only be given when the app is
// created, while "dynamic" ones may also be changed later - confirm against
// the meta server's set_app_envs handling.
static const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS;
static const std::set<std::string> ROCKSDB_STATIC_OPTIONS;
};

} // namespace replication
Expand Down
9 changes: 9 additions & 0 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,15 @@ const std::string replica_envs::USER_SPECIFIED_COMPACTION("user_specified_compac
const std::string replica_envs::BACKUP_REQUEST_QPS_THROTTLING("replica.backup_request_throttling");
const std::string replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");
const std::string replica_envs::UPDATE_MAX_REPLICA_COUNT("max_replica_count.update");
const std::string replica_envs::ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size");
const std::string replica_envs::ROCKSDB_NUM_LEVELS("rocksdb.num_levels");
const std::string replica_envs::VALUE_VERSION("value_version");

const std::set<std::string> replica_envs::ROCKSDB_DYNAMIC_OPTIONS = {
replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
};
const std::set<std::string> replica_envs::ROCKSDB_STATIC_OPTIONS = {
replica_envs::ROCKSDB_NUM_LEVELS,
};
} // namespace replication
} // namespace dsn
55 changes: 54 additions & 1 deletion src/meta/app_env_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@

namespace dsn {
namespace replication {
// Validates every (name, value) pair of `envs` with validate_app_env().
//
// Returns true when `envs` is empty or every entry passes validation.
// Validation stops at the first invalid entry, which is logged together with
// the validator's hint message, and false is returned.
bool validate_app_envs(const std::map<std::string, std::string> &envs)
{
    std::string hint_message;
    for (const auto &env : envs) {
        if (!validate_app_env(env.first, env.second, hint_message)) {
            LOG_WARNING(
                "app env {}={} is invalid, hint_message:{}", env.first, env.second, hint_message);
            return false;
        }
    }
    return true;
}

bool validate_app_env(const std::string &env_name,
const std::string &env_value,
Expand Down Expand Up @@ -151,6 +168,36 @@ bool check_bool_value(const std::string &env_value, std::string &hint_message)
return true;
}

// Validates a value for the `rocksdb.write_buffer_size` app env.
//
// `env_value` must parse as an unsigned 64-bit integer and lie within
// [32 MiB, 512 MiB] (inclusive). On rejection, `hint_message` is filled with
// the reason and false is returned; on success `hint_message` is untouched.
bool check_rocksdb_write_buffer_size(const std::string &env_value, std::string &hint_message)
{
    constexpr uint64_t kMinWriteBufferSize = 32ULL << 20; // 33554432
    constexpr uint64_t kMaxWriteBufferSize = 512ULL << 20; // 536870912

    // Use uint64_t rather than size_t: buf2uint64 writes a 64-bit result, and
    // size_t is not the same type as uint64_t on every platform.
    uint64_t val = 0;
    if (!dsn::buf2uint64(env_value, val)) {
        hint_message = fmt::format("rocksdb.write_buffer_size cannot set this val: {}", env_value);
        return false;
    }
    if (val < kMinWriteBufferSize || val > kMaxWriteBufferSize) {
        // Renders exactly as "... in range [33554432, 536870912]".
        hint_message = fmt::format("rocksdb.write_buffer_size suggest set val in range [{}, {}]",
                                   kMinWriteBufferSize,
                                   kMaxWriteBufferSize);
        return false;
    }
    return true;
}
// Validates a value for the `rocksdb.num_levels` app env.
//
// `env_value` must parse as a 32-bit integer within [1, 10] (inclusive).
// On rejection, `hint_message` is filled with the reason and false is
// returned; on success `hint_message` is untouched.
bool check_rocksdb_num_levels(const std::string &env_value, std::string &hint_message)
{
    int32_t val = 0;
    if (!dsn::buf2int32(env_value, val)) {
        // The "{}" placeholder is required, otherwise the offending value is
        // silently dropped from the hint.
        hint_message = fmt::format("rocksdb.num_levels cannot set this val: {}", env_value);
        return false;
    }
    if (val < 1 || val > 10) {
        hint_message = "rocksdb.num_levels suggest set val in range [1 , 10]";
        return false;
    }
    return true;
}

bool app_env_validator::validate_app_env(const std::string &env_name,
const std::string &env_value,
std::string &hint_message)
Expand Down Expand Up @@ -198,6 +245,10 @@ void app_env_validator::register_all_validators()
std::bind(&check_bool_value, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::DENY_CLIENT_REQUEST,
std::bind(&check_deny_client, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
std::bind(&check_rocksdb_write_buffer_size, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::ROCKSDB_NUM_LEVELS,
std::bind(&check_rocksdb_num_levels, std::placeholders::_1, std::placeholders::_2)},
// TODO(zhaoliwei): not implemented
{replica_envs::BUSINESS_INFO, nullptr},
{replica_envs::TABLE_LEVEL_DEFAULT_TTL, nullptr},
Expand All @@ -213,7 +264,9 @@ void app_env_validator::register_all_validators()
{replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, nullptr},
{replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, nullptr},
{replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr},
{replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr}};
{replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr},
{replica_envs::VALUE_VERSION, nullptr},
};
}

} // namespace replication
Expand Down
2 changes: 2 additions & 0 deletions src/meta/app_env_validator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
namespace dsn {
namespace replication {

// Validates a whole map of app envs (name -> value).
// Returns true if the map is empty or every entry is accepted by
// validate_app_env(); returns false as soon as one entry is rejected.
bool validate_app_envs(const std::map<std::string, std::string> &envs);

// Validates a single app env.
// `env_name` / `env_value`: the env to check.
// `hint_message`: output parameter that callers read for the rejection
// reason when false is returned.
bool validate_app_env(const std::string &env_name,
const std::string &env_value,
std::string &hint_message);
Expand Down
9 changes: 9 additions & 0 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,9 @@ void server_state::create_app(dsn::message_ex *msg)
!validate_target_max_replica_count(request.options.replica_count)) {
response.err = ERR_INVALID_PARAMETERS;
will_create_app = false;
} else if (!validate_app_envs(request.options.envs)) {
response.err = ERR_INVALID_PARAMETERS;
will_create_app = false;
} else {
zauto_write_lock l(_lock);
app = get_app(request.app_name);
Expand Down Expand Up @@ -2754,6 +2757,12 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc)
return;
}

if (replica_envs::ROCKSDB_STATIC_OPTIONS.find(keys[i]) !=
replica_envs::ROCKSDB_STATIC_OPTIONS.end()) {
env_rpc.response().err = ERR_INVALID_PARAMETERS;
env_rpc.response().hint_message = "static rocksdb option only can set by create app";
return;
}
os << keys[i] << "=" << values[i];
}
LOG_INFO("set app envs for app({}) from remote({}): kvs = {}",
Expand Down
16 changes: 16 additions & 0 deletions src/meta/test/meta_app_envs_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,22 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
{replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL, "80", ERR_OK, "", "80"},
{replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME, "90", ERR_OK, "", "90"},
{replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, "100", ERR_OK, "", "100"},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
"100",
ERR_INVALID_PARAMETERS,
"rocksdb.write_buffer_size suggest set val in range [33554432, 536870912]",
"67108864"},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
"636870912",
ERR_INVALID_PARAMETERS,
"rocksdb.write_buffer_size suggest set val in range [33554432, 536870912]",
"536870912"},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE, "67108864", ERR_OK, "", "67108864"},
{replica_envs::ROCKSDB_NUM_LEVELS,
"5",
ERR_INVALID_PARAMETERS,
"static rocksdb option only can set by create app",
"5"},
{replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
"200",
ERR_OK,
Expand Down
58 changes: 58 additions & 0 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2625,6 +2625,61 @@ pegasus_server_impl::get_restore_dir_from_env(const std::map<std::string, std::s
return res;
}

void pegasus_server_impl::update_rocksdb_dynamic_options(
const std::map<std::string, std::string> &envs)
{
if (envs.size() == 0)
return;

std::unordered_map<std::string, std::string> new_options;
for (auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
auto find = envs.find(option);
if (option.compare(ROCKSDB_WRITE_BUFFER_SIZE) == 0 && find != envs.end()) {
new_options["write_buffer_size"] = find->second;
}
}

// doing set option
if (new_options.size() != 0 && !set_options(new_options)) {
LOG_WARNING("Set options fails");
}
}

// Applies the RocksDB options found in `envs` to `_data_cf_opts`.
//
// Both the static options (`rocksdb.num_levels`) and the dynamic options
// (`rocksdb.write_buffer_size`) are handled. Options that are not present in
// `envs` are skipped silently. An option is only written to `_data_cf_opts`
// when its value parses successfully; each present option is logged as
// "succeed" or "failed" accordingly.
void pegasus_server_impl::update_rocksdb_options_before_create_replica(
    const std::map<std::string, std::string> &envs)
{
    if (envs.empty()) {
        return;
    }

    for (const auto &option : ROCKSDB_STATIC_OPTIONS) {
        const auto find = envs.find(option);
        // Absent option: nothing to do. The iterator must not be dereferenced
        // (not even for logging) when it equals envs.end().
        if (find == envs.end()) {
            continue;
        }

        bool is_set = false;
        if (option == ROCKSDB_NUM_LEVELS) {
            int32_t num_levels = 0;
            if (dsn::buf2int32(find->second, num_levels)) {
                _data_cf_opts.num_levels = num_levels;
                is_set = true;
            }
        }

        if (is_set) {
            LOG_INFO("Reset {} \"{}\" succeed", find->first, find->second);
        } else {
            LOG_WARNING("Reset {} \"{}\" failed", find->first, find->second);
        }
    }

    for (const auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
        const auto find = envs.find(option);
        if (find == envs.end()) {
            continue;
        }

        bool is_set = false;
        if (option == ROCKSDB_WRITE_BUFFER_SIZE) {
            // Parse into a uint64_t first: the option field is a size_t,
            // which is not the same type as uint64_t on every platform.
            uint64_t write_buffer_size = 0;
            if (dsn::buf2uint64(find->second, write_buffer_size)) {
                _data_cf_opts.write_buffer_size = static_cast<size_t>(write_buffer_size);
                is_set = true;
            }
        }

        if (is_set) {
            LOG_INFO("Reset {} \"{}\" succeed", find->first, find->second);
        } else {
            LOG_WARNING("Reset {} \"{}\" failed", find->first, find->second);
        }
    }
}

void pegasus_server_impl::update_app_envs(const std::map<std::string, std::string> &envs)
{
update_usage_scenario(envs);
Expand All @@ -2637,6 +2692,7 @@ void pegasus_server_impl::update_app_envs(const std::map<std::string, std::strin
_manual_compact_svc.start_manual_compact_if_needed(envs);

update_throttling_controller(envs);
update_rocksdb_dynamic_options(envs);
}

void pegasus_server_impl::update_app_envs_before_open_db(
Expand All @@ -2650,6 +2706,7 @@ void pegasus_server_impl::update_app_envs_before_open_db(
update_validate_partition_hash(envs);
update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
update_rocksdb_options_before_create_replica(envs);
}

void pegasus_server_impl::query_app_envs(/*out*/ std::map<std::string, std::string> &envs)
Expand Down Expand Up @@ -3054,6 +3111,7 @@ void pegasus_server_impl::reset_usage_scenario_options(
target_opts->max_compaction_bytes = base_opts.max_compaction_bytes;
target_opts->write_buffer_size = base_opts.write_buffer_size;
target_opts->max_write_buffer_number = base_opts.max_write_buffer_number;
target_opts->num_levels = base_opts.num_levels;
}

void pegasus_server_impl::recalculate_data_cf_options(
Expand Down
7 changes: 7 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <gtest/gtest_prod.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/table.h>
#include <rrdb/rrdb_types.h>
#include <stdint.h>
#include <atomic>
Expand Down Expand Up @@ -328,6 +329,11 @@ class pegasus_server_impl : public pegasus_read_service

void update_user_specified_compaction(const std::map<std::string, std::string> &envs);

// Applies the dynamic RocksDB options found in `envs` (app env name -> value).
void update_rocksdb_dynamic_options(const std::map<std::string, std::string> &envs);

// Applies the static and dynamic RocksDB options found in `envs` to the data
// column family options.
// NOTE(review): judging by the name, this is meant to run before the replica
// is created - confirm against the call site.
void
update_rocksdb_options_before_create_replica(const std::map<std::string, std::string> &envs);

void update_throttling_controller(const std::map<std::string, std::string> &envs);

bool parse_allow_ingest_behind(const std::map<std::string, std::string> &envs);
Expand Down Expand Up @@ -468,6 +474,7 @@ class pegasus_server_impl : public pegasus_read_service
// Dynamically calculate the value of current data_cf option according to the conf module file
// and usage scenario
rocksdb::ColumnFamilyOptions _table_data_cf_opts;
// Block-based table options, populated in the constructor.
// NOTE(review): unlike the sibling members, this name has no leading
// underscore - consider renaming to `_tbl_opts` for consistency.
rocksdb::BlockBasedTableOptions tbl_opts;
rocksdb::ColumnFamilyOptions _meta_cf_opts;
rocksdb::ReadOptions _data_cf_rd_opts;
std::string _usage_scenario;
Expand Down
1 change: 0 additions & 1 deletion src/server/pegasus_server_impl_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,6 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
CHECK(parse_compression_types("none", _meta_cf_opts.compression_per_level),
"parse rocksdb_compression_type failed.");

rocksdb::BlockBasedTableOptions tbl_opts;
tbl_opts.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;

if (FLAGS_rocksdb_disable_table_block_cache) {
Expand Down

0 comments on commit 78080ac

Please sign in to comment.