Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature:Online Query and Dynamic Modification of Table-level RocksDB Options #1511

Merged
merged 11 commits into from
Aug 11, 2023
Merged
11 changes: 11 additions & 0 deletions src/base/pegasus_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,15 @@ const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction");
const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size");

const std::string ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");

const std::string ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size");

const std::string ROCKSDB_NUM_LEVELS("rocksdb.num_levels");

const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS = {
ROCKSDB_WRITE_BUFFER_SIZE,
};
const std::set<std::string> ROCKSDB_STATIC_OPTIONS = {
ROCKSDB_NUM_LEVELS,
};
} // namespace pegasus
9 changes: 9 additions & 0 deletions src/base/pegasus_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include <set>
#include <string>

namespace pegasus {
Expand Down Expand Up @@ -72,4 +73,12 @@ extern const std::string USER_SPECIFIED_COMPACTION;
extern const std::string READ_SIZE_THROTTLING;

extern const std::string ROCKSDB_ALLOW_INGEST_BEHIND;

extern const std::string ROCKSDB_WRITE_BUFFER_SIZE;

extern const std::string ROCKSDB_NUM_LEVELS;

extern const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS;

extern const std::set<std::string> ROCKSDB_STATIC_OPTIONS;
} // namespace pegasus
6 changes: 6 additions & 0 deletions src/common/replica_envs.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <cstdint>
#include <string>
#include <set>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -64,6 +65,11 @@ class replica_envs
static const std::string USER_SPECIFIED_COMPACTION;
static const std::string ROCKSDB_ALLOW_INGEST_BEHIND;
static const std::string UPDATE_MAX_REPLICA_COUNT;
static const std::string ROCKSDB_WRITE_BUFFER_SIZE;
static const std::string ROCKSDB_NUM_LEVELS;

static const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS;
static const std::set<std::string> ROCKSDB_STATIC_OPTIONS;
};

} // namespace replication
Expand Down
9 changes: 9 additions & 0 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <algorithm>
#include <fstream>
#include <memory>
#include <set>

#include "common/gpid.h"
#include "common/replica_envs.h"
Expand Down Expand Up @@ -394,6 +395,14 @@ const std::string replica_envs::USER_SPECIFIED_COMPACTION("user_specified_compac
const std::string replica_envs::BACKUP_REQUEST_QPS_THROTTLING("replica.backup_request_throttling");
const std::string replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");
const std::string replica_envs::UPDATE_MAX_REPLICA_COUNT("max_replica_count.update");
const std::string replica_envs::ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size");
const std::string replica_envs::ROCKSDB_NUM_LEVELS("rocksdb.num_levels");

const std::set<std::string> replica_envs::ROCKSDB_DYNAMIC_OPTIONS = {
replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
};
const std::set<std::string> replica_envs::ROCKSDB_STATIC_OPTIONS = {
replica_envs::ROCKSDB_NUM_LEVELS,
};
} // namespace replication
} // namespace dsn
58 changes: 57 additions & 1 deletion src/meta/app_env_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <fmt/core.h>
#include <stdint.h>
#include <memory>
#include <set>
#include <utility>
#include <vector>

Expand All @@ -31,6 +32,26 @@

namespace dsn {
namespace replication {
bool validate_app_envs(const std::map<std::string, std::string> &envs)
{
// only check rocksdb app envs currently

for (const auto &it : envs) {
if (replica_envs::ROCKSDB_STATIC_OPTIONS.find(it.first) ==
replica_envs::ROCKSDB_STATIC_OPTIONS.end() &&
replica_envs::ROCKSDB_DYNAMIC_OPTIONS.find(it.first) ==
replica_envs::ROCKSDB_DYNAMIC_OPTIONS.end()) {
continue;
}
std::string hint_message;
if (!validate_app_env(it.first, it.second, hint_message)) {
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
LOG_WARNING(
"app env {}={} is invaild, hint_message:{}", it.first, it.second, hint_message);
return false;
}
}
return true;
}

bool validate_app_env(const std::string &env_name,
const std::string &env_value,
Expand Down Expand Up @@ -151,6 +172,36 @@ bool check_bool_value(const std::string &env_value, std::string &hint_message)
return true;
}

bool check_rocksdb_write_buffer_size(const std::string &env_value, std::string &hint_message)
{
uint64_t val = 0;

if (!dsn::buf2uint64(env_value, val)) {
hint_message = fmt::format("rocksdb.write_buffer_size cannot set this val: {}", env_value);
return false;
}
if (val < (32 << 20) || val > (512 << 20)) {
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
hint_message =
fmt::format("rocksdb.write_buffer_size suggest set val in range [33554432, 536870912]");
return false;
}
return true;
}
bool check_rocksdb_num_levels(const std::string &env_value, std::string &hint_message)
{
int32_t val = 0;

if (!dsn::buf2int32(env_value, val)) {
hint_message = fmt::format("rocksdb.num_levels cannot set this val: {}", env_value);
return false;
}
if (val < 1 || val > 10) {
hint_message = fmt::format("rocksdb.num_levels suggest set val in range [1 , 10]");
return false;
}
return true;
}

bool app_env_validator::validate_app_env(const std::string &env_name,
const std::string &env_value,
std::string &hint_message)
Expand Down Expand Up @@ -198,6 +249,10 @@ void app_env_validator::register_all_validators()
std::bind(&check_bool_value, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::DENY_CLIENT_REQUEST,
std::bind(&check_deny_client, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
std::bind(&check_rocksdb_write_buffer_size, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::ROCKSDB_NUM_LEVELS,
std::bind(&check_rocksdb_num_levels, std::placeholders::_1, std::placeholders::_2)},
// TODO(zhaoliwei): not implemented
{replica_envs::BUSINESS_INFO, nullptr},
{replica_envs::TABLE_LEVEL_DEFAULT_TTL, nullptr},
Expand All @@ -213,7 +268,8 @@ void app_env_validator::register_all_validators()
{replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, nullptr},
{replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, nullptr},
{replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr},
{replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr}};
{replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr},
};
}

} // namespace replication
Expand Down
2 changes: 2 additions & 0 deletions src/meta/app_env_validator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
namespace dsn {
namespace replication {

bool validate_app_envs(const std::map<std::string, std::string> &envs);

bool validate_app_env(const std::string &env_name,
const std::string &env_value,
std::string &hint_message);
Expand Down
3 changes: 3 additions & 0 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,9 @@ void server_state::create_app(dsn::message_ex *msg)
!validate_target_max_replica_count(request.options.replica_count)) {
response.err = ERR_INVALID_PARAMETERS;
will_create_app = false;
} else if (!validate_app_envs(request.options.envs)) {
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
response.err = ERR_INVALID_PARAMETERS;
will_create_app = false;
} else {
zauto_write_lock l(_lock);
app = get_app(request.app_name);
Expand Down
11 changes: 11 additions & 0 deletions src/meta/test/meta_app_envs_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,17 @@ TEST_F(meta_app_envs_test, update_app_envs_test)
{replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL, "80", ERR_OK, "", "80"},
{replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME, "90", ERR_OK, "", "90"},
{replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, "100", ERR_OK, "", "100"},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
"100",
ERR_INVALID_PARAMETERS,
"rocksdb.write_buffer_size suggest set val in range [33554432, 536870912]",
"67108864"},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
"636870912",
ERR_INVALID_PARAMETERS,
"rocksdb.write_buffer_size suggest set val in range [33554432, 536870912]",
"536870912"},
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE, "67108864", ERR_OK, "", "67108864"},
{replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION,
"200",
ERR_OK,
Expand Down
94 changes: 93 additions & 1 deletion src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <list>
#include <mutex>
#include <ostream>
#include <set>

#include "base/pegasus_key_schema.h"
#include "base/pegasus_utils.h"
Expand Down Expand Up @@ -1663,7 +1664,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
// We don't use `loaded_data_cf_opts` directly because pointer-typed options will
// only be initialized with default values when calling 'LoadLatestOptions', see
// 'rocksdb/utilities/options_util.h'.
reset_usage_scenario_options(loaded_data_cf_opts, &_table_data_cf_opts);
reset_rocksdb_options(loaded_data_cf_opts, &_table_data_cf_opts);
_db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
}
} else {
Expand Down Expand Up @@ -2625,6 +2626,79 @@ pegasus_server_impl::get_restore_dir_from_env(const std::map<std::string, std::s
return res;
}

void pegasus_server_impl::update_rocksdb_dynamic_options(
const std::map<std::string, std::string> &envs)
{
if (envs.size() == 0) {
return;
}
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
auto extract_option = [](const std::string &option) -> std::string {
std::stringstream ss(option);
std::string prefix, rocksdb_opt;
std::getline(ss, prefix, '.');
std::getline(ss, rocksdb_opt);
LOG_INFO("Extract rocksdb dynamic opt ({}) from ({})", rocksdb_opt, option);
return rocksdb_opt;
};

std::unordered_map<std::string, std::string> new_options;
for (const auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
auto find = envs.find(option);
if (find == envs.end()) {
continue;
}
new_options[extract_option(option)] = find->second;
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
}

// doing set option
if (new_options.empty() && set_options(new_options)) {
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
LOG_INFO("Set rocksdb dynamic options success");
}
}

void pegasus_server_impl::set_rocksdb_options_before_creating(
const std::map<std::string, std::string> &envs)
{
if (envs.size() == 0) {
return;
}
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved

for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
auto find = envs.find(option);
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
if (find == envs.end()) {
continue;
}
bool is_set = false;
if (option.compare(ROCKSDB_NUM_LEVELS) == 0) {
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
int32_t val = 0;
if (!dsn::buf2int32(find->second, val))
continue;
is_set = true;
_data_cf_opts.num_levels = val;
}

if (is_set)
LOG_INFO("Set {} \"{}\" succeed", find->first, find->second);
}

for (const auto &option : pegasus::ROCKSDB_DYNAMIC_OPTIONS) {
auto find = envs.find(option);
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
if (find == envs.end()) {
continue;
}
bool is_set = false;
if (option.compare(ROCKSDB_WRITE_BUFFER_SIZE) == 0) {
uint64_t val = 0;
if (!dsn::buf2uint64(find->second, val))
continue;
is_set = true;
_data_cf_opts.write_buffer_size = static_cast<size_t>(val);
}
if (is_set)
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
LOG_INFO("Set {} \"{}\" succeed", find->first, find->second);
}
}

void pegasus_server_impl::update_app_envs(const std::map<std::string, std::string> &envs)
{
update_usage_scenario(envs);
Expand All @@ -2637,6 +2711,7 @@ void pegasus_server_impl::update_app_envs(const std::map<std::string, std::strin
_manual_compact_svc.start_manual_compact_if_needed(envs);

update_throttling_controller(envs);
update_rocksdb_dynamic_options(envs);
}

void pegasus_server_impl::update_app_envs_before_open_db(
Expand All @@ -2650,6 +2725,7 @@ void pegasus_server_impl::update_app_envs_before_open_db(
update_validate_partition_hash(envs);
update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
set_rocksdb_options_before_creating(envs);
}

void pegasus_server_impl::query_app_envs(/*out*/ std::map<std::string, std::string> &envs)
Expand Down Expand Up @@ -3038,6 +3114,22 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
}
}

void pegasus_server_impl::reset_rocksdb_options(const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts)
{
// Reset rocksdb option includes two aspects:
// 1. Set usage_scenario related rocksdb options
// 2. Rocksdb option set in app envs, consists of ROCKSDB_DYNAMIC_OPTIONS and
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
// ROCKSDB_STATIC_OPTIONS

// aspect 1:
reset_usage_scenario_options(base_opts, target_opts);

// aspect 2:
target_opts->num_levels = base_opts.num_levels;
target_opts->write_buffer_size = base_opts.write_buffer_size;
}

void pegasus_server_impl::reset_usage_scenario_options(
const rocksdb::ColumnFamilyOptions &base_opts, rocksdb::ColumnFamilyOptions *target_opts)
{
Expand Down
9 changes: 9 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <gtest/gtest_prod.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/table.h>
#include <rrdb/rrdb_types.h>
#include <stdint.h>
#include <atomic>
Expand Down Expand Up @@ -328,6 +329,10 @@ class pegasus_server_impl : public pegasus_read_service

void update_user_specified_compaction(const std::map<std::string, std::string> &envs);

void update_rocksdb_dynamic_options(const std::map<std::string, std::string> &envs);

void set_rocksdb_options_before_creating(const std::map<std::string, std::string> &envs);

void update_throttling_controller(const std::map<std::string, std::string> &envs);

bool parse_allow_ingest_behind(const std::map<std::string, std::string> &envs);
Expand Down Expand Up @@ -359,6 +364,9 @@ class pegasus_server_impl : public pegasus_read_service
void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts);

void reset_rocksdb_options(const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts);

// return true if successfully set
bool set_options(const std::unordered_map<std::string, std::string> &new_options);

Expand Down Expand Up @@ -468,6 +476,7 @@ class pegasus_server_impl : public pegasus_read_service
// Dynamically calculate the value of current data_cf option according to the conf module file
// and usage scenario
rocksdb::ColumnFamilyOptions _table_data_cf_opts;
rocksdb::BlockBasedTableOptions _tbl_opts;
rocksdb::ColumnFamilyOptions _meta_cf_opts;
rocksdb::ReadOptions _data_cf_rd_opts;
std::string _usage_scenario;
Expand Down
Loading