Skip to content

Commit

Permalink
fix(config): bring back system params to config file (risingwavelabs#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gun9niR authored Mar 20, 2023
1 parent 3789023 commit f111bfb
Show file tree
Hide file tree
Showing 16 changed files with 216 additions and 92 deletions.
2 changes: 1 addition & 1 deletion ci/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ steps:
files: "*-junit.xml"
format: "junit"
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end test (release mode)"
Expand Down
105 changes: 105 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::collections::HashMap;
use std::fs;

use clap::ValueEnum;
use risingwave_pb::meta::SystemParams;
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand Down Expand Up @@ -112,6 +113,9 @@ pub struct RwConfig {
#[serde(default)]
pub storage: StorageConfig,

#[serde(default)]
pub system: SystemConfig,

#[serde(flatten)]
pub unrecognized: HashMap<String, Value>,
}
Expand Down Expand Up @@ -451,6 +455,67 @@ impl Default for DeveloperConfig {
}
}

/// The section `[system]` in `risingwave.toml`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SystemConfig {
/// The interval of periodic barrier.
#[serde(default = "default::system::barrier_interval_ms")]
pub barrier_interval_ms: u32,

/// There will be a checkpoint for every n barriers
#[serde(default = "default::system::checkpoint_frequency")]
pub checkpoint_frequency: u64,

/// Target size of the Sstable.
#[serde(default = "default::system::sstable_size_mb")]
pub sstable_size_mb: u32,

/// Size of each block in bytes in SST.
#[serde(default = "default::system::block_size_kb")]
pub block_size_kb: u32,

/// False positive probability of bloom filter.
#[serde(default = "default::system::bloom_false_positive")]
pub bloom_false_positive: f64,

#[serde(default = "default::system::state_store")]
pub state_store: String,

/// Remote directory for storing data and metadata objects.
#[serde(default = "default::system::data_directory")]
pub data_directory: String,

/// Remote storage url for storing snapshots.
#[serde(default = "default::system::backup_storage_url")]
pub backup_storage_url: String,

/// Remote directory for storing snapshots.
#[serde(default = "default::system::backup_storage_directory")]
pub backup_storage_directory: String,
}

impl Default for SystemConfig {
fn default() -> Self {
toml::from_str("").unwrap()
}
}

impl SystemConfig {
pub fn into_init_system_params(self) -> SystemParams {
SystemParams {
barrier_interval_ms: Some(self.barrier_interval_ms),
checkpoint_frequency: Some(self.checkpoint_frequency),
sstable_size_mb: Some(self.sstable_size_mb),
block_size_kb: Some(self.block_size_kb),
bloom_false_positive: Some(self.bloom_false_positive),
state_store: Some(self.state_store),
data_directory: Some(self.data_directory),
backup_storage_url: Some(self.backup_storage_url),
backup_storage_directory: Some(self.backup_storage_directory),
}
}
}

mod default {
pub mod meta {
use crate::config::MetaBackend;
Expand Down Expand Up @@ -662,4 +727,44 @@ mod default {
1024
}
}

pub mod system {
use crate::system_param;

pub fn barrier_interval_ms() -> u32 {
system_param::default::barrier_interval_ms()
}

pub fn checkpoint_frequency() -> u64 {
system_param::default::checkpoint_frequency()
}

pub fn sstable_size_mb() -> u32 {
system_param::default::sstable_size_mb()
}

pub fn block_size_kb() -> u32 {
system_param::default::block_size_kb()
}

pub fn bloom_false_positive() -> f64 {
system_param::default::bloom_false_positive()
}

pub fn state_store() -> String {
system_param::default::state_store()
}

pub fn data_directory() -> String {
system_param::default::data_directory()
}

pub fn backup_storage_url() -> String {
system_param::default::backup_storage_url()
}

pub fn backup_storage_directory() -> String {
system_param::default::backup_storage_directory()
}
}
}
2 changes: 1 addition & 1 deletion src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ macro_rules! for_all_undeprecated_params {
{ sstable_size_mb, u32, 256_u32 },
{ block_size_kb, u32, 64_u32 },
{ bloom_false_positive, f64, 0.001_f64 },
{ state_store, String, "hummock+memory".to_string() },
{ state_store, String, "".to_string() },
{ data_directory, String, "hummock_001".to_string() },
{ backup_storage_url, String, "memory".to_string() },
{ backup_storage_directory, String, "backup".to_string() },
Expand Down
6 changes: 6 additions & 0 deletions src/config/ci-compaction-test-meta.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,9 @@ total_buffer_capacity_mb = 128
cache_file_fallocate_unit_mb = 512
cache_meta_fallocate_unit_mb = 16
cache_file_max_write_size_mb = 4

[system]
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.001
data_directory = "hummock_001"
6 changes: 6 additions & 0 deletions src/config/ci-compaction-test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,9 @@ total_buffer_capacity_mb = 128
cache_file_fallocate_unit_mb = 512
cache_meta_fallocate_unit_mb = 16
cache_file_max_write_size_mb = 4

[system]
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.001
data_directory = "hummock_001"
4 changes: 3 additions & 1 deletion src/config/ci-iceberg-test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ enable_committed_sst_sanity_check = true
max_heartbeat_interval_secs = 600

[streaming]
barrier_interval_ms = 5000
in_flight_barrier_nums = 10

[system]
barrier_interval_ms = 5000
checkpoint_frequency = 1
4 changes: 4 additions & 0 deletions src/config/ci-meta-backup-test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
min_sst_retention_time_sec = 0
collect_gc_watermark_spin_interval_sec = 1
vacuum_interval_sec = 10

[system]
backup_storage_url = "minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001"
backup_storage_directory = "backup"
6 changes: 4 additions & 2 deletions src/config/ci-recovery.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ enable_committed_sst_sanity_check = true
max_heartbeat_interval_secs = 600

[streaming]
barrier_interval_ms = 250
in_flight_barrier_nums = 10
checkpoint_frequency = 5

[system]
barrier_interval_ms = 250
checkpoint_frequency = 5
6 changes: 4 additions & 2 deletions src/config/ci.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ enable_committed_sst_sanity_check = true
max_heartbeat_interval_secs = 600

[streaming]
barrier_interval_ms = 250
in_flight_barrier_nums = 10
checkpoint_frequency = 5

[system]
barrier_interval_ms = 250
checkpoint_frequency = 5
6 changes: 6 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,9 @@ stream_enable_executor_row_count = false
stream_connector_message_buffer_size = 16
unsafe_stream_extreme_cache_size = 1024
stream_chunk_size = 1024

[system]
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.001
data_directory = "hummock_001"
90 changes: 41 additions & 49 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ use std::time::Duration;

use clap::Parser;
pub use error::{MetaError, MetaResult};
use risingwave_common::system_param::default;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_proc_macro::OverrideConfig;
use risingwave_pb::meta::SystemParams;

use crate::manager::MetaOpts;
use crate::rpc::server::{rpc_serve, AddressInfo, MetaStoreBackend};
Expand Down Expand Up @@ -107,42 +105,6 @@ pub struct MetaNodeOpts {
#[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
prometheus_endpoint: Option<String>,

/// State store url.
#[clap(long, env = "RW_STATE_STORE")]
state_store: Option<String>,

/// The interval of periodic barrier.
#[clap(long, env = "RW_BARRIER_INTERVAL_MS", default_value_t = default::barrier_interval_ms())]
barrier_interval_ms: u32,

/// There will be a checkpoint for every n barriers
#[clap(long, env = "RW_CHECKPOINT_FREQUENCY", default_value_t = default::checkpoint_frequency())]
pub checkpoint_frequency: u64,

/// Target size of the Sstable.
#[clap(long, env = "RW_SSTABLE_SIZE_MB", default_value_t = default::sstable_size_mb())]
sstable_size_mb: u32,

/// Size of each block in bytes in SST.
#[clap(long, env = "RW_BLOCK_SIZE_KB", default_value_t = default::block_size_kb())]
block_size_kb: u32,

/// False positive probability of bloom filter.
#[clap(long, env = "RW_BLOOM_FALSE_POSITIVE", default_value_t = default::bloom_false_positive())]
bloom_false_positive: f64,

/// Remote directory for storing data and metadata objects.
#[clap(long, env = "RW_DATA_DIRECTORY", default_value_t = default::data_directory())]
data_directory: String,

/// Remote storage url for storing snapshots.
#[clap(long, env = "RW_BACKUP_STORAGE_URL", default_value_t = default::backup_storage_url())]
backup_storage_url: String,

/// Remote directory for storing snapshots.
#[clap(long, env = "RW_STORAGE_DIRECTORY", default_value_t = default::backup_storage_directory())]
backup_storage_directory: String,

/// Endpoint of the connector node, there will be a sidecar connector node
/// colocated with Meta node in the cloud environment
#[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")]
Expand All @@ -164,6 +126,46 @@ pub struct OverrideConfigOpts {
#[clap(long, env = "RW_BACKEND", value_enum)]
#[override_opts(path = meta.backend)]
backend: Option<MetaBackend>,

/// The interval of periodic barrier.
#[clap(long, env = "RW_BARRIER_INTERVAL_MS")]
#[override_opts(path = system.barrier_interval_ms)]
barrier_interval_ms: Option<u32>,

/// Target size of the Sstable.
#[clap(long, env = "RW_SSTABLE_SIZE_MB")]
#[override_opts(path = system.sstable_size_mb)]
sstable_size_mb: Option<u32>,

/// Size of each block in bytes in SST.
#[clap(long, env = "RW_BLOCK_SIZE_KB")]
#[override_opts(path = system.block_size_kb)]
block_size_kb: Option<u32>,

/// False positive probability of bloom filter.
#[clap(long, env = "RW_BLOOM_FALSE_POSITIVE")]
#[override_opts(path = system.bloom_false_positive)]
bloom_false_positive: Option<f64>,

/// State store url
#[clap(long, env = "RW_STATE_STORE")]
#[override_opts(path = system.state_store)]
state_store: Option<String>,

/// Remote directory for storing data and metadata objects.
#[clap(long, env = "RW_DATA_DIRECTORY")]
#[override_opts(path = system.data_directory)]
data_directory: Option<String>,

/// Remote storage url for storing snapshots.
#[clap(long, env = "RW_BACKUP_STORAGE_URL")]
#[override_opts(path = system.backup_storage_url)]
backup_storage_url: Option<String>,

/// Remote directory for storing snapshots.
#[clap(long, env = "RW_BACKUP_STORAGE_DIRECTORY")]
#[override_opts(path = system.backup_storage_directory)]
backup_storage_directory: Option<String>,
}

use std::future::Future;
Expand Down Expand Up @@ -244,17 +246,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.meta
.periodic_ttl_reclaim_compaction_interval_sec,
},
SystemParams {
barrier_interval_ms: Some(opts.barrier_interval_ms),
checkpoint_frequency: Some(opts.checkpoint_frequency),
sstable_size_mb: Some(opts.sstable_size_mb),
block_size_kb: Some(opts.block_size_kb),
bloom_false_positive: Some(opts.bloom_false_positive),
state_store: Some(opts.state_store.unwrap_or_default()),
data_directory: Some(opts.data_directory),
backup_storage_url: Some(opts.backup_storage_url),
backup_storage_directory: Some(opts.backup_storage_directory),
},
config.system.into_init_system_params(),
)
.await
.unwrap();
Expand Down
3 changes: 0 additions & 3 deletions src/storage/backup/integration_tests/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ function clean_etcd_data() {

function start_cluster() {
cargo make d ci-meta-backup-test 1>/dev/null 2>&1
execute_sql_and_expect \
"alter system set backup_storage_url to \"minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001\";" \
"ALTER_SYSTEM"
sleep 5
}

Expand Down
9 changes: 6 additions & 3 deletions src/tests/compaction_test/src/compaction_test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,16 @@ pub async fn start_meta_node(listen_addr: String, config_path: String) {
"--config-path",
&config_path,
]);
// We set a large checkpoint frequency to prevent the embedded meta node
// to commit new epochs to avoid bumping the hummock version during version log replay.
assert_eq!(CHECKPOINT_FREQ_FOR_REPLAY, meta_opts.checkpoint_frequency);
let config = load_config(
&meta_opts.config_path,
Some(meta_opts.override_opts.clone()),
);
// We set a large checkpoint frequency to prevent the embedded meta node
// to commit new epochs to avoid bumping the hummock version during version log replay.
assert_eq!(
CHECKPOINT_FREQ_FOR_REPLAY,
config.system.checkpoint_frequency
);
assert!(
config.meta.enable_compaction_deterministic,
"enable_compaction_deterministic should be set"
Expand Down
Loading

0 comments on commit f111bfb

Please sign in to comment.