Skip to content

Commit

Permalink
refactor:Feature:Online Query and Dynamic Modification of Table-level…
Browse files Browse the repository at this point in the history
… RocksDB Options (#1488)

#1488

Complete the dynamic setting function of num_levels and write_buffer_size option.I use rocksdb.write_buffer_size as a dynamically modifiable parameter and rocksdb.num_levels as a non-dynamically modifiable parameter to test my idea.

Pegasus shell case:
create lpf -e rocksdb.num_levels=12,rocksdb.write_buffer_size=100
create lpf -e rocksdb.num_levels=5,rocksdb.write_buffer_size=100
create lpf -e rocksdb.num_levels=5,rocksdb.write_buffer_size=33554432
set_app_envs rocksdb.write_buffer_size  67108864
set_app_envs rocksdb.num_levels 4
get_app_envs
  • Loading branch information
ruojieranyishen committed Jun 5, 2023
1 parent 86181aa commit 0438b7e
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 94 deletions.
11 changes: 11 additions & 0 deletions src/base/pegasus_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,15 @@ const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction");
const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size");

const std::string ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");

const std::string ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size");

const std::string ROCKSDB_NUM_LEVELS("rocksdb.num_levels");

const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS = {
ROCKSDB_WRITE_BUFFER_SIZE,
};
const std::set<std::string> ROCKSDB_STATIC_OPTIONS = {
ROCKSDB_NUM_LEVELS,
};
} // namespace pegasus
9 changes: 9 additions & 0 deletions src/base/pegasus_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include <set>
#include <string>

namespace pegasus {
Expand Down Expand Up @@ -72,4 +73,12 @@ extern const std::string USER_SPECIFIED_COMPACTION;
extern const std::string READ_SIZE_THROTTLING;

extern const std::string ROCKSDB_ALLOW_INGEST_BEHIND;

extern const std::string ROCKSDB_WRITE_BUFFER_SIZE;

extern const std::string ROCKSDB_NUM_LEVELS;

extern const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS;

extern const std::set<std::string> ROCKSDB_STATIC_OPTIONS;
} // namespace pegasus
7 changes: 7 additions & 0 deletions src/common/replica_envs.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <cstdint>
#include <string>
#include <set>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -64,6 +65,12 @@ class replica_envs
static const std::string USER_SPECIFIED_COMPACTION;
static const std::string ROCKSDB_ALLOW_INGEST_BEHIND;
static const std::string UPDATE_MAX_REPLICA_COUNT;
static const std::string ROCKSDB_WRITE_BUFFER_SIZE;
static const std::string ROCKSDB_NUM_LEVELS;
static const std::string VALUE_VERSION;

static const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS;
static const std::set<std::string> ROCKSDB_STATIC_OPTIONS;
};

} // namespace replication
Expand Down
9 changes: 9 additions & 0 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,15 @@ const std::string replica_envs::USER_SPECIFIED_COMPACTION("user_specified_compac
const std::string replica_envs::BACKUP_REQUEST_QPS_THROTTLING("replica.backup_request_throttling");
const std::string replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");
const std::string replica_envs::UPDATE_MAX_REPLICA_COUNT("max_replica_count.update");
const std::string replica_envs::ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size");
const std::string replica_envs::ROCKSDB_NUM_LEVELS("rocksdb.num_levels");
const std::string replica_envs::VALUE_VERSION("value_version");

const std::set<std::string> replica_envs::ROCKSDB_DYNAMIC_OPTIONS = {
replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
};
const std::set<std::string> replica_envs::ROCKSDB_STATIC_OPTIONS = {
replica_envs::ROCKSDB_NUM_LEVELS,
};
} // namespace replication
} // namespace dsn
55 changes: 54 additions & 1 deletion src/meta/app_env_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@

namespace dsn {
namespace replication {
bool validate_app_envs(const std::map<std::string, std::string> &envs)
{
if (envs.size() == 0)
return true;
// check app envs information
std::string hint_message;
bool all_envs_vaild = true;
for (auto &it : envs) {
if (!validate_app_env(it.first, it.second, hint_message)) {
LOG_WARNING(
"app env {}={} is invaild, hint_message:{}", it.first, it.second, hint_message);
all_envs_vaild = false;
break;
}
}
return all_envs_vaild;
}

bool validate_app_env(const std::string &env_name,
const std::string &env_value,
Expand Down Expand Up @@ -151,6 +168,36 @@ bool check_bool_value(const std::string &env_value, std::string &hint_message)
return true;
}

bool check_rocksdb_write_buffer_size(const std::string &env_value, std::string &hint_message)
{
size_t val = 0;

if (!dsn::buf2uint64(env_value, val)) {
hint_message = fmt::format("rocksdb.write_buffer_size cannot set this val: {}", env_value);
return false;
}
if (val < (32 << 20) || val > (512 << 20)) {
hint_message =
fmt::format("rocksdb.write_buffer_size suggest set val in range [33554432, 536870912]");
return false;
}
return true;
}
bool check_rocksdb_num_levels(const std::string &env_value, std::string &hint_message)
{
int32_t val = 0;

if (!dsn::buf2int32(env_value, val)) {
hint_message = fmt::format("rocksdb.num_levels cannot set this val:", env_value);
return false;
}
if (val < 1 || val > 10) {
hint_message = fmt::format("rocksdb.num_levels suggest set val in range [1 , 10]");
return false;
}
return true;
}

bool app_env_validator::validate_app_env(const std::string &env_name,
const std::string &env_value,
std::string &hint_message)
Expand Down Expand Up @@ -198,6 +245,10 @@ void app_env_validator::register_all_validators()
std::bind(&check_bool_value, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::DENY_CLIENT_REQUEST,
std::bind(&check_deny_client, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
std::bind(&check_rocksdb_write_buffer_size, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::ROCKSDB_NUM_LEVELS,
std::bind(&check_rocksdb_num_levels, std::placeholders::_1, std::placeholders::_2)},
// TODO(zhaoliwei): not implemented
{replica_envs::BUSINESS_INFO, nullptr},
{replica_envs::TABLE_LEVEL_DEFAULT_TTL, nullptr},
Expand All @@ -213,7 +264,9 @@ void app_env_validator::register_all_validators()
{replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, nullptr},
{replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, nullptr},
{replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr},
{replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr}};
{replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr},
{replica_envs::VALUE_VERSION, nullptr},
};
}

} // namespace replication
Expand Down
2 changes: 2 additions & 0 deletions src/meta/app_env_validator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
namespace dsn {
namespace replication {

bool validate_app_envs(const std::map<std::string, std::string> &envs);

bool validate_app_env(const std::string &env_name,
const std::string &env_value,
std::string &hint_message);
Expand Down
131 changes: 70 additions & 61 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,11 @@ error_code server_state::sync_apps_to_remote_storage()
error_code err;
dist::meta_state_service *storage = _meta_svc->get_remote_storage();

auto t = storage->create_node(apps_path,
LPC_META_CALLBACK,
[&err](error_code ec) { err = ec; },
blob(lock_state, 0, strlen(lock_state)));
auto t = storage->create_node(
apps_path,
LPC_META_CALLBACK,
[&err](error_code ec) { err = ec; },
blob(lock_state, 0, strlen(lock_state)));
t->wait();

if (err != ERR_NODE_ALREADY_EXIST && err != ERR_OK) {
Expand All @@ -566,19 +567,19 @@ error_code server_state::sync_apps_to_remote_storage()
"invalid app status");
blob value = app->to_json(app_status::AS_CREATING == app->status ? app_status::AS_AVAILABLE
: app_status::AS_DROPPED);
storage->create_node(path,
LPC_META_CALLBACK,
[&err, path](error_code ec) {
if (ec != ERR_OK && ec != ERR_NODE_ALREADY_EXIST) {
LOG_WARNING(
"create app node failed, path({}) reason({})", path, ec);
err = ec;
} else {
LOG_INFO("create app node {} ok", path);
}
},
value,
&tracker);
storage->create_node(
path,
LPC_META_CALLBACK,
[&err, path](error_code ec) {
if (ec != ERR_OK && ec != ERR_NODE_ALREADY_EXIST) {
LOG_WARNING("create app node failed, path({}) reason({})", path, ec);
err = ec;
} else {
LOG_INFO("create app node {} ok", path);
}
},
value,
&tracker);
}
tracker.wait_outstanding_tasks();

Expand All @@ -589,8 +590,8 @@ error_code server_state::sync_apps_to_remote_storage()
for (auto &kv : _all_apps) {
std::shared_ptr<app_state> &app = kv.second;
for (unsigned int i = 0; i != app->partition_count; ++i) {
task_ptr init_callback =
tasking::create_task(LPC_META_STATE_HIGH, &tracker, [] {}, sStateHash);
task_ptr init_callback = tasking::create_task(
LPC_META_STATE_HIGH, &tracker, [] {}, sStateHash);
init_app_partition_node(app, i, init_callback);
}
}
Expand All @@ -615,8 +616,9 @@ dsn::error_code server_state::sync_apps_from_remote_storage()
dsn::task_tracker tracker;

dist::meta_state_service *storage = _meta_svc->get_remote_storage();
auto sync_partition = [this, storage, &err, &tracker](
std::shared_ptr<app_state> &app, int partition_id, const std::string &partition_path) {
auto sync_partition = [this, storage, &err, &tracker](std::shared_ptr<app_state> &app,
int partition_id,
const std::string &partition_path) {
storage->get_data(
partition_path,
LPC_META_CALLBACK,
Expand Down Expand Up @@ -1151,6 +1153,9 @@ void server_state::create_app(dsn::message_ex *msg)
!validate_target_max_replica_count(request.options.replica_count)) {
response.err = ERR_INVALID_PARAMETERS;
will_create_app = false;
} else if (!validate_app_envs(request.options.envs)) {
response.err = ERR_INVALID_PARAMETERS;
will_create_app = false;
} else {
zauto_write_lock l(_lock);
app = get_app(request.app_name);
Expand Down Expand Up @@ -1706,15 +1711,14 @@ void server_state::on_update_configuration_on_remote_reply(
CHECK(app->status == app_status::AS_AVAILABLE || app->status == app_status::AS_DROPPING,
"if app removed, this task should be cancelled");
if (ec == ERR_TIMEOUT) {
cc.pending_sync_task =
tasking::enqueue(LPC_META_STATE_HIGH,
tracker(),
[this, config_request, &cc]() mutable {
cc.pending_sync_task =
update_configuration_on_remote(config_request);
},
0,
std::chrono::seconds(1));
cc.pending_sync_task = tasking::enqueue(
LPC_META_STATE_HIGH,
tracker(),
[this, config_request, &cc]() mutable {
cc.pending_sync_task = update_configuration_on_remote(config_request);
},
0,
std::chrono::seconds(1));
} else if (ec == ERR_OK) {
update_configuration_locally(*app, config_request);
cc.pending_sync_task = nullptr;
Expand Down Expand Up @@ -2233,8 +2237,9 @@ error_code server_state::construct_partitions(
std::ostringstream oss;
if (skip_lost_partitions) {
oss << "WARNING: partition(" << app->app_id << "."
<< pc.pid.get_partition_index() << ") has no replica collected, force "
"recover the lost partition to empty"
<< pc.pid.get_partition_index()
<< ") has no replica collected, force "
"recover the lost partition to empty"
<< std::endl;
} else {
oss << "ERROR: partition(" << app->app_id << "."
Expand Down Expand Up @@ -2706,8 +2711,7 @@ void server_state::do_update_app_info(const std::string &app_path,
{
// persistent envs to zookeeper
blob value = dsn::json::json_forwarder<app_info>::encode(info);
auto new_cb = [ this, app_path, info, user_cb = std::move(cb) ](error_code ec)
{
auto new_cb = [this, app_path, info, user_cb = std::move(cb)](error_code ec) {
if (ec == ERR_OK) {
user_cb(ec);
} else if (ec == ERR_TIMEOUT) {
Expand Down Expand Up @@ -2754,6 +2758,12 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc)
return;
}

if (replica_envs::ROCKSDB_STATIC_OPTIONS.find(keys[i]) !=
replica_envs::ROCKSDB_STATIC_OPTIONS.end()) {
env_rpc.response().err = ERR_INVALID_PARAMETERS;
env_rpc.response().hint_message = "static rocksdb option only can set by create app";
return;
}
os << keys[i] << "=" << values[i];
}
LOG_INFO("set app envs for app({}) from remote({}): kvs = {}",
Expand Down Expand Up @@ -3654,21 +3664,21 @@ task_ptr server_state::update_partition_max_replica_count_on_remote(
new_ballot);

// NOTICE: pending_sync_task should be reassigned
return tasking::enqueue(LPC_META_STATE_HIGH,
tracker(),
[this, app, new_partition_config, on_partition_updated]() mutable {
const auto &gpid = new_partition_config.pid;
const auto partition_index = gpid.get_partition_index();
return tasking::enqueue(
LPC_META_STATE_HIGH,
tracker(),
[this, app, new_partition_config, on_partition_updated]() mutable {
const auto &gpid = new_partition_config.pid;
const auto partition_index = gpid.get_partition_index();

zauto_write_lock l(_lock);
zauto_write_lock l(_lock);

auto &context = app->helpers->contexts[partition_index];
context.pending_sync_task =
update_partition_max_replica_count_on_remote(
app, new_partition_config, on_partition_updated);
},
server_state::sStateHash,
std::chrono::seconds(1));
auto &context = app->helpers->contexts[partition_index];
context.pending_sync_task = update_partition_max_replica_count_on_remote(
app, new_partition_config, on_partition_updated);
},
server_state::sStateHash,
std::chrono::seconds(1));
}

LOG_INFO("request for updating partition-level max_replica_count on remote storage: "
Expand Down Expand Up @@ -3722,22 +3732,21 @@ void server_state::on_update_partition_max_replica_count_on_remote_reply(
auto &context = app->helpers->contexts[partition_index];
if (ec == ERR_TIMEOUT) {
// NOTICE: pending_sync_task need to be reassigned
context.pending_sync_task =
tasking::enqueue(LPC_META_STATE_HIGH,
tracker(),
[this, app, new_partition_config, on_partition_updated]() mutable {
const auto &gpid = new_partition_config.pid;
const auto partition_index = gpid.get_partition_index();
context.pending_sync_task = tasking::enqueue(
LPC_META_STATE_HIGH,
tracker(),
[this, app, new_partition_config, on_partition_updated]() mutable {
const auto &gpid = new_partition_config.pid;
const auto partition_index = gpid.get_partition_index();

zauto_write_lock l(_lock);
zauto_write_lock l(_lock);

auto &context = app->helpers->contexts[partition_index];
context.pending_sync_task =
update_partition_max_replica_count_on_remote(
app, new_partition_config, on_partition_updated);
},
server_state::sStateHash,
std::chrono::seconds(1));
auto &context = app->helpers->contexts[partition_index];
context.pending_sync_task = update_partition_max_replica_count_on_remote(
app, new_partition_config, on_partition_updated);
},
server_state::sStateHash,
std::chrono::seconds(1));
return;
}

Expand Down
Loading

0 comments on commit 0438b7e

Please sign in to comment.