Skip to content

Commit

Permalink
feat(storage): remove unused code calculate_table_align_rule (#17199)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Jun 18, 2024
1 parent b786edc commit a696216
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 98 deletions.
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,10 @@ pub struct StorageConfig {
#[serde(default = "default::storage::compactor_iter_max_io_retry_times")]
pub compactor_iter_max_io_retry_times: usize,

/// The window size of table info statistic history.
#[serde(default = "default::storage::table_info_statistic_history_times")]
pub table_info_statistic_history_times: usize,

#[serde(default, flatten)]
#[config_doc(omitted)]
pub unrecognized: Unrecognized<Self>,
Expand Down Expand Up @@ -1574,6 +1578,10 @@ pub mod default {
/// Default for `storage.compactor_concurrent_uploading_sst_count`:
/// `None`, i.e. no value is set unless the user configures one.
pub fn compactor_concurrent_uploading_sst_count() -> Option<usize> {
    None
}

/// Default for `storage.table_info_statistic_history_times`: 240 entries —
/// the window size of the table info statistic history.
pub fn table_info_statistic_history_times() -> usize {
    240
}
}

pub mod streaming {
Expand Down
1 change: 1 addition & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ This page is automatically generated by `./risedev generate-example-config`
| shared_buffer_flush_ratio | The shared buffer will start flushing data to object when the ratio of memory usage to the shared buffer capacity exceed such ratio. | 0.800000011920929 |
| shared_buffer_min_batch_flush_size_mb | The minimum total flush size of shared buffer spill. When a shared buffer spill is triggered, the total flush size across multiple epochs should be higher than this size. | 800 |
| sstable_id_remote_fetch_number | Number of SST ids fetched from meta per RPC | 10 |
| table_info_statistic_history_times | The window size of table info statistic history. | 240 |
| write_conflict_detection_enabled | Whether to enable write conflict detection | true |

## streaming
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ max_preload_io_retry_times = 3
compactor_fast_max_compact_delete_ratio = 40
compactor_fast_max_compact_task_size = 2147483648
compactor_iter_max_io_retry_times = 8
table_info_statistic_history_times = 240
mem_table_spill_threshold = 4194304

[storage.cache.block_cache_eviction]
Expand Down
3 changes: 3 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.max_trivial_move_task_count_per_loop,
max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
secret_store_private_key: config.meta.secret_store_private_key,
table_info_statistic_history_times: config
.storage
.table_info_statistic_history_times,
},
config.system.into_init_system_params(),
Default::default(),
Expand Down
3 changes: 1 addition & 2 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::hummock::manager::transaction::{
HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::HISTORY_TABLE_INFO_STATISTIC_TIME;
use crate::hummock::metrics_utils::{
get_or_create_local_table_stat, trigger_local_table_stat, trigger_sst_stat,
};
Expand Down Expand Up @@ -482,7 +481,7 @@ impl HummockManager {
let throughput = (stat.total_value_size + stat.total_key_size) as u64;
let entry = table_infos.entry(table_id).or_default();
entry.push_back(throughput);
if entry.len() > HISTORY_TABLE_INFO_STATISTIC_TIME {
if entry.len() > self.env.opts.table_info_statistic_history_times {
entry.pop_front();
}
}
Expand Down
77 changes: 76 additions & 1 deletion src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
// limitations under the License.

use std::cmp::min;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::sync::{Arc, LazyLock};
use std::time::{Instant, SystemTime};

Expand All @@ -42,6 +42,7 @@ use rand::seq::SliceRandom;
use rand::thread_rng;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::table_stats::{
add_prost_table_stats_map, purge_prost_table_stats, PbTableStatsMap,
};
Expand Down Expand Up @@ -1557,6 +1558,80 @@ impl HummockManager {
.retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
}
}

/// Decides, from write-throughput history and size heuristics, whether the
/// state table `table_id` should be split out of compaction group
/// `parent_group_id` into a dedicated group, and performs the move when the
/// checks pass.
///
/// * `table_write_throughput` — per-table throughput samples; each entry is
///   the bytes written over one checkpoint interval.
/// * `table_size` — current size of the table's state in bytes.
/// * `checkpoint_secs` — seconds per checkpoint, used to normalize samples.
///   NOTE(review): used as a divisor below — assumed non-zero, confirm at
///   call site.
/// * `group_size` — total size of the parent compaction group in bytes.
pub async fn try_move_table_to_dedicated_cg(
    &self,
    table_write_throughput: &HashMap<u32, VecDeque<u64>>,
    table_id: &u32,
    table_size: &u64,
    is_creating_table: bool,
    checkpoint_secs: u64,
    parent_group_id: u64,
    group_size: u64,
) {
    let state_default_group: CompactionGroupId = StaticCompactionGroupId::StateDefault.into();
    let mv_group: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into();
    let partition_vnode_count = self.env.opts.partition_vnode_count;
    // Samples needed before the throughput classification kicks in.
    let window_size =
        self.env.opts.table_info_statistic_history_times / (checkpoint_secs as usize);

    // Conservative defaults: with insufficient history the table counts as
    // low-throughput, which suppresses the split below.
    let mut high_throughput = false;
    let mut low_throughput = true;
    if let Some(history) = table_write_throughput.get(table_id) {
        if history.len() >= window_size {
            high_throughput = history.iter().all(|sample| {
                *sample / checkpoint_secs > self.env.opts.table_write_throughput_threshold
            });
            low_throughput = history.iter().any(|sample| {
                *sample / checkpoint_secs < self.env.opts.min_table_split_write_throughput
            });
        }
    }

    let state_table_size = *table_size;

    // Skip the split when:
    // 1. the table is still being created,
    // 2. its write throughput is low,
    // 3. it is a medium-sized table without high throughput.
    let medium_without_load =
        state_table_size < self.env.opts.min_table_split_size && !high_throughput;
    if is_creating_table || low_throughput || medium_without_load {
        return;
    }

    // do not split a large table and a small table because it would increase IOPS
    // of small table.
    if parent_group_id != state_default_group && parent_group_id != mv_group {
        let rest_group_size = group_size - state_table_size;
        if rest_group_size < state_table_size
            && rest_group_size < self.env.opts.min_table_split_size
        {
            return;
        }
    }

    match self
        .move_state_table_to_compaction_group(
            parent_group_id,
            &[*table_id],
            partition_vnode_count,
        )
        .await
    {
        Ok(new_group_id) => {
            tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, partition_vnode_count);
        }
        Err(e) => {
            tracing::info!(
                error = %e.as_report(),
                "failed to move state table [{}] from group-{}",
                table_id,
                parent_group_id,
            )
        }
    }
}
}

#[cfg(any(test, feature = "test"))]
Expand Down
1 change: 0 additions & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ pub use compaction::{check_cg_write_limit, WriteLimitType};
pub(crate) use utils::*;

type Snapshot = ArcSwap<HummockSnapshot>;
const HISTORY_TABLE_INFO_STATISTIC_TIME: usize = 240;

// Update to states are performed as follow:
// - Initialize ValTransaction for the meta state to update
Expand Down
96 changes: 2 additions & 94 deletions src/meta/src/hummock/manager/timer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -22,8 +22,6 @@ use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::CompactionGroupId;
use risingwave_pb::hummock::compact_task::{self, TaskStatus};
use risingwave_pb::hummock::level_handler::RunningCompactTask;
use rw_futures_util::select_all;
Expand All @@ -33,7 +31,6 @@ use tokio::task::JoinHandle;
use tokio_stream::wrappers::IntervalStream;
use tracing::warn;

use crate::hummock::manager::HISTORY_TABLE_INFO_STATISTIC_TIME;
use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat};
use crate::hummock::{HummockManager, TASK_NORMAL};

Expand Down Expand Up @@ -468,7 +465,7 @@ impl HummockManager {
}

for (table_id, table_size) in &group.table_statistic {
self.calculate_table_align_rule(
self.try_move_table_to_dedicated_cg(
&table_write_throughput,
table_id,
table_size,
Expand All @@ -494,93 +491,4 @@ impl HummockManager {
}
}
}

/// Decides whether the state table `table_id` should be split out of
/// compaction group `parent_group_id` into a dedicated group, and performs
/// the move via `move_state_table_to_compaction_group` when the heuristics
/// below pass.
///
/// * `table_write_throughput` — per-table throughput history; each sample is
///   the bytes written during one checkpoint interval.
/// * `table_size` — current size of the table's state in bytes.
/// * `is_creating_table` — whether the table is still being created; creating
///   tables are never split, but get a relaxed throughput classification.
/// * `checkpoint_secs` — seconds per checkpoint, used to normalize samples.
///   NOTE(review): used as a divisor below — assumed non-zero, confirm at
///   call site.
/// * `group_size` — total size of the parent compaction group in bytes.
async fn calculate_table_align_rule(
    &self,
    table_write_throughput: &HashMap<u32, VecDeque<u64>>,
    table_id: &u32,
    table_size: &u64,
    is_creating_table: bool,
    checkpoint_secs: u64,
    parent_group_id: u64,
    group_size: u64,
) {
    let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into();
    let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into();
    let partition_vnode_count = self.env.opts.partition_vnode_count;
    // Number of history samples that make up one full observation window.
    let window_size = HISTORY_TABLE_INFO_STATISTIC_TIME / (checkpoint_secs as usize);

    // Conservative defaults: without enough history the table is treated as
    // low-throughput, which suppresses the split below.
    let mut is_high_write_throughput = false;
    let mut is_low_write_throughput = true;
    if let Some(history) = table_write_throughput.get(table_id) {
        if !is_creating_table {
            // Only classify once a full window of samples is available.
            if history.len() >= window_size {
                is_high_write_throughput = history.iter().all(|throughput| {
                    *throughput / checkpoint_secs
                        > self.env.opts.table_write_throughput_threshold
                });
                is_low_write_throughput = history.iter().any(|throughput| {
                    *throughput / checkpoint_secs
                        < self.env.opts.min_table_split_write_throughput
                });
            }
        } else {
            // For creating table, relax the checking restrictions to make the
            // data alignment behavior more sensitive: compare the summed
            // history against the threshold scaled over the same total period.
            let sum = history.iter().sum::<u64>();
            is_low_write_throughput = sum
                < self.env.opts.min_table_split_write_throughput
                    * history.len() as u64
                    * checkpoint_secs;
        }
    }

    let state_table_size = *table_size;

    // 1. Avoid splitting a creating table
    // 2. Avoid splitting a is_low_write_throughput creating table
    // 3. Avoid splitting a non-high throughput medium-sized table
    if is_creating_table
        || (is_low_write_throughput)
        || (state_table_size < self.env.opts.min_table_split_size && !is_high_write_throughput)
    {
        return;
    }

    // do not split a large table and a small table because it would increase IOPS
    // of small table.
    if parent_group_id != default_group_id && parent_group_id != mv_group_id {
        let rest_group_size = group_size - state_table_size;
        if rest_group_size < state_table_size
            && rest_group_size < self.env.opts.min_table_split_size
        {
            return;
        }
    }

    let ret = self
        .move_state_table_to_compaction_group(
            parent_group_id,
            &[*table_id],
            partition_vnode_count,
        )
        .await;
    match ret {
        Ok(new_group_id) => {
            tracing::info!(
                "move state table [{}] from group-{} to group-{} success",
                table_id,
                parent_group_id,
                new_group_id
            );
        }
        Err(e) => {
            // Move failures are logged (not propagated); the next timer tick
            // will re-evaluate this table.
            tracing::info!(
                error = %e.as_report(),
                "failed to move state table [{}] from group-{}",
                table_id,
                parent_group_id,
            )
        }
    }
}
}
3 changes: 3 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ pub struct MetaOpts {

// The private key for the secret store, used when the secret is stored in the meta.
pub secret_store_private_key: Vec<u8>,

pub table_info_statistic_history_times: usize,
}

impl MetaOpts {
Expand Down Expand Up @@ -345,6 +347,7 @@ impl MetaOpts {
max_trivial_move_task_count_per_loop: 256,
max_get_task_probe_times: 5,
secret_store_private_key: "demo-secret-private-key".as_bytes().to_vec(),
table_info_statistic_history_times: 240,
}
}
}
Expand Down

0 comments on commit a696216

Please sign in to comment.