diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index 144b43cab120..cb6bf1a0e637 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -755,6 +755,10 @@ pub struct StorageConfig {
     #[serde(default = "default::storage::compactor_iter_max_io_retry_times")]
     pub compactor_iter_max_io_retry_times: usize,
 
+    /// The window size of table info statistic history.
+    #[serde(default = "default::storage::table_info_statistic_history_times")]
+    pub table_info_statistic_history_times: usize,
+
     #[serde(default, flatten)]
     #[config_doc(omitted)]
     pub unrecognized: Unrecognized,
@@ -1574,6 +1578,10 @@ pub mod default {
         pub fn compactor_concurrent_uploading_sst_count() -> Option<usize> {
             None
         }
+
+        pub fn table_info_statistic_history_times() -> usize {
+            240
+        }
     }
 
     pub mod streaming {
diff --git a/src/config/docs.md b/src/config/docs.md
index 59c8961a15be..a52ce9202a3b 100644
--- a/src/config/docs.md
+++ b/src/config/docs.md
@@ -138,6 +138,7 @@ This page is automatically generated by `./risedev generate-example-config`
 | shared_buffer_flush_ratio | The shared buffer will start flushing data to object when the ratio of memory usage to the shared buffer capacity exceed such ratio. | 0.800000011920929 |
 | shared_buffer_min_batch_flush_size_mb | The minimum total flush size of shared buffer spill. When a shared buffer spilled is trigger, the total flush size across multiple epochs should be at least higher than this size. | 800 |
 | sstable_id_remote_fetch_number | Number of SST ids fetched from meta per RPC | 10 |
+| table_info_statistic_history_times | The window size of table info statistic history. | 240 |
 | write_conflict_detection_enabled | Whether to enable write conflict detection | true |
 
 ## streaming
diff --git a/src/config/example.toml b/src/config/example.toml
index 0e33ba465c9a..a708fed3b84b 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -150,6 +150,7 @@ max_preload_io_retry_times = 3
 compactor_fast_max_compact_delete_ratio = 40
 compactor_fast_max_compact_task_size = 2147483648
 compactor_iter_max_io_retry_times = 8
+table_info_statistic_history_times = 240
 mem_table_spill_threshold = 4194304
 
 [storage.cache.block_cache_eviction]
diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs
index 0e49f0805bf1..b6dbdf250010 100644
--- a/src/meta/node/src/lib.rs
+++ b/src/meta/node/src/lib.rs
@@ -380,6 +380,9 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
                     .max_trivial_move_task_count_per_loop,
                 max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
                 secret_store_private_key: config.meta.secret_store_private_key,
+                table_info_statistic_history_times: config
+                    .storage
+                    .table_info_statistic_history_times,
             },
             config.system.into_init_system_params(),
             Default::default(),
diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs
index 1437a6eb3bfd..9a494bc509b4 100644
--- a/src/meta/src/hummock/manager/commit_epoch.rs
+++ b/src/meta/src/hummock/manager/commit_epoch.rs
@@ -34,7 +34,6 @@ use crate::hummock::manager::transaction::{
     HummockVersionStatsTransaction, HummockVersionTransaction,
 };
 use crate::hummock::manager::versioning::Versioning;
-use crate::hummock::manager::HISTORY_TABLE_INFO_STATISTIC_TIME;
 use crate::hummock::metrics_utils::{
     get_or_create_local_table_stat, trigger_local_table_stat, trigger_sst_stat,
 };
@@ -482,7 +481,7 @@ impl HummockManager {
             let throughput = (stat.total_value_size + stat.total_key_size) as u64;
             let entry = table_infos.entry(table_id).or_default();
             entry.push_back(throughput);
-            if entry.len() > HISTORY_TABLE_INFO_STATISTIC_TIME {
+            if entry.len() > self.env.opts.table_info_statistic_history_times {
                 entry.pop_front();
             }
         }
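The commit-epoch hunk above maintains a bounded per-table history: each checkpoint pushes one throughput sample and evicts the oldest sample once the configured window is exceeded. A minimal, std-only sketch of that bookkeeping, with `window` standing in for `table_info_statistic_history_times` and a bare `u32` for the table id (both simplifications, not the actual RisingWave types):

```rust
use std::collections::{HashMap, VecDeque};

/// Per-table throughput history bounded by a configurable window.
struct ThroughputHistory {
    window: usize,
    per_table: HashMap<u32, VecDeque<u64>>,
}

impl ThroughputHistory {
    /// Record one checkpoint's throughput sample for a table, evicting the
    /// oldest sample once the window is full.
    fn record(&mut self, table_id: u32, throughput: u64) {
        let entry = self.per_table.entry(table_id).or_default();
        entry.push_back(throughput);
        if entry.len() > self.window {
            entry.pop_front();
        }
    }
}

fn main() {
    let mut history = ThroughputHistory {
        window: 3,
        per_table: HashMap::new(),
    };
    for sample in [10, 20, 30, 40] {
        history.record(1, sample);
    }
    // Only the three most recent samples survive.
    assert_eq!(history.per_table[&1], VecDeque::from([20, 30, 40]));
}
```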
diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs
index e406292edef2..f728523f5803 100644
--- a/src/meta/src/hummock/manager/compaction.rs
+++ b/src/meta/src/hummock/manager/compaction.rs
@@ -27,7 +27,7 @@
 // limitations under the License.
 
 use std::cmp::min;
-use std::collections::{BTreeMap, HashMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
 use std::sync::{Arc, LazyLock};
 use std::time::{Instant, SystemTime};
@@ -42,6 +42,7 @@ use rand::seq::SliceRandom;
 use rand::thread_rng;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt;
+use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
 use risingwave_hummock_sdk::table_stats::{
     add_prost_table_stats_map, purge_prost_table_stats, PbTableStatsMap,
 };
@@ -1557,6 +1558,80 @@ impl HummockManager {
                 .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
         }
     }
+
+    pub async fn try_move_table_to_dedicated_cg(
+        &self,
+        table_write_throughput: &HashMap<u32, VecDeque<u64>>,
+        table_id: &u32,
+        table_size: &u64,
+        is_creating_table: bool,
+        checkpoint_secs: u64,
+        parent_group_id: u64,
+        group_size: u64,
+    ) {
+        let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into();
+        let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into();
+        let partition_vnode_count = self.env.opts.partition_vnode_count;
+        let window_size =
+            self.env.opts.table_info_statistic_history_times / (checkpoint_secs as usize);
+
+        let mut is_high_write_throughput = false;
+        let mut is_low_write_throughput = true;
+        if let Some(history) = table_write_throughput.get(table_id) {
+            if history.len() >= window_size {
+                is_high_write_throughput = history.iter().all(|throughput| {
+                    *throughput / checkpoint_secs > self.env.opts.table_write_throughput_threshold
+                });
+                is_low_write_throughput = history.iter().any(|throughput| {
+                    *throughput / checkpoint_secs < self.env.opts.min_table_split_write_throughput
+                });
+            }
+        }
+
+        let state_table_size = *table_size;
+
+        // 1. Avoid splitting a creating table.
+        // 2. Avoid splitting a table with low write throughput.
+        // 3. Avoid splitting a medium-sized table that lacks high write throughput.
+        if is_creating_table
+            || (is_low_write_throughput)
+            || (state_table_size < self.env.opts.min_table_split_size && !is_high_write_throughput)
+        {
+            return;
+        }
+
+        // Do not separate a large table from a small one, because doing so would
+        // increase the IOPS of the small table.
+        if parent_group_id != default_group_id && parent_group_id != mv_group_id {
+            let rest_group_size = group_size - state_table_size;
+            if rest_group_size < state_table_size
+                && rest_group_size < self.env.opts.min_table_split_size
+            {
+                return;
+            }
+        }
+
+        let ret = self
+            .move_state_table_to_compaction_group(
+                parent_group_id,
+                &[*table_id],
+                partition_vnode_count,
+            )
+            .await;
+        match ret {
+            Ok(new_group_id) => {
+                tracing::info!("move state table [{}] from group-{} to group-{} success, table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, partition_vnode_count);
+            }
+            Err(e) => {
+                tracing::info!(
+                    error = %e.as_report(),
+                    "failed to move state table [{}] from group-{}",
+                    table_id,
+                    parent_group_id,
+                )
+            }
+        }
+    }
 }
 
 #[cfg(any(test, feature = "test"))]
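`try_move_table_to_dedicated_cg` classifies a table as high-throughput only when every sample in the window clears `table_write_throughput_threshold`, and as low-throughput when any sample falls under `min_table_split_write_throughput`; with fewer than `window_size` samples it conservatively stays "low". A runnable sketch of just that classification, with hypothetical threshold values in place of the real meta options:

```rust
use std::collections::VecDeque;

/// Classify a table's recent write throughput. Returns (is_high, is_low).
/// `high_threshold` / `low_threshold` stand in for the real
/// `table_write_throughput_threshold` / `min_table_split_write_throughput`.
fn classify(
    history: &VecDeque<u64>,
    window_size: usize,
    checkpoint_secs: u64,
    high_threshold: u64,
    low_threshold: u64,
) -> (bool, bool) {
    let mut is_high = false;
    let mut is_low = true; // too few samples: conservatively treat as low
    if history.len() >= window_size {
        // High only if *every* per-second rate in the window clears the bar...
        is_high = history.iter().all(|t| t / checkpoint_secs > high_threshold);
        // ...low if *any* per-second rate falls under the minimum.
        is_low = history.iter().any(|t| t / checkpoint_secs < low_threshold);
    }
    (is_high, is_low)
}

fn main() {
    // The patch derives window_size as
    // table_info_statistic_history_times / checkpoint_secs,
    // e.g. the default 240 at a 120s checkpoint interval.
    let window_size = 240 / 120;
    let history = VecDeque::from([120_000u64, 240_000]);
    let (is_high, is_low) = classify(&history, window_size, 120, 800, 100);
    assert!(is_high && !is_low);
}
```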
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index 8a49d91a55fc..e3511d1e4b20 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -72,7 +72,6 @@ pub use compaction::{check_cg_write_limit, WriteLimitType};
 pub(crate) use utils::*;
 
 type Snapshot = ArcSwap<HummockSnapshot>;
-const HISTORY_TABLE_INFO_STATISTIC_TIME: usize = 240;
 
 // Update to states are performed as follow:
 // - Initialize ValTransaction for the meta state to update
diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs
index b7c8cae4b260..bb4a9fa86b06 100644
--- a/src/meta/src/hummock/manager/timer_task.rs
+++ b/src/meta/src/hummock/manager/timer_task.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashMap, HashSet, VecDeque};
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -22,8 +22,6 @@ use futures::{FutureExt, StreamExt};
 use itertools::Itertools;
 use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
-use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
-use risingwave_hummock_sdk::CompactionGroupId;
 use risingwave_pb::hummock::compact_task::{self, TaskStatus};
 use risingwave_pb::hummock::level_handler::RunningCompactTask;
 use rw_futures_util::select_all;
@@ -33,7 +31,6 @@ use tokio::task::JoinHandle;
 use tokio_stream::wrappers::IntervalStream;
 use tracing::warn;
 
-use crate::hummock::manager::HISTORY_TABLE_INFO_STATISTIC_TIME;
 use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat};
 use crate::hummock::{HummockManager, TASK_NORMAL};
 
@@ -468,7 +465,7 @@ impl HummockManager {
                     }
 
                     for (table_id, table_size) in &group.table_statistic {
-                        self.calculate_table_align_rule(
+                        self.try_move_table_to_dedicated_cg(
                             &table_write_throughput,
                             table_id,
                             table_size,
@@ -494,93 +491,4 @@ impl HummockManager {
             }
         }
     }
-
-    async fn calculate_table_align_rule(
-        &self,
-        table_write_throughput: &HashMap<u32, VecDeque<u64>>,
-        table_id: &u32,
-        table_size: &u64,
-        is_creating_table: bool,
-        checkpoint_secs: u64,
-        parent_group_id: u64,
-        group_size: u64,
-    ) {
-        let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into();
-        let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into();
-        let partition_vnode_count = self.env.opts.partition_vnode_count;
-        let window_size = HISTORY_TABLE_INFO_STATISTIC_TIME / (checkpoint_secs as usize);
-
-        let mut is_high_write_throughput = false;
-        let mut is_low_write_throughput = true;
-        if let Some(history) = table_write_throughput.get(table_id) {
-            if !is_creating_table {
-                if history.len() >= window_size {
-                    is_high_write_throughput = history.iter().all(|throughput| {
-                        *throughput / checkpoint_secs
-                            > self.env.opts.table_write_throughput_threshold
-                    });
-                    is_low_write_throughput = history.iter().any(|throughput| {
-                        *throughput / checkpoint_secs
-                            < self.env.opts.min_table_split_write_throughput
-                    });
-                }
-            } else {
-                // For creating table, relax the checking restrictions to make the data alignment behavior more sensitive.
-                let sum = history.iter().sum::<u64>();
-                is_low_write_throughput = sum
-                    < self.env.opts.min_table_split_write_throughput
-                        * history.len() as u64
-                        * checkpoint_secs;
-            }
-        }
-
-        let state_table_size = *table_size;
-
-        // 1. Avoid splitting a creating table
-        // 2. Avoid splitting a is_low_write_throughput creating table
-        // 3. Avoid splitting a non-high throughput medium-sized table
-        if is_creating_table
-            || (is_low_write_throughput)
-            || (state_table_size < self.env.opts.min_table_split_size && !is_high_write_throughput)
-        {
-            return;
-        }
-
-        // do not split a large table and a small table because it would increase IOPS
-        // of small table.
-        if parent_group_id != default_group_id && parent_group_id != mv_group_id {
-            let rest_group_size = group_size - state_table_size;
-            if rest_group_size < state_table_size
-                && rest_group_size < self.env.opts.min_table_split_size
-            {
-                return;
-            }
-        }
-
-        let ret = self
-            .move_state_table_to_compaction_group(
-                parent_group_id,
-                &[*table_id],
-                partition_vnode_count,
-            )
-            .await;
-        match ret {
-            Ok(new_group_id) => {
-                tracing::info!(
-                    "move state table [{}] from group-{} to group-{} success",
-                    table_id,
-                    parent_group_id,
-                    new_group_id
-                );
-            }
-            Err(e) => {
-                tracing::info!(
-                    error = %e.as_report(),
-                    "failed to move state table [{}] from group-{}",
-                    table_id,
-                    parent_group_id,
-                )
-            }
-        }
-    }
 }
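Worth noting: the deleted `calculate_table_align_rule` carried a relaxed, sum-based low-throughput test for creating tables, but since `is_creating_table` already forces the early return a few lines later, that branch appears to have had no observable effect, which is presumably why the new method drops it. The removed check in isolation, as a std-only sketch with illustrative names:

```rust
/// The removed creating-table branch: a table counts as "low throughput"
/// when its summed window stays under threshold * samples * checkpoint_secs.
fn creating_table_is_low(history: &[u64], checkpoint_secs: u64, min_split_throughput: u64) -> bool {
    let sum: u64 = history.iter().sum();
    sum < min_split_throughput * history.len() as u64 * checkpoint_secs
}

fn main() {
    // 3 checkpoints of 10s each at a 100 bytes/s threshold => 3_000-byte budget.
    assert!(creating_table_is_low(&[500, 400, 600], 10, 100)); // 1_500 < 3_000
    assert!(!creating_table_is_low(&[2_000, 1_500, 900], 10, 100)); // 4_400 >= 3_000
}
```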
diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs
index 7a284ca2fccd..e10d64a65632 100644
--- a/src/meta/src/manager/env.rs
+++ b/src/meta/src/manager/env.rs
@@ -284,6 +284,9 @@ pub struct MetaOpts {
 
     // The private key for the secret store, used when the secret is stored in the meta.
     pub secret_store_private_key: Vec<u8>,
+
+    // The window size of table info statistic history.
+    pub table_info_statistic_history_times: usize,
 }
 
 impl MetaOpts {
@@ -345,6 +348,7 @@ impl MetaOpts {
             max_trivial_move_task_count_per_loop: 256,
             max_get_task_probe_times: 5,
             secret_store_private_key: "demo-secret-private-key".as_bytes().to_vec(),
+            table_info_statistic_history_times: 240,
         }
     }
 }
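The env.rs change completes the plumbing: the storage config value is copied into `MetaOpts` at meta-node startup (see the `start` hunk in src/meta/node/src/lib.rs above). A stripped-down sketch of that pattern, using simplified stand-in structs rather than the real types:

```rust
/// Simplified stand-in for the storage section of the node config.
struct StorageConfig {
    table_info_statistic_history_times: usize,
}

/// Simplified stand-in for the meta node's runtime options.
struct MetaOpts {
    table_info_statistic_history_times: usize,
}

impl MetaOpts {
    /// Copy the storage-level setting into the meta options, as the patch
    /// does inside `start` (the real constructor takes many more fields).
    fn from_storage_config(storage: &StorageConfig) -> Self {
        Self {
            table_info_statistic_history_times: storage.table_info_statistic_history_times,
        }
    }
}

fn main() {
    // 240 matches the serde default registered in config.rs.
    let storage = StorageConfig {
        table_info_statistic_history_times: 240,
    };
    let opts = MetaOpts::from_storage_config(&storage);
    assert_eq!(opts.table_info_statistic_history_times, 240);
}
```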